시황 분석 LSTM 모델 훈련
이전 Kiwoom 자동 매매 프로그램 구현 포스팅들 중 시황분석 페이지에서 투자 점수 부분을 구현하지 않고 남겨두었다.
해당 부분을 LSTM 모델로 간단하게 오늘 투자 위험 여부를 판단하는 모델을 훈련시켜 적용하도록 구현한다.
데이터 전처리
데이터셋은 Investing.com 사이트에서 과거 5년간의 시황 데이터(환율, 미국 국채 10년, 서부 텍사스유 등등)를 이용하였다.
import os
import pandas as pd
import glob
def preprocessing(dataset, filename):
    """Normalize one indicator CSV frame.

    Parses the date column, sorts rows oldest-first, drops the volume
    column when present, and prefixes every non-date column with the
    indicator name taken from the file name.
    """
    # Parse dates in place, then order rows chronologically.
    dataset["날짜"] = pd.to_datetime(dataset["날짜"])
    dataset = dataset.sort_values(by="날짜").reset_index(drop=True)
    # Volume is not used as a model feature.
    if "거래량" in dataset.columns:
        dataset = dataset.drop(columns=["거래량"])
    # Tag each feature column with the indicator name (file stem).
    prefix = os.path.splitext(os.path.basename(filename))[0]
    renamed = {col: f"{prefix}_{col}" for col in dataset.columns if col != "날짜"}
    return dataset.rename(columns=renamed)
import numpy as np
# % 변환 함수
def convert_percent(x):
    """Convert a percent string such as "1.25%" to the fraction 0.0125.

    Values that are not percent strings are returned unchanged.
    """
    # Guard clause: only strings containing '%' are converted.
    if not isinstance(x, str) or "%" not in x:
        return x
    return float(x.replace("%", "")) / 100.0
def merge_preprocessing(dataset_list):
    """Merge per-indicator frames on the date column, clean the values,
    and attach the quartile-based investment label.

    Parameters
    ----------
    dataset_list : list[pd.DataFrame] | pd.DataFrame
        Frames produced by ``preprocessing``. A single DataFrame is also
        accepted (the live-prediction path passes one frame directly).

    Returns
    -------
    pd.DataFrame
        Outer-merged frame sorted by date. When it holds at least two
        rows, a "투자 여부" column with quartile labels 0-3 is appended.

    Raises
    ------
    ValueError
        If an empty list is supplied.
    """
    # Accept a bare DataFrame for convenience.
    if isinstance(dataset_list, pd.DataFrame):
        dataset_list = [dataset_list]
    if not dataset_list:
        raise ValueError("dataset_list must contain at least one DataFrame")
    # FIX: the original assigned the *list* itself when fewer than two
    # frames were given, which broke every later DataFrame operation.
    standard_data = dataset_list[0]
    # Outer-join the remaining frames on their date column.
    for df in dataset_list[1:]:
        date_column = [col for col in df.columns if '날짜' in col]
        standard_data = pd.merge(standard_data, df, on=date_column, how="outer")
    # Market holidays leave NaN in the change columns -> treat as 0.00%.
    mask = standard_data.columns.str.endswith("_변동 %")
    standard_data.loc[:, mask] = standard_data.loc[:, mask].apply(lambda col: col.fillna("0.00%"))
    # Remaining NaNs (weekends/holidays) inherit the previous day's value.
    # .ffill() replaces the deprecated fillna(method="ffill").
    standard_data = standard_data.ffill()
    # Sort chronologically.
    standard_data = standard_data.sort_values(by="날짜", ascending=True).reset_index(drop=True)

    def _percent_to_fraction(x):
        # "1.25%" -> 0.0125; anything else passes through unchanged.
        if isinstance(x, str) and "%" in x:
            return float(x.replace("%", "")) / 100.0
        return x

    # Turn the "_변동 %" string columns into numeric fractions.
    # Series.map per column replaces the deprecated DataFrame.applymap.
    percent_columns = [col for col in standard_data.columns if col.endswith("_변동 %")]
    standard_data[percent_columns] = standard_data[percent_columns].apply(
        lambda col: col.map(_percent_to_fraction)
    )
    # Strip thousands separators and cast the remaining strings to float.
    standard_data = standard_data.apply(
        lambda col: col.map(lambda x: float(x.replace(",", "")) if isinstance(x, str) else x)
    )
    # Label: quartiles (0-3) of the combined KOSDAQ + KOSPI daily change,
    # since this targets the Korean market.
    if len(standard_data) >= 2:
        standard_point = standard_data["코스닥_변동 %"] + standard_data["코스피_변동 %"]
        standard_data["투자 여부"] = pd.qcut(standard_point, 4, labels=False)
    return standard_data
import re

# Only the indicator CSVs are named in Korean, so filter file names on
# Hangul characters to skip any other CSVs in the working directory.
csv_files = glob.glob(os.path.join("*.csv"))
tr_csv_files = [file for file in csv_files if re.search("[가-힣]", file)]
# Preprocess each indicator file, then merge everything into one frame.
df_list = [preprocessing(pd.read_csv(file), file) for file in tr_csv_files]
data = merge_preprocessing(df_list)
/tmp/ipykernel_61336/2289762405.py:26: FutureWarning: DataFrame.fillna with 'method' is deprecated and will raise in a future version. Use obj.ffill() or obj.bfill() instead.
standard_data.fillna(method="ffill", inplace=True)
/tmp/ipykernel_61336/2289762405.py:33: FutureWarning: DataFrame.applymap has been deprecated. Use DataFrame.map instead.
standard_data[percent_columns] = standard_data[percent_columns].applymap(convert_percent)
/tmp/ipykernel_61336/2289762405.py:36: FutureWarning: DataFrame.applymap has been deprecated. Use DataFrame.map instead.
standard_data = standard_data.applymap(lambda x: float(x.replace(",", "")) if isinstance(x, str) else x)
모델 훈련 및 저장
from sklearn.preprocessing import StandardScaler
import joblib
def train_scaler(dataset, save_path="scaler.pkl"):
    """Fit a StandardScaler on the numeric feature columns and persist it.

    The label column ("투자 여부") is excluded from fitting. The dataset
    itself is returned unchanged so the call can be chained.
    """
    # Drop the label before selecting the numeric feature columns.
    feature_frame = dataset
    if "투자 여부" in dataset.columns:
        feature_frame = dataset.drop(columns=["투자 여부"])
    numeric_data = feature_frame.select_dtypes(include=[np.number])
    # Fit and save the scaler so live prediction can reuse it.
    scaler = StandardScaler()
    scaler.fit(numeric_data)
    joblib.dump(scaler, save_path)
    return dataset
def scale_data(dataset, scaler_path="scaler.pkl"):
    """Standardize the numeric columns of *dataset* in place with the
    scaler previously saved by ``train_scaler``; returns the same frame."""
    scaler = joblib.load(scaler_path)
    # Transform only the numeric columns, writing results back in place.
    numeric_columns = dataset.select_dtypes(include=[np.number]).columns
    transformed = scaler.transform(dataset[numeric_columns])
    dataset[numeric_columns] = transformed
    return dataset
# Fit and persist the scaler on the full dataset (returns dataset unchanged).
data = train_scaler(data)
import numpy as np
# Use the date column as the index so only feature columns remain as values.
data.set_index("날짜", inplace=True)
def create_sequence(df, seq_length, target_column="투자 여부"):
    """Build sliding feature windows and next-step labels.

    Each sample is ``seq_length`` consecutive rows of the feature columns
    (everything except *target_column*); its label is the target value of
    the row immediately after the window.

    Returns a (samples, seq_length, features) array and a labels array.
    """
    # Drop the label column once, up front, instead of per window.
    features = df.drop(columns=[target_column])
    sequences, labels = [], []
    for start in range(len(df) - seq_length):
        end = start + seq_length
        sequences.append(features.iloc[start:end].values)
        labels.append(df.iloc[end][target_column])
    return np.array(sequences), np.array(labels)
# 14 trading days of context per training sample.
seq_length = 14
X, y = create_sequence(data, seq_length)
import torch
# Features as float32; labels as int64 for CrossEntropyLoss.
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.long)
import torch.nn as nn
import torch.optim as optim
class LSTMAutoencoder(nn.Module):
    """LSTM autoencoder: encode a sequence into a latent code, then
    reconstruct the sequence from that code.

    forward(x) returns (reconstructed, z):
      * reconstructed — (batch, seq_len, input_dim) reconstruction of x
      * z — (batch, seq_len, latent_dim) latent sequence obtained by
        repeating the final encoder state along the time axis
    """

    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(LSTMAutoencoder, self).__init__()
        self.encoder = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.latent = nn.Linear(hidden_dim, latent_dim)
        self.decoder = nn.LSTM(latent_dim, hidden_dim, batch_first=True)
        self.output_layer = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        # Encode: keep only the final hidden state of the top LSTM layer.
        _, (h_n, _) = self.encoder(x)
        z = self.latent(h_n[-1])
        # Repeat the latent vector across the time axis so the decoder
        # receives one copy per timestep.
        z = z.unsqueeze(1).repeat(1, x.size(1), 1)
        # FIX: decode from the latent sequence z, not the raw input x.
        # The original `self.decoder(x)` bypassed the bottleneck and only
        # ran because input_dim happened to equal latent_dim (35).
        decoded, _ = self.decoder(z)
        reconstructed = self.output_layer(decoded)
        return reconstructed, z
# 미래 값 예측 모델
class FuturePredictor(nn.Module):
    """Classification head: map a latent sequence to class logits via an
    LSTM followed by a linear layer on the final timestep."""

    def __init__(self, latent_dim, prediction_dim):
        super(FuturePredictor, self).__init__()
        self.lstm = nn.LSTM(latent_dim, 64, batch_first=True)
        self.fc = nn.Linear(64, prediction_dim)

    def forward(self, z):
        # Only the last timestep's hidden output feeds the classifier.
        lstm_out, _ = self.lstm(z)
        last_step = lstm_out[:, -1, :]
        return self.fc(last_step)
# Hyperparameters.
input_dim = X.shape[-1]  # number of market features per day
hidden_dim = 64
latent_dim = 35
num_classes = 4  # quartile labels 0-3 produced by merge_preprocessing
learning_rate = 1e-3
autoencoder = LSTMAutoencoder(input_dim, hidden_dim, latent_dim)
predictor = FuturePredictor(latent_dim, num_classes)
# Reconstruction (MSE) and classification (cross-entropy) losses are
# optimized jointly with a single Adam optimizer over both models.
reconstruction_criterion = nn.MSELoss()
classification_criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(list(autoencoder.parameters()) + list(predictor.parameters()), lr=learning_rate)
num_epochs = 1000
# Full-batch training: the whole X/y tensors go through every epoch.
for epoch in range(num_epochs):
    optimizer.zero_grad()
    # Joint forward pass: reconstruct the input and classify the next day.
    reconstructed, latent = autoencoder(X)
    future_values = predictor(latent)
    reconstruction_loss = reconstruction_criterion(reconstructed, X)
    prediction_loss = classification_criterion(future_values, y)
    # FIX: the original computed this sum twice on consecutive lines.
    loss = reconstruction_loss + prediction_loss
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")
# Evaluate on the full training set without tracking gradients.
with torch.no_grad() :
    _, latent = autoencoder(X)
    predicted_future_values = predictor(latent)
    # Predicted class = index of the highest logit per sample.
    predicted_classes = torch.argmax(predicted_future_values, dim=1)
Epoch [10/1000], Loss: 1.2755 Epoch [20/1000], Loss: 1.1931 Epoch [30/1000], Loss: 1.1400 Epoch [40/1000], Loss: 1.0765 Epoch [50/1000], Loss: 1.0346 Epoch [60/1000], Loss: 0.9718 Epoch [70/1000], Loss: 0.9040 Epoch [80/1000], Loss: 0.8933 Epoch [90/1000], Loss: 0.8503 Epoch [100/1000], Loss: 0.7929 Epoch [110/1000], Loss: 0.7393 Epoch [120/1000], Loss: 0.6981 Epoch [130/1000], Loss: 0.6678 Epoch [140/1000], Loss: 0.6317 Epoch [150/1000], Loss: 0.6602 Epoch [160/1000], Loss: 0.6030 Epoch [170/1000], Loss: 0.5782 Epoch [180/1000], Loss: 0.5650 Epoch [190/1000], Loss: 0.5420 Epoch [200/1000], Loss: 0.4929 Epoch [210/1000], Loss: 0.5673 Epoch [220/1000], Loss: 0.5614 Epoch [230/1000], Loss: 0.5323 Epoch [240/1000], Loss: 0.4475 Epoch [250/1000], Loss: 0.4171 Epoch [260/1000], Loss: 0.3954 Epoch [270/1000], Loss: 0.3862 Epoch [280/1000], Loss: 0.3540 Epoch [290/1000], Loss: 0.3371 Epoch [300/1000], Loss: 0.7158 Epoch [310/1000], Loss: 0.4441 Epoch [320/1000], Loss: 0.3782 Epoch [330/1000], Loss: 0.3601 Epoch [340/1000], Loss: 0.3244 Epoch [350/1000], Loss: 0.2993 Epoch [360/1000], Loss: 0.2804 Epoch [370/1000], Loss: 0.2644 Epoch [380/1000], Loss: 0.2520 Epoch [390/1000], Loss: 0.3947 Epoch [400/1000], Loss: 0.4152 Epoch [410/1000], Loss: 0.3631 Epoch [420/1000], Loss: 0.3088 Epoch [430/1000], Loss: 0.2575 Epoch [440/1000], Loss: 0.2317 Epoch [450/1000], Loss: 0.2161 Epoch [460/1000], Loss: 0.2012 Epoch [470/1000], Loss: 0.1907 Epoch [480/1000], Loss: 0.1820 Epoch [490/1000], Loss: 0.1746 Epoch [500/1000], Loss: 0.1682 Epoch [510/1000], Loss: 0.1624 Epoch [520/1000], Loss: 0.1571 Epoch [530/1000], Loss: 0.1521 Epoch [540/1000], Loss: 0.1475 Epoch [550/1000], Loss: 0.1432 Epoch [560/1000], Loss: 0.1391 Epoch [570/1000], Loss: 0.1352 Epoch [580/1000], Loss: 0.1318 Epoch [590/1000], Loss: 0.2836 Epoch [600/1000], Loss: 0.7352 Epoch [610/1000], Loss: 0.4651 Epoch [620/1000], Loss: 0.3171 Epoch [630/1000], Loss: 0.2677 Epoch [640/1000], Loss: 0.3188 Epoch [650/1000], Loss: 
0.2114 Epoch [660/1000], Loss: 0.1836 Epoch [670/1000], Loss: 0.1695 Epoch [680/1000], Loss: 0.1591 Epoch [690/1000], Loss: 0.1516 Epoch [700/1000], Loss: 0.1453 Epoch [710/1000], Loss: 0.1394 Epoch [720/1000], Loss: 0.1344 Epoch [730/1000], Loss: 0.1294 Epoch [740/1000], Loss: 0.1246 Epoch [750/1000], Loss: 0.1200 Epoch [760/1000], Loss: 0.1155 Epoch [770/1000], Loss: 0.1115 Epoch [780/1000], Loss: 0.1078 Epoch [790/1000], Loss: 0.1041 Epoch [800/1000], Loss: 0.1009 Epoch [810/1000], Loss: 0.0980 Epoch [820/1000], Loss: 0.0953 Epoch [830/1000], Loss: 0.0927 Epoch [840/1000], Loss: 0.0901 Epoch [850/1000], Loss: 0.0876 Epoch [860/1000], Loss: 0.0851 Epoch [870/1000], Loss: 0.0820 Epoch [880/1000], Loss: 0.0793 Epoch [890/1000], Loss: 0.0766 Epoch [900/1000], Loss: 0.0739 Epoch [910/1000], Loss: 0.0712 Epoch [920/1000], Loss: 0.0688 Epoch [930/1000], Loss: 0.0664 Epoch [940/1000], Loss: 0.0641 Epoch [950/1000], Loss: 0.0619 Epoch [960/1000], Loss: 0.0597 Epoch [970/1000], Loss: 0.0575 Epoch [980/1000], Loss: 0.0554 Epoch [990/1000], Loss: 0.0533 Epoch [1000/1000], Loss: 0.0513
with torch.no_grad():
    # Predict the investment class from the most recent window of data.
    pre_data = data.drop("투자 여부", axis=1)
    # FIX: take the LAST seq_length (14) rows, as the variable name says.
    # The original sliced iloc[0:5] — the five OLDEST days — which both
    # mismatches the 14-step training window and looks at stale data
    # (it only ran because LSTMs accept any sequence length).
    last_14_days = pre_data.iloc[-seq_length:].values
    last_14_days_tensor = torch.tensor(last_14_days, dtype=torch.float32).unsqueeze(0)
    reconstructed, latent = autoencoder(last_14_days_tensor)
    predicted_future_values = predictor(latent)
    predicted_class = torch.argmax(predicted_future_values, dim=1).item()
    print(predicted_class)
1
# Persist the trained weights for reuse by the trading program.
torch.save(autoencoder.state_dict(), "autoencoder.pt")
torch.save(predictor.state_dict(), "predictor.pt")
실시간 당일 주가 정보들을 바탕으로 투자 여부 예측 (테스트)
# Load the trained weights. weights_only=True restricts unpickling to
# tensors/state dicts, resolving the torch.load security FutureWarning
# (state dicts need nothing more than weight loading).
autoencoder.load_state_dict(torch.load("autoencoder.pt", weights_only=True))
predictor.load_state_dict(torch.load("predictor.pt", weights_only=True))
# Switch to inference mode before prediction.
autoencoder.eval()
predictor.eval()
/tmp/ipykernel_1426/2871261793.py:1: FutureWarning: You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.
autoencoder.load_state_dict(torch.load("autoencoder.pt"))
/tmp/ipykernel_1426/2871261793.py:2: FutureWarning: You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.
predictor.load_state_dict(torch.load("predictor.pt"))
FuturePredictor( (lstm): LSTM(35, 64, batch_first=True) (fc): Linear(in_features=64, out_features=4, bias=True) )
from datetime import datetime

# The live scraper is expected to have written today's indicator snapshot
# to "<YYYY-MM-DD>_trdata.csv" before this runs — TODO confirm.
today = datetime.today().strftime('%Y-%m-%d')
today_data = pd.read_csv(f"{today}_trdata.csv")
today_data["날짜"] = pd.to_datetime(today_data["날짜"])
# NOTE(review): merge_preprocessing expects a *list* of DataFrames; passing
# the frame directly only works because len(DataFrame) counts rows (1 here),
# so the len < 2 branch returns the argument as-is. Fragile — consider
# passing [today_data] once merge_preprocessing handles single frames.
processing_today_data = merge_preprocessing(today_data)
/tmp/ipykernel_1426/2289762405.py:26: FutureWarning: DataFrame.fillna with 'method' is deprecated and will raise in a future version. Use obj.ffill() or obj.bfill() instead.
standard_data.fillna(method="ffill", inplace=True)
/tmp/ipykernel_1426/2289762405.py:33: FutureWarning: DataFrame.applymap has been deprecated. Use DataFrame.map instead.
standard_data[percent_columns] = standard_data[percent_columns].applymap(convert_percent)
/tmp/ipykernel_1426/2289762405.py:36: FutureWarning: DataFrame.applymap has been deprecated. Use DataFrame.map instead.
standard_data = standard_data.applymap(lambda x: float(x.replace(",", "")) if isinstance(x, str) else x)
# Apply the scaler fitted on the training data to today's features.
processing_today_data = scale_data(processing_today_data)
/home/dst78/anaconda3/envs/Deep/lib/python3.11/site-packages/sklearn/base.py:458: UserWarning: X has feature names, but StandardScaler was fitted without feature names warnings.warn(
processing_today_data
| 날짜 | 변동성지수_종가 | 변동성지수_시가 | 변동성지수_고가 | 변동성지수_저가 | 변동성지수_변동 % | 미국10년국채_종가 | 미국10년국채_시가 | 미국10년국채_고가 | 미국10년국채_저가 | ... | 코스닥_종가 | 코스닥_시가 | 코스닥_고가 | 코스닥_저가 | 코스닥_변동 % | 다우존스_종가 | 다우존스_시가 | 다우존스_고가 | 다우존스_저가 | 다우존스_변동 % | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2025-02-26 | -0.215555 | -0.279948 | -0.122741 | -0.187327 | 0.251264 | 1.115136 | 1.186087 | 1.151617 | 1.141879 | ... | -0.577081 | -0.604755 | -0.61126 | -0.555613 | 0.152306 | 1.940698 | 1.913117 | 1.937721 | 1.902119 | 0.25768 |
1 rows × 36 columns
# Move the date into the index so only feature values remain, then drop
# to a plain numpy array for tensor conversion.
processing_today_data.set_index("날짜", inplace=True)
processing_today_data = np.array(processing_today_data)
processing_today_data
array([[-0.21555526, -0.27994777, -0.12274103, -0.18732721, 0.25126363,
1.11513572, 1.18608685, 1.15161722, 1.14187937, -0.6523526 ,
1.90130199, 1.96884296, 1.94014396, 1.88470489, -0.9071739 ,
1.79514292, 1.77923581, 1.7189048 , 1.82818019, -0.09019509,
0.1469806 , 0.10849754, 0.11955455, 0.13288753, 0.31966724,
-0.57708119, -0.60475547, -0.61126023, -0.55561324, 0.15230583,
1.94069812, 1.91311688, 1.9377209 , 1.90211913, 0.25767998]])
with torch.no_grad():
    # Keep only the first row (today) as a length-1 sequence.
    # NOTE(review): the model was trained on 14-step windows; a single
    # timestep runs (LSTMs accept any length) but may degrade prediction
    # quality — confirm this is intended.
    processing_today_data = processing_today_data[0:1]
    # Shape becomes (1, 1, features) after unsqueeze: batch of one sequence.
    predict_data = torch.tensor(processing_today_data, dtype=torch.float32).unsqueeze(0)
    reconstructed_pred, latent_pred = autoencoder(predict_data)
    predicted_real = predictor(latent_pred)
    predicted_class_real = torch.argmax(predicted_real, dim=1).item()
    print("예측된 클래스:", predicted_class_real)
예측된 클래스: 1