【Python】LSTMによる株価予測【Chainer】

【Python】LSTMによる株価予測【Chainer】 AI、データサイエンス

実験コード

import datetime
import numpy as np
import matplotlib.pylab as plt
from chainer import Chain, Variable, cuda, optimizer, optimizers, serializers
import chainer.functions as F
import chainer.links as L
import pandas as pd


# モデルクラス定義
 
class LSTM(Chain):
    def __init__(self, in_size, hidden_size, out_size):
        super(LSTM, self).__init__(
            xh = L.Linear(in_size, hidden_size),
            hh = L.LSTM(hidden_size, hidden_size),
            hy = L.Linear(hidden_size, out_size)
        )
 
    def __call__(self, x, t=None, train=False):
        x = Variable(x)
        if train:
            t = Variable(t)
        h = self.xh(x)
        h = self.hh(h)
        y = self.hy(h)
        if train:
            return F.mean_squared_error(y, t)
        else:
            return y.data
 
    def reset(self):
        self.zerograds()
        self.hh.reset_state()

EPOCH_NUM = 1000
IN_SIZE=1
HIDDEN_SIZE = 60
OUT_SIZE=1
BATCH_NUM = 10 # 分割したデータをいくつミニバッチに取り込むか
BATCH_SIZE = 60 # ミニバッチで分割する時系列数


# 学習
def Training(str="",wait_load=False):
    print("\nTraining\n")
    
    # 教師データ
    df = pd.read_csv('nikkei-225-index-historical-chart-data.csv',header=8)
    mat = df.query('date.str.match('+str+')', engine='python')
    train_data_t = mat[' value'].values
    print(train_data_t)
    
    train_data = np.arange(len(train_data_t), dtype="float32");
    
    # 微分(偏差)データに変換
    for i in range(len(train_data_t)-1):
        train_data[i] = train_data_t[i+1]-train_data_t[i]
    
    # 正規化用ゲイン
    gain = np.max(train_data)-np.min(train_data)
    gain = gain/2
    
    train_data = train_data/gain	# ±1.0以内に
    
    # 入力データと教師データを作成
    train_x, train_t = [], []
    for i in range(len(train_data)-1):
        train_x.append(train_data[i])
        train_t.append(train_data[i+1])
    train_x = np.array(train_x, dtype="float32")
    train_t = np.array(train_t, dtype="float32")
    Num = len(train_x)
     
    # モデル定義
    model = LSTM(in_size=IN_SIZE, hidden_size=HIDDEN_SIZE, out_size=OUT_SIZE)
    optimizer = optimizers.Adam()
    optimizer.setup(model)
     
    if wait_load:
        serializers.load_npz("mymodel.npz", model)
    
    # 学習開始
    print("Train")
    st = datetime.datetime.now()	# 100エポック時間計測用に記録
    for epoch in range(EPOCH_NUM):
     
        # ミニバッチ学習
        x, t = [], []
        for i in range(BATCH_NUM):
            # ランダムな箇所、ただしBATCH_SIZE分だけ抜き取れる場所から選ぶ
            # (indexの末端がBATCH_SIZEを超えない部分でリミットを掛ける)
            index = np.random.randint(0, Num-BATCH_SIZE+1) 
            x.append(train_x[index:index+BATCH_SIZE]) # BATCH_SIZE分の入力データを取り出す
            t.append(train_t[index:index+BATCH_SIZE]) # BATCH_SIZE分の教師データを取り出す
        
        # NumPy配列に変換
        x = np.array(x, dtype="float32")
        t = np.array(t, dtype="float32")
        loss = 0
        total_loss = 0
        model.reset() 
        for i in range(BATCH_SIZE): # 各時刻おきにBATCH_NUMごと読み込んで損失を計算する
            x_ = np.array([x[j, i] for j in range(BATCH_NUM)], dtype="float32")[:, np.newaxis] # 時刻iの入力値
            t_ = np.array([t[j, i] for j in range(BATCH_NUM)], dtype="float32")[:, np.newaxis] # 時刻i+1の値(=正解の予測値)
            loss += model(x=x_, t=t_, train=True) # 誤差合計
        loss.backward() # 誤差逆伝播
        loss.unchain_backward() 
        total_loss += loss.data
        optimizer.update()
        if (epoch+1) % 100 == 0:
            # 100エポック毎に誤差と学習時間を表示
            ed = datetime.datetime.now()
            print("epoch:\t{}\ttotal loss:\t{}\ttime:\t{}".format(epoch+1, total_loss, ed-st))
            st = datetime.datetime.now()	# 100エポック時間計測用に記録
     
    serializers.save_npz("mymodel.npz", model) # npz形式で書き出し
     
# 予測能力評価
def Predict(str):
    print("\nPredict")
    # モデルの定義
    model = LSTM(in_size=IN_SIZE, hidden_size=HIDDEN_SIZE, out_size=OUT_SIZE)
    
    # 重みロード
    serializers.load_npz("mymodel.npz", model)
    
    # 予測元データロード
    df = pd.read_csv('nikkei-225-index-historical-chart-data.csv',header=8)
    mat = df.query('date.str.match('+str+')', engine='python')
    train_data_t = mat[' value'].values
    
    train_data = np.arange(len(train_data_t), dtype="float32");
    
    print(train_data_t)
    
    # 偏差算出(微分)
    for i in range(len(train_data_t)-1):
        train_data[i] = train_data_t[i+1]-train_data_t[i]
    
    # 正規化用ゲイン
    gain = np.max(train_data)-np.min(train_data)
    gain = gain/2
    
    train_data = train_data/gain	# ±1.0以内に
    Num = len(train_data)-1
    
    print(train_data)
    
    predict = np.empty(0) # 予測値格納用
    predict_size = 30     # 予測サイズ
    predata_size = len(train_data)-predict_size # 予測直前までのデータ数
    indata = train_data[1:predata_size] # 予測直前までのデータ
    for _ in range(predata_size):
        model.reset()
        for i in indata: # モデルに予測直前までの時系列を読み込ませる
            x = np.array([[i]], dtype="float32")
            y = model(x=x, train=False)
        predict = np.append(predict, y) # 最後の予測値を記録
        # モデルに読み込ませる予測直前時系列を予測値で更新する
        indata = np.delete(indata, 0)
        indata = np.append(indata, y)
    
    plt.plot(range(Num+1), train_data, color="red", label="t")
    plt.plot(range(predata_size, predata_size+predict_size-1), predict[0:predict_size-1], "--.", label="y")
    plt.legend(loc="upper left")
    plt.show()

    predict = predict * gain	# 元データと同じ割合で予測値を拡大
    ipredict = np.arange(len(predict)+1, dtype="float32")
    predict_tmp = train_data_t[predata_size]; # 初期値(積分定数)
    ipredict[0]=predict_tmp
    # 積分
    for i in range(len(predict)):
        predict_tmp = predict_tmp + predict[i]
        ipredict[i+1] = predict_tmp
    
    plt.plot(range(Num+1), train_data_t, color="red", label="t")
    plt.plot(range(predata_size, predata_size+predict_size-1), ipredict[0:predict_size-1], "--.", label="y")
    plt.show()

# 本当に将来を予測
def Predict2(str="",tail=90):
    print("\nPredict2")
    # モデルの定義
    model = LSTM(in_size=IN_SIZE, hidden_size=HIDDEN_SIZE, out_size=OUT_SIZE)
    
    # 重みロード
    serializers.load_npz("mymodel.npz", model)
    
    # 予測元データロード
    df = pd.read_csv('nikkei-225-index-historical-chart-data.csv',header=8)
    
    if not str:
        # 空文字列の場合は貯金tail日分のデータを使用
        mat = df.tail(tail)
    else:
        # strを正規表現としてデータ抽出
        mat = df.query('date.str.match('+str+')', engine='python')
    
    train_data_t = mat[' value'].values
    
    train_data = np.arange(len(train_data_t), dtype="float32");
    
    
    for i in range(len(train_data_t)-1):
        train_data[i] = train_data_t[i+1]-train_data_t[i]
    
    gain = np.max(train_data)-np.min(train_data)
    gain = gain/2
    
    train_data = train_data/gain
    Num = len(train_data)-1
    
    print(train_data)
    
    predict = np.empty(0) # 予測値格納用
    predict_size = 30     # 予測サイズ
    indata = train_data # 予測直前までの時系列
    for _ in range(predict_size):
        model.reset() # メモリを初期化
        for i in indata: # モデルに予測直前までのデータを読み込ませる
            x = np.array([[i]], dtype="float32")
            y = model(x=x, train=False)
        predict = np.append(predict, y) # 最後の予測値を記録
        # モデルに読み込ませる予測直前データを予測値で更新する
        indata = np.delete(indata, 0)
        indata = np.append(indata, y)
    
    plt.plot(range(Num+1), train_data, color="red", label="t")
    plt.plot(range(Num, Num+predict_size), predict[0:predict_size], "--.", label="y")
    plt.legend(loc="upper left")
    plt.show()

    predict = predict * gain
    ipredict = np.arange(len(predict)+1, dtype="float32")
    predict_tmp = train_data_t[Num];
    ipredict[0]=predict_tmp
    # 積分
    for i in range(len(predict)):
        predict_tmp = predict_tmp + predict[i]
        ipredict[i+1] = predict_tmp
    
    plt.plot(range(Num+1), train_data_t, color="red", label="t")
    plt.plot(range(Num, Num+predict_size), ipredict[0:predict_size], "--.", label="y")
    plt.show()

Training(str="\"^(2019-)\"",wait_load=False)
Predict(str="\"^2019-((01)|(02)|(03)|(04))\"")	# 訓練データ内で予測
Predict(str="\"^2019-((04)|(05)|(06)|(07))\"")	# 訓練データ内で予測
Predict(str="\"^2019-((07)|(08)|(09)|(10))\"")	# 訓練データ内で予測
Predict(str="\"^2019-((09)|(10)|(11)|(12))\"")	# 訓練データ内で予測
Predict(str="\"^2020-((01)|(02)|(03)|(04))\"")	# 訓練データ外(テストデータ)で予測
#Predict2()		#本当に未来を予測

Github

同一のものをGithubにも上げてます。

まとめ

  • LSTMは株価予測が出来ないわけではないが、定量性を破壊する定性的要因は流石に適応できない。
    • 平時であれば、もう少しいい感じに予測できたかも。
  • 単純にデータを入れれば良いということは無く、NNの特性から逆算して学習データを加工する必要がある。
    • 今回のように微分したり、正規化したり。
    • その他:標準化、白色化(無相関化+標準化)
  • 直近のデータだけで学習して、局所的な変動を見る方針にすると、また違った効能が見えるかもしれない。
    • 人間が見つけられない特徴を拾ってくれる可能性あり。
  • 今回は1次元入力、1次元出力の構成にしたが、株価に影響のある多変量データがあれば、それを一緒に入力すれば精度の高い予測ができる可能性あり。
  • 嫁には「ふーん」という扱いを受けた。(がんばったのにぃ!)

※ 正規化、標準化、白色化についてはこちらで説明





コメント

タイトルとURLをコピーしました