【Python】LSTMによる株価予測【Chainer】

AI
スポンサーリンク

はじめに

Python、ChainerによるLSTMにて日経平均株価の予測を試みる。
Chainerを使用しているのは、過去に実験的に作ったLSTMがChainerを使用しており、それを使いまわしているため。

発端

うちの嫁に、
「株価予測するアプリ作れ」って脅された。

そもそも、それほど株取引の知識が無いのですけど。

とりあえず、それっぽいものを作るべくLSTMで実験した。

結果

ダメでした。
敗因は以下。

  • 株価自体が何かルールを持っているわけではない
    • 外乱の塊をそのまま予測することは不可能。
  • データ量が多ければ良いという感じでもない。
    • 株価自体が基本的に緩やかな上昇をしているので、データ量を増やすと局所的な特徴が消え、緩やかな上昇という特徴だけが際立つ
  • 直近の予測が新型コロナの影響を受けすぎていて、なんか無理。
    • 明らかな定性がある場合は、過去の定量など意味がない。

予測結果

訓練データで使用した方はうまく予測している。
というか知っている波形なので予測と言ってよいのか・・・。

テストデータの方は全くだめ。
新型コロナの影響が大きすぎて、過去データが全く通用しない。

以下、技術的解説

LSTMとは

長・短期記憶(ちょう・たんききおく、英: Long short-term memory、略称: LSTM)は、深層学習(ディープラーニング)の分野において用いられる人工回帰型ニューラルネットワーク(RNN)アーキテクチャである。標準的な順伝播型ニューラルネットワーク(英語版)とは異なり、LSTMは自身を「汎用計算機」(すなわち、チューリングマシンが計算可能なことを何でも計算できる)にするフィードバック結合を有する。LSTMは(画像といった)単一のデータ点だけでなく、(音声あるいは動画といった)全データ配列を処理できる。例えば、LSTMは分割されていない、つながった手書き文字認識や音声認識といった課題に適用可能である。ブルームバーグ ビジネスウィーク誌は「これらの力がLSTMを、病気の予測から作曲まで全てに使われる、ほぼ間違いなく最も商業的なAIの成果としている」と書いた。

Wikipediaより

一言で言うと、
「系列データの予測が得意なRNNの拡張版で”勾配消失問題”、”将来に於いては関係があるデータの扱いの問題”の対策を入れたもの」
になる。

RNN、LSTMについては以下の記事でもう少し詳しく説明しているので、よろしければご参考に。

LSTMに学習させる際の注意点

LSTMは系列データの予測に強いと言っても、何でもOKというわけではない。
LSTMの構造上の都合で、以下の制約がある。

  • ON/OFFスイッチ(シグモイド都合)
  • プラス/マイナスの方向切替スイッチ(tanh都合)

よって、

こういうのは常にプラス方向なので、LSTMの表現力が半減未満になる。

こういった感じの0を中心とした波形の方が得意
さらに1.0未満だと尚良い。

データ加工方針

前項の理由により、そのまま株価を突っ込んでもLSTMの予測能力は期待できない。
よって、以下の方針でデータ加工する。

  • 株価波形を微分する
  • 微分した波形を±1.0に収まるように正規化する
  • LSTMに学習&予測させる
  • LSTMが予測した波形を不定積分する
  • 不定積分した際に出てくる積分定数は予測直前のデータとする。

微分、積分について

積分、微分というキーワードを使ったが、やることは、引き算と足し算。

微分(差分法)
$$f(t)d/dt≒f(t)-f(t-1)≒f(t+1)-f(t)$$

積分(総和法)
\(C:\)積分定数
$$∫f(t)dt≒\displaystyle\sum_{i=1}^n f(t_i)+C$$

株価データ

日経平均株価を使用。

Nikkei 225 Index - 67 Year Historical Chart
Interactive daily chart of Japan's Nikkei 225 stock market index back to 1949. Each data point represents the closing value for that trading day and is denomina...

上記から、csvを入手できる。

上記のように15行分のヘッダが居るので読み飛ばす必要はある。

学習方針

取得した日経平均株価は1949年から現在までのデータが揃っている。
本来であれば、すべてのデータを使用した方が良いように感じるが、人類の経済成長と言う謎の定性的要因に引っ張られるので、2019年と2020年のデータだけを使用する。

2019年を訓練データ
2020年1月~4月をテストデータ
とする。

訓練データとミニバッチ学習

2019年のデータをランダムに10個の60日分データを使用した学習を1エポックとし、
1000、2000、3000のエポック毎で効果を確認する。

予測値の出し方

予測させるのは、株価ではなく、それの微分値こと偏差。
赤線が真値、青線が予測値

微分値を積分こと、総和して、元の株価に戻す。
こちらも、赤線が真値、青線が予測値

予測値

以下にエポックが1000、2000、3000の時の訓練データに対する予測と、テストデータに対する予測を並べる。

1000エポック

訓練データに対してはなんとなく追従している感じ。
7月~10月が上手くない感じだが。
テストデータにはまったく追従できず。

2000エポック

訓練データは、1000エポックよりも追従性が良くなった。
相変わらずテストデータには追従できず。

3000エポック

訓練データは、さらに良くなった。
そして、どうしても追従しないテストデータ。

なぜ、下落が止まり上昇方向の予測をするか考察

まず、訓練データの中に19,500円を下回るデータが無かった。
そして、訓練データの中で似たような波形があり、これを模そうと頑張っていたと思われる。

予測は失敗だが、
訓練は成功していると言える。
たぶん。

実験コード

import datetime
import numpy as np
import matplotlib.pylab as plt
from chainer import Chain, Variable, cuda, optimizer, optimizers, serializers
import chainer.functions as F
import chainer.links as L
import pandas as pd


# モデルクラス定義
 
class LSTM(Chain):
    def __init__(self, in_size, hidden_size, out_size):
        super(LSTM, self).__init__(
            xh = L.Linear(in_size, hidden_size),
            hh = L.LSTM(hidden_size, hidden_size),
            hy = L.Linear(hidden_size, out_size)
        )
 
    def __call__(self, x, t=None, train=False):
        x = Variable(x)
        if train:
            t = Variable(t)
        h = self.xh(x)
        h = self.hh(h)
        y = self.hy(h)
        if train:
            return F.mean_squared_error(y, t)
        else:
            return y.data
 
    def reset(self):
        self.zerograds()
        self.hh.reset_state()

EPOCH_NUM = 1000
IN_SIZE=1
HIDDEN_SIZE = 60
OUT_SIZE=1
BATCH_NUM = 10 # 分割したデータをいくつミニバッチに取り込むか
BATCH_SIZE = 60 # ミニバッチで分割する時系列数


# 学習
def Training(str="",wait_load=False):
    print("\nTraining\n")
    
    # 教師データ
    df = pd.read_csv('nikkei-225-index-historical-chart-data.csv',header=8)
    mat = df.query('date.str.match('+str+')', engine='python')
    train_data_t = mat[' value'].values
    print(train_data_t)
    
    train_data = np.arange(len(train_data_t), dtype="float32");
    
    # 微分(偏差)データに変換
    for i in range(len(train_data_t)-1):
        train_data[i] = train_data_t[i+1]-train_data_t[i]
    
    # 正規化用ゲイン
    gain = np.max(train_data)-np.min(train_data)
    gain = gain/2
    
    train_data = train_data/gain	# ±1.0以内に
    
    # 入力データと教師データを作成
    train_x, train_t = [], []
    for i in range(len(train_data)-1):
        train_x.append(train_data[i])
        train_t.append(train_data[i+1])
    train_x = np.array(train_x, dtype="float32")
    train_t = np.array(train_t, dtype="float32")
    Num = len(train_x)
     
    # モデル定義
    model = LSTM(in_size=IN_SIZE, hidden_size=HIDDEN_SIZE, out_size=OUT_SIZE)
    optimizer = optimizers.Adam()
    optimizer.setup(model)
     
    if wait_load:
        serializers.load_npz("mymodel.npz", model)
    
    # 学習開始
    print("Train")
    st = datetime.datetime.now()	# 100エポック時間計測用に記録
    for epoch in range(EPOCH_NUM):
     
        # ミニバッチ学習
        x, t = [], []
        for i in range(BATCH_NUM):
            # ランダムな箇所、ただしBATCH_SIZE分だけ抜き取れる場所から選ぶ
            # (indexの末端がBATCH_SIZEを超えない部分でリミットを掛ける)
            index = np.random.randint(0, Num-BATCH_SIZE+1) 
            x.append(train_x[index:index+BATCH_SIZE]) # BATCH_SIZE分の入力データを取り出す
            t.append(train_t[index:index+BATCH_SIZE]) # BATCH_SIZE分の教師データを取り出す
        
        # NumPy配列に変換
        x = np.array(x, dtype="float32")
        t = np.array(t, dtype="float32")
        loss = 0
        total_loss = 0
        model.reset() 
        for i in range(BATCH_SIZE): # 各時刻おきにBATCH_NUMごと読み込んで損失を計算する
            x_ = np.array([x[j, i] for j in range(BATCH_NUM)], dtype="float32")[:, np.newaxis] # 時刻iの入力値
            t_ = np.array([t[j, i] for j in range(BATCH_NUM)], dtype="float32")[:, np.newaxis] # 時刻i+1の値(=正解の予測値)
            loss += model(x=x_, t=t_, train=True) # 誤差合計
        loss.backward() # 誤差逆伝播
        loss.unchain_backward() 
        total_loss += loss.data
        optimizer.update()
        if (epoch+1) % 100 == 0:
            # 100エポック毎に誤差と学習時間を表示
            ed = datetime.datetime.now()
            print("epoch:\t{}\ttotal loss:\t{}\ttime:\t{}".format(epoch+1, total_loss, ed-st))
            st = datetime.datetime.now()	# 100エポック時間計測用に記録
     
    serializers.save_npz("mymodel.npz", model) # npz形式で書き出し
     
# 予測能力評価
def Predict(str):
    print("\nPredict")
    # モデルの定義
    model = LSTM(in_size=IN_SIZE, hidden_size=HIDDEN_SIZE, out_size=OUT_SIZE)
    
    # 重みロード
    serializers.load_npz("mymodel.npz", model)
    
    # 予測元データロード
    df = pd.read_csv('nikkei-225-index-historical-chart-data.csv',header=8)
    mat = df.query('date.str.match('+str+')', engine='python')
    train_data_t = mat[' value'].values
    
    train_data = np.arange(len(train_data_t), dtype="float32");
    
    print(train_data_t)
    
    # 偏差算出(微分)
    for i in range(len(train_data_t)-1):
        train_data[i] = train_data_t[i+1]-train_data_t[i]
    
    # 正規化用ゲイン
    gain = np.max(train_data)-np.min(train_data)
    gain = gain/2
    
    train_data = train_data/gain	# ±1.0以内に
    Num = len(train_data)-1
    
    print(train_data)
    
    predict = np.empty(0) # 予測値格納用
    predict_size = 30     # 予測サイズ
    predata_size = len(train_data)-predict_size # 予測直前までのデータ数
    indata = train_data[1:predata_size] # 予測直前までのデータ
    for _ in range(predata_size):
        model.reset()
        for i in indata: # モデルに予測直前までの時系列を読み込ませる
            x = np.array([[i]], dtype="float32")
            y = model(x=x, train=False)
        predict = np.append(predict, y) # 最後の予測値を記録
        # モデルに読み込ませる予測直前時系列を予測値で更新する
        indata = np.delete(indata, 0)
        indata = np.append(indata, y)
    
    plt.plot(range(Num+1), train_data, color="red", label="t")
    plt.plot(range(predata_size, predata_size+predict_size-1), predict[0:predict_size-1], "--.", label="y")
    plt.legend(loc="upper left")
    plt.show()

    predict = predict * gain	# 元データと同じ割合で予測値を拡大
    ipredict = np.arange(len(predict)+1, dtype="float32")
    predict_tmp = train_data_t[predata_size]; # 初期値(積分定数)
    ipredict[0]=predict_tmp
    # 積分
    for i in range(len(predict)):
        predict_tmp = predict_tmp + predict[i]
        ipredict[i+1] = predict_tmp
    
    plt.plot(range(Num+1), train_data_t, color="red", label="t")
    plt.plot(range(predata_size, predata_size+predict_size-1), ipredict[0:predict_size-1], "--.", label="y")
    plt.show()

# 本当に将来を予測
def Predict2(str="",tail=90):
    print("\nPredict2")
    # モデルの定義
    model = LSTM(in_size=IN_SIZE, hidden_size=HIDDEN_SIZE, out_size=OUT_SIZE)
    
    # 重みロード
    serializers.load_npz("mymodel.npz", model)
    
    # 予測元データロード
    df = pd.read_csv('nikkei-225-index-historical-chart-data.csv',header=8)
    
    if not str:
        # 空文字列の場合は貯金tail日分のデータを使用
        mat = df.tail(tail)
    else:
        # strを正規表現としてデータ抽出
        mat = df.query('date.str.match('+str+')', engine='python')
    
    train_data_t = mat[' value'].values
    
    train_data = np.arange(len(train_data_t), dtype="float32");
    
    
    for i in range(len(train_data_t)-1):
        train_data[i] = train_data_t[i+1]-train_data_t[i]
    
    gain = np.max(train_data)-np.min(train_data)
    gain = gain/2
    
    train_data = train_data/gain
    Num = len(train_data)-1
    
    print(train_data)
    
    predict = np.empty(0) # 予測値格納用
    predict_size = 30     # 予測サイズ
    indata = train_data # 予測直前までの時系列
    for _ in range(predict_size):
        model.reset() # メモリを初期化
        for i in indata: # モデルに予測直前までのデータを読み込ませる
            x = np.array([[i]], dtype="float32")
            y = model(x=x, train=False)
        predict = np.append(predict, y) # 最後の予測値を記録
        # モデルに読み込ませる予測直前データを予測値で更新する
        indata = np.delete(indata, 0)
        indata = np.append(indata, y)
    
    plt.plot(range(Num+1), train_data, color="red", label="t")
    plt.plot(range(Num, Num+predict_size), predict[0:predict_size], "--.", label="y")
    plt.legend(loc="upper left")
    plt.show()

    predict = predict * gain
    ipredict = np.arange(len(predict)+1, dtype="float32")
    predict_tmp = train_data_t[Num];
    ipredict[0]=predict_tmp
    # 積分
    for i in range(len(predict)):
        predict_tmp = predict_tmp + predict[i]
        ipredict[i+1] = predict_tmp
    
    plt.plot(range(Num+1), train_data_t, color="red", label="t")
    plt.plot(range(Num, Num+predict_size), ipredict[0:predict_size], "--.", label="y")
    plt.show()

Training(str="\"^(2019-)\"",wait_load=False)
Predict(str="\"^2019-((01)|(02)|(03)|(04))\"")	# 訓練データ内で予測
Predict(str="\"^2019-((04)|(05)|(06)|(07))\"")	# 訓練データ内で予測
Predict(str="\"^2019-((07)|(08)|(09)|(10))\"")	# 訓練データ内で予測
Predict(str="\"^2019-((09)|(10)|(11)|(12))\"")	# 訓練データ内で予測
Predict(str="\"^2020-((01)|(02)|(03)|(04))\"")	# 訓練データ外(テストデータ)で予測
#Predict2()		#本当に未来を予測

まとめ

  • LSTMは株価予測が出来ないわけではないが、定量性を破壊する定性的要因は流石に適応できない。
    • 平時であれば、もう少しいい感じに予測できたかも。
  • 単純にデータを入れれば良いということは無く、NNの特性から逆算して学習データを加工する必要がある。
    • 今回のように微分したり、正規化したり。
    • その他:標準化、白色化(無相関化+標準化)
  • 直近のデータだけで学習して、局所的な変動を見る方針にすると、また違った効能が見えるかもしれない。
    • 人間が見つけられない特徴を拾ってくれる可能性あり。
  • 今回は1次元入力、1次元出力の構成にしたが、株価に影響のある多変量データがあれば、それを一緒に入力すれば精度の高い予測ができる可能性あり。
  • 嫁には「ふーん」という扱いを受けた。(がんばったのにぃ!)

※ 正規化、標準化、白色化についてはこちらで説明

コメント

タイトルとURLをコピーしました