日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 人工智能 > 循环神经网络 >内容正文

循环神经网络

循环神经网络RNN、LSTM、GRU实现股票预测

發(fā)布時間:2024/1/18 循环神经网络 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 循环神经网络RNN、LSTM、GRU实现股票预测 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Tensorflow——循環(huán)神經(jīng)網(wǎng)絡(luò)RNN

  • 循環(huán)核
    • TensorFlow描述循環(huán)核
  • 循環(huán)神經(jīng)網(wǎng)絡(luò)
    • TensorFlow描述循環(huán)神經(jīng)網(wǎng)絡(luò)
  • 循環(huán)計算過程
    • 輸入一個字母,預(yù)測下一個字母
    • 輸入四個連續(xù)字母,預(yù)測下一個字母
  • Embedding編碼
      • TensorFlow描述Embedding編碼
      • 用Embedding編碼替換獨熱碼實現(xiàn)輸入一個字符預(yù)測
      • 用Embedding編碼替換獨熱碼實現(xiàn)輸入四個字符預(yù)測
  • 循環(huán)神經(jīng)網(wǎng)絡(luò)實現(xiàn)股票預(yù)測
    • 下載股票預(yù)測數(shù)據(jù)
    • 股票預(yù)測
  • 長短記憶網(wǎng)絡(luò)LSTM
    • TensorFlow描述LSTM層
    • LSTM實現(xiàn)股票預(yù)測
  • GRU網(wǎng)絡(luò)
    • TensorFlow描述GRU層
    • LSTM實現(xiàn)股票預(yù)測

觀看【北京大學(xué)】TensorFlow2.0視頻 筆記
https://www.bilibili.com/video/BV1B7411L7Qt?p=25

循環(huán)核

循環(huán)核具有記憶力,通過不同時刻的參數(shù)共享實現(xiàn)了對時間序列的信息提取

多層循環(huán)核

TensorFlow描述循環(huán)核

tf.keras.layers.SimpleRNN( 記憶體個數(shù), activation='', #默認(rèn)tanh return_sequences=True or False #是否每個時刻都輸出ht到下一層默認(rèn)true

循環(huán)神經(jīng)網(wǎng)絡(luò)

借助循環(huán)核提取時間特征后,送入全連接網(wǎng)絡(luò)

TensorFlow描述循環(huán)神經(jīng)網(wǎng)絡(luò)

tf.keras.layers.SimpleRNN( 記憶體個數(shù), activation='', #默認(rèn)tanh return_sequences=True or False #是否每個時刻都輸出ht到下一層默認(rèn)true

循環(huán)計算過程

輸入一個字母,預(yù)測下一個字母

import numpy as np import tensorflow as tf from tensorflow.keras.layers import Dense, SimpleRNN import matplotlib.pyplot as plt import osinput_word = "abcde" w_to_id = {'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4} # 單詞映射到數(shù)值id的詞典 id_to_onehot = {0: [1., 0., 0., 0., 0.], 1: [0., 1., 0., 0., 0.], 2: [0., 0., 1., 0., 0.], 3: [0., 0., 0., 1., 0.],4: [0., 0., 0., 0., 1.]} # id編碼為one-hotx_train = [id_to_onehot[w_to_id['a']], id_to_onehot[w_to_id['b']], id_to_onehot[w_to_id['c']],id_to_onehot[w_to_id['d']], id_to_onehot[w_to_id['e']]] y_train = [w_to_id['b'], w_to_id['c'], w_to_id['d'], w_to_id['e'], w_to_id['a']]np.random.seed(7) np.random.shuffle(x_train) np.random.seed(7) np.random.shuffle(y_train) tf.random.set_seed(7)# 使x_train符合SimpleRNN輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù), 每個時間步輸入特征個數(shù)]。 # 此處整個數(shù)據(jù)集送入,送入樣本數(shù)為len(x_train);輸入1個字母出結(jié)果,循環(huán)核時間展開步數(shù)為1; 表示為獨熱碼有5個輸入特征,每個時間步輸入特征個數(shù)為5 x_train = np.reshape(x_train, (len(x_train), 1, 5)) y_train = np.array(y_train)model = tf.keras.Sequential([SimpleRNN(3),Dense(5, activation='softmax') ])model.compile(optimizer=tf.keras.optimizers.Adam(0.01),loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),metrics=['sparse_categorical_accuracy'])checkpoint_save_path = "./checkpoint/rnn_onehot_1pre1.ckpt"if os.path.exists(checkpoint_save_path + '.index'):print('-------------load the model-----------------')model.load_weights(checkpoint_save_path)cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,save_weights_only=True,save_best_only=True,monitor='loss') # 由于fit沒有給出測試集,不計算測試集準(zhǔn)確率,根據(jù)loss,保存最優(yōu)模型history = model.fit(x_train, y_train, batch_size=32, epochs=100, callbacks=[cp_callback])model.summary()# print(model.trainable_variables) file = open('./weights.txt', 'w') # 參數(shù)提取 for v in model.trainable_variables:file.write(str(v.name) + '\n')file.write(str(v.shape) + '\n')file.write(str(v.numpy()) + '\n') file.close()############################################### show ################################################ 顯示訓(xùn)練集和驗證集的acc和loss曲線 acc = history.history['sparse_categorical_accuracy'] loss = history.history['loss']plt.subplot(1, 2, 1) plt.plot(acc, label='Training Accuracy') plt.title('Training Accuracy') plt.legend()plt.subplot(1, 2, 2) plt.plot(loss, label='Training Loss') plt.title('Training Loss') plt.legend() plt.show()############### predict #############preNum = int(input("input the number of test alphabet:")) for i in range(preNum):alphabet1 = input("input test alphabet:")alphabet = [id_to_onehot[w_to_id[alphabet1]]]# 使alphabet符合SimpleRNN輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù), 每個時間步輸入特征個數(shù)]。此處驗證效果送入了1個樣本,送入樣本數(shù)為1;輸入1個字母出結(jié)果,所以循環(huán)核時間展開步數(shù)為1; 表示為獨熱碼有5個輸入特征,每個時間步輸入特征個數(shù)為5alphabet = np.reshape(alphabet, (1, 1, 5))result = model.predict([alphabet])pred = tf.argmax(result, axis=1)pred = int(pred)tf.print(alphabet1 + '->' + input_word[pred])

輸入四個連續(xù)字母,預(yù)測下一個字母

import numpy as np import tensorflow as tf from tensorflow.keras.layers import Dense, SimpleRNN import matplotlib.pyplot as plt import osinput_word = "abcde" w_to_id = {'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4} # 單詞映射到數(shù)值id的詞典 id_to_onehot = {0: [1., 0., 0., 0., 0.], 1: [0., 1., 0., 0., 0.], 2: [0., 0., 1., 0., 0.], 3: [0., 0., 0., 1., 0.],4: [0., 0., 0., 0., 1.]} # id編碼為one-hotx_train = [[id_to_onehot[w_to_id['a']], id_to_onehot[w_to_id['b']], id_to_onehot[w_to_id['c']], id_to_onehot[w_to_id['d']]],[id_to_onehot[w_to_id['b']], id_to_onehot[w_to_id['c']], id_to_onehot[w_to_id['d']], id_to_onehot[w_to_id['e']]],[id_to_onehot[w_to_id['c']], id_to_onehot[w_to_id['d']], id_to_onehot[w_to_id['e']], id_to_onehot[w_to_id['a']]],[id_to_onehot[w_to_id['d']], id_to_onehot[w_to_id['e']], id_to_onehot[w_to_id['a']], id_to_onehot[w_to_id['b']]],[id_to_onehot[w_to_id['e']], id_to_onehot[w_to_id['a']], id_to_onehot[w_to_id['b']], id_to_onehot[w_to_id['c']]], ] y_train = [w_to_id['e'], w_to_id['a'], w_to_id['b'], w_to_id['c'], w_to_id['d']]np.random.seed(7) np.random.shuffle(x_train) np.random.seed(7) np.random.shuffle(y_train) tf.random.set_seed(7)# 使x_train符合SimpleRNN輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù), 每個時間步輸入特征個數(shù)]。 # 此處整個數(shù)據(jù)集送入,送入樣本數(shù)為len(x_train);輸入4個字母出結(jié)果,循環(huán)核時間展開步數(shù)為4; 表示為獨熱碼有5個輸入特征,每個時間步輸入特征個數(shù)為5 x_train = np.reshape(x_train, (len(x_train), 4, 5)) y_train = np.array(y_train)model = tf.keras.Sequential([SimpleRNN(3),Dense(5, activation='softmax') ])model.compile(optimizer=tf.keras.optimizers.Adam(0.01),loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),metrics=['sparse_categorical_accuracy'])checkpoint_save_path = "./checkpoint/rnn_onehot_4pre1.ckpt"if os.path.exists(checkpoint_save_path + '.index'):print('-------------load the model-----------------')model.load_weights(checkpoint_save_path)cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,save_weights_only=True,save_best_only=True,monitor='loss') # 由于fit沒有給出測試集,不計算測試集準(zhǔn)確率,根據(jù)loss,保存最優(yōu)模型history = model.fit(x_train, y_train, batch_size=32, epochs=100, callbacks=[cp_callback])model.summary()# print(model.trainable_variables) file = open('./weights.txt', 'w') # 參數(shù)提取 for v in model.trainable_variables:file.write(str(v.name) + '\n')file.write(str(v.shape) + '\n')file.write(str(v.numpy()) + '\n') file.close()############################################### show ################################################ 顯示訓(xùn)練集和驗證集的acc和loss曲線 acc = history.history['sparse_categorical_accuracy'] loss = history.history['loss']plt.subplot(1, 2, 1) plt.plot(acc, label='Training Accuracy') plt.title('Training Accuracy') plt.legend()plt.subplot(1, 2, 2) plt.plot(loss, label='Training Loss') plt.title('Training Loss') plt.legend() plt.show()############### predict #############preNum = int(input("input the number of test alphabet:")) for i in range(preNum):alphabet1 = input("input test alphabet:")alphabet = [id_to_onehot[w_to_id[a]] for a in alphabet1]# 使alphabet符合SimpleRNN輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù), 每個時間步輸入特征個數(shù)]。此處驗證效果送入了1個樣本,送入樣本數(shù)為1;輸入4個字母出結(jié)果,所以循環(huán)核時間展開步數(shù)為4; 表示為獨熱碼有5個輸入特征,每個時間步輸入特征個數(shù)為5alphabet = np.reshape(alphabet, (1, 4, 5))result = model.predict([alphabet])pred = tf.argmax(result, axis=1)pred = int(pred)tf.print(alphabet1 + '->' + input_word[pred])

Embedding編碼

Embedding是一種單詞編碼,用低維向量實現(xiàn)了編碼,這種編碼通過神經(jīng)網(wǎng)絡(luò)訓(xùn)練優(yōu)化,能表達(dá)出單詞間的相關(guān)性

TensorFlow描述Embedding編碼

tf.keras.layers.Embedding(詞匯表大小,編碼維度) #編碼維度即用幾個數(shù)字表達(dá)一個單詞

用Embedding編碼替換獨熱碼實現(xiàn)輸入一個字符預(yù)測

import numpy as np import tensorflow as tf from tensorflow.keras.layers import Dense, SimpleRNN, Embedding import matplotlib.pyplot as plt import osinput_word = "abcde" w_to_id = {'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4} # 單詞映射到數(shù)值id的詞典x_train = [w_to_id['a'], w_to_id['b'], w_to_id['c'], w_to_id['d'], w_to_id['e']] y_train = [w_to_id['b'], w_to_id['c'], w_to_id['d'], w_to_id['e'], w_to_id['a']]np.random.seed(7) np.random.shuffle(x_train) np.random.seed(7) np.random.shuffle(y_train) tf.random.set_seed(7)# 使x_train符合Embedding輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù)] , # 此處整個數(shù)據(jù)集送入所以送入,送入樣本數(shù)為len(x_train);輸入1個字母出結(jié)果,循環(huán)核時間展開步數(shù)為1。 x_train = np.reshape(x_train, (len(x_train), 1)) y_train = np.array(y_train)model = tf.keras.Sequential([Embedding(5, 2),SimpleRNN(3),Dense(5, activation='softmax') ])model.compile(optimizer=tf.keras.optimizers.Adam(0.01),loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),metrics=['sparse_categorical_accuracy'])checkpoint_save_path = "./checkpoint/run_embedding_1pre1.ckpt"if os.path.exists(checkpoint_save_path + '.index'):print('-------------load the model-----------------')model.load_weights(checkpoint_save_path)cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,save_weights_only=True,save_best_only=True,monitor='loss') # 由于fit沒有給出測試集,不計算測試集準(zhǔn)確率,根據(jù)loss,保存最優(yōu)模型history = model.fit(x_train, y_train, batch_size=32, epochs=100, callbacks=[cp_callback])model.summary()# print(model.trainable_variables) file = open('./weights.txt', 'w') # 參數(shù)提取 for v in model.trainable_variables:file.write(str(v.name) + '\n')file.write(str(v.shape) + '\n')file.write(str(v.numpy()) + '\n') file.close()############################################### show ################################################ 顯示訓(xùn)練集和驗證集的acc和loss曲線 acc = history.history['sparse_categorical_accuracy'] loss = history.history['loss']plt.subplot(1, 2, 1) plt.plot(acc, label='Training Accuracy') plt.title('Training Accuracy') plt.legend()plt.subplot(1, 2, 2) plt.plot(loss, label='Training Loss') plt.title('Training Loss') plt.legend() plt.show()############### predict #############preNum = int(input("input the number of test alphabet:")) for i in range(preNum):alphabet1 = input("input test alphabet:")alphabet = [w_to_id[alphabet1]]# 使alphabet符合Embedding輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù)]。# 此處驗證效果送入了1個樣本,送入樣本數(shù)為1;輸入1個字母出結(jié)果,循環(huán)核時間展開步數(shù)為1。alphabet = np.reshape(alphabet, (1, 1))result = model.predict(alphabet)pred = tf.argmax(result, axis=1)pred = int(pred)tf.print(alphabet1 + '->' + input_word[pred])

用Embedding編碼替換獨熱碼實現(xiàn)輸入四個字符預(yù)測

import numpy as np import tensorflow as tf from tensorflow.keras.layers import Dense, SimpleRNN, Embedding import matplotlib.pyplot as plt import osinput_word = "abcdefghijklmnopqrstuvwxyz" w_to_id = {'a': 0, 'b': 1, 'c': 2, 'd': 3, 'e': 4,'f': 5, 'g': 6, 'h': 7, 'i': 8, 'j': 9,'k': 10, 'l': 11, 'm': 12, 'n': 13, 'o': 14,'p': 15, 'q': 16, 'r': 17, 's': 18, 't': 19,'u': 20, 'v': 21, 'w': 22, 'x': 23, 'y': 24, 'z': 25} # 單詞映射到數(shù)值id的詞典training_set_scaled = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,11, 12, 13, 14, 15, 16, 17, 18, 19, 20,21, 22, 23, 24, 25]x_train = [] y_train = []for i in range(4, 26):x_train.append(training_set_scaled[i - 4:i])y_train.append(training_set_scaled[i])np.random.seed(7) np.random.shuffle(x_train) np.random.seed(7) np.random.shuffle(y_train) tf.random.set_seed(7)# 使x_train符合Embedding輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù)] , # 此處整個數(shù)據(jù)集送入所以送入,送入樣本數(shù)為len(x_train);輸入4個字母出結(jié)果,循環(huán)核時間展開步數(shù)為4。 x_train = np.reshape(x_train, (len(x_train), 4)) y_train = np.array(y_train)model = tf.keras.Sequential([Embedding(26, 2),SimpleRNN(10),Dense(26, activation='softmax') ])model.compile(optimizer=tf.keras.optimizers.Adam(0.01),loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),metrics=['sparse_categorical_accuracy'])checkpoint_save_path = "./checkpoint/rnn_embedding_4pre1.ckpt"if os.path.exists(checkpoint_save_path + '.index'):print('-------------load the model-----------------')model.load_weights(checkpoint_save_path)cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,save_weights_only=True,save_best_only=True,monitor='loss') # 由于fit沒有給出測試集,不計算測試集準(zhǔn)確率,根據(jù)loss,保存最優(yōu)模型history = model.fit(x_train, y_train, batch_size=32, epochs=100, callbacks=[cp_callback])model.summary()file = open('./weights.txt', 'w') # 參數(shù)提取 for v in model.trainable_variables:file.write(str(v.name) + '\n')file.write(str(v.shape) + '\n')file.write(str(v.numpy()) + '\n') file.close()############################################### show ################################################ 顯示訓(xùn)練集和驗證集的acc和loss曲線 acc = history.history['sparse_categorical_accuracy'] loss = history.history['loss']plt.subplot(1, 2, 1) plt.plot(acc, label='Training Accuracy') plt.title('Training Accuracy') plt.legend()plt.subplot(1, 2, 2) plt.plot(loss, label='Training Loss') plt.title('Training Loss') plt.legend() plt.show()################# predict ##################preNum = int(input("input the number of test alphabet:")) for i in range(preNum):alphabet1 = input("input test alphabet:")alphabet = [w_to_id[a] for a in alphabet1]# 使alphabet符合Embedding輸入要求:[送入樣本數(shù), 時間展開步數(shù)]。# 此處驗證效果送入了1個樣本,送入樣本數(shù)為1;輸入4個字母出結(jié)果,循環(huán)核時間展開步數(shù)為4。alphabet = np.reshape(alphabet, (1, 4))result = model.predict([alphabet])pred = tf.argmax(result, axis=1)pred = int(pred)tf.print(alphabet1 + '->' + input_word[pred])

循環(huán)神經(jīng)網(wǎng)絡(luò)實現(xiàn)股票預(yù)測

下載股票預(yù)測數(shù)據(jù)

import tushare as ts import matplotlib.pyplot as pltdf1 = ts.get_k_data('600519', ktype='D', start='2010-04-26', end='2020-04-26')datapath1 = "./SH600519.csv" df1.to_csv(datapath1)

股票預(yù)測

import numpy as np import tensorflow as tf from tensorflow.keras.layers import Dropout, Dense, SimpleRNN import matplotlib.pyplot as plt import os import pandas as pd from sklearn.preprocessing import MinMaxScaler from sklearn.metrics import mean_squared_error, mean_absolute_error import mathmaotai = pd.read_csv('./SH600519.csv') # 讀取股票文件training_set = maotai.iloc[0:2426 - 300, 2:3].values # 前(2426-300=2126)天的開盤價作為訓(xùn)練集,表格從0開始計數(shù),2:3 是提取[2:3)列,前閉后開,故提取出C列開盤價 test_set = maotai.iloc[2426 - 300:, 2:3].values # 后300天的開盤價作為測試集# 歸一化 sc = MinMaxScaler(feature_range=(0, 1)) # 定義歸一化:歸一化到(0,1)之間 training_set_scaled = sc.fit_transform(training_set) # 求得訓(xùn)練集的最大值,最小值這些訓(xùn)練集固有的屬性,并在訓(xùn)練集上進(jìn)行歸一化 test_set = sc.transform(test_set) # 利用訓(xùn)練集的屬性對測試集進(jìn)行歸一化x_train = [] y_train = []x_test = [] y_test = []# 測試集:csv表格中前2426-300=2126天數(shù)據(jù) # 利用for循環(huán),遍歷整個訓(xùn)練集,提取訓(xùn)練集中連續(xù)60天的開盤價作為輸入特征x_train,第61天的數(shù)據(jù)作為標(biāo)簽,for循環(huán)共構(gòu)建2426-300-60=2066組數(shù)據(jù)。 for i in range(60, len(training_set_scaled)):x_train.append(training_set_scaled[i - 60:i, 0])y_train.append(training_set_scaled[i, 0]) # 對訓(xùn)練集進(jìn)行打亂 np.random.seed(7) np.random.shuffle(x_train) np.random.seed(7) np.random.shuffle(y_train) tf.random.set_seed(7) # 將訓(xùn)練集由list格式變?yōu)閍rray格式 x_train, y_train = np.array(x_train), np.array(y_train)# 使x_train符合RNN輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù), 每個時間步輸入特征個數(shù)]。 # 此處整個數(shù)據(jù)集送入,送入樣本數(shù)為x_train.shape[0]即2066組數(shù)據(jù);輸入60個開盤價,預(yù)測出第61天的開盤價,循環(huán)核時間展開步數(shù)為60; 每個時間步送入的特征是某一天的開盤價,只有1個數(shù)據(jù),故每個時間步輸入特征個數(shù)為1 x_train = np.reshape(x_train, (x_train.shape[0], 60, 1)) # 測試集:csv表格中后300天數(shù)據(jù) # 利用for循環(huán),遍歷整個測試集,提取測試集中連續(xù)60天的開盤價作為輸入特征x_train,第61天的數(shù)據(jù)作為標(biāo)簽,for循環(huán)共構(gòu)建300-60=240組數(shù)據(jù)。 for i in range(60, len(test_set)):x_test.append(test_set[i - 60:i, 0])y_test.append(test_set[i, 0]) # 測試集變array并reshape為符合RNN輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù), 每個時間步輸入特征個數(shù)] x_test, y_test = np.array(x_test), np.array(y_test) x_test = np.reshape(x_test, (x_test.shape[0], 60, 1))model = tf.keras.Sequential([SimpleRNN(80, return_sequences=True),Dropout(0.2),SimpleRNN(100),Dropout(0.2),Dense(1) ])model.compile(optimizer=tf.keras.optimizers.Adam(0.001),loss='mean_squared_error') # 損失函數(shù)用均方誤差 # 該應(yīng)用只觀測loss數(shù)值,不觀測準(zhǔn)確率,所以刪去metrics選項,一會在每個epoch迭代顯示時只顯示loss值checkpoint_save_path = "./checkpoint/rnn_stock.ckpt"if os.path.exists(checkpoint_save_path + '.index'):print('-------------load the model-----------------')model.load_weights(checkpoint_save_path)cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,save_weights_only=True,save_best_only=True,monitor='val_loss')history = model.fit(x_train, y_train, batch_size=64, epochs=50, validation_data=(x_test, y_test), validation_freq=1,callbacks=[cp_callback])model.summary()file = open('./weights.txt', 'w') # 參數(shù)提取 for v in model.trainable_variables:file.write(str(v.name) + '\n')file.write(str(v.shape) + '\n')file.write(str(v.numpy()) + '\n') file.close()loss = history.history['loss'] val_loss = history.history['val_loss']plt.plot(loss, label='Training Loss') plt.plot(val_loss, label='Validation Loss') plt.title('Training and Validation Loss') plt.legend() plt.show()################## predict ###################### # 測試集輸入模型進(jìn)行預(yù)測 predicted_stock_price = model.predict(x_test) # 對預(yù)測數(shù)據(jù)還原---從(0,1)反歸一化到原始范圍 predicted_stock_price = sc.inverse_transform(predicted_stock_price) # 對真實數(shù)據(jù)還原---從(0,1)反歸一化到原始范圍 real_stock_price = sc.inverse_transform(test_set[60:]) # 畫出真實數(shù)據(jù)和預(yù)測數(shù)據(jù)的對比曲線 plt.plot(real_stock_price, color='red', label='MaoTai Stock Price') plt.plot(predicted_stock_price, color='blue', label='Predicted MaoTai Stock Price') plt.title('MaoTai Stock Price Prediction') plt.xlabel('Time') plt.ylabel('MaoTai Stock Price') plt.legend() plt.show()##########evaluate############## # calculate MSE 均方誤差 ---> E[(預(yù)測值-真實值)^2] (預(yù)測值減真實值求平方后求均值) mse = mean_squared_error(predicted_stock_price, real_stock_price) # calculate RMSE 均方根誤差--->sqrt[MSE] (對均方誤差開方) rmse = math.sqrt(mean_squared_error(predicted_stock_price, real_stock_price)) # calculate MAE 平均絕對誤差----->E[|預(yù)測值-真實值|](預(yù)測值減真實值求絕對值后求均值) mae = mean_absolute_error(predicted_stock_price, real_stock_price) print('均方誤差: %.6f' % mse) print('均方根誤差: %.6f' % rmse) print('平均絕對誤差: %.6f' % mae)

長短記憶網(wǎng)絡(luò)LSTM

通過門控單元改善了RNN長期依賴問題

TensorFlow描述LSTM層

tf.keras.layers.LSTM(記憶體個數(shù), return_sequences=True or False #是否返回輸出默認(rèn)false僅最后輸出 true各時間步輸出 )

LSTM實現(xiàn)股票預(yù)測

import numpy as np import tensorflow as tf from tensorflow.keras.layers import Dropout, Dense, LSTM import matplotlib.pyplot as plt import os import pandas as pd from sklearn.preprocessing import MinMaxScaler from sklearn.metrics import mean_squared_error, mean_absolute_error import mathmaotai = pd.read_csv('./SH600519.csv') # 讀取股票文件training_set = maotai.iloc[0:2426 - 300, 2:3].values # 前(2426-300=2126)天的開盤價作為訓(xùn)練集,表格從0開始計數(shù),2:3 是提取[2:3)列,前閉后開,故提取出C列開盤價 test_set = maotai.iloc[2426 - 300:, 2:3].values # 后300天的開盤價作為測試集# 歸一化 sc = MinMaxScaler(feature_range=(0, 1)) # 定義歸一化:歸一化到(0,1)之間 training_set_scaled = sc.fit_transform(training_set) # 求得訓(xùn)練集的最大值,最小值這些訓(xùn)練集固有的屬性,并在訓(xùn)練集上進(jìn)行歸一化 test_set = sc.transform(test_set) # 利用訓(xùn)練集的屬性對測試集進(jìn)行歸一化x_train = [] y_train = []x_test = [] y_test = []# 測試集:csv表格中前2426-300=2126天數(shù)據(jù) # 利用for循環(huán),遍歷整個訓(xùn)練集,提取訓(xùn)練集中連續(xù)60天的開盤價作為輸入特征x_train,第61天的數(shù)據(jù)作為標(biāo)簽,for循環(huán)共構(gòu)建2426-300-60=2066組數(shù)據(jù)。 for i in range(60, len(training_set_scaled)):x_train.append(training_set_scaled[i - 60:i, 0])y_train.append(training_set_scaled[i, 0]) # 對訓(xùn)練集進(jìn)行打亂 np.random.seed(7) np.random.shuffle(x_train) np.random.seed(7) np.random.shuffle(y_train) tf.random.set_seed(7) # 將訓(xùn)練集由list格式變?yōu)閍rray格式 x_train, y_train = np.array(x_train), np.array(y_train)# 使x_train符合RNN輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù), 每個時間步輸入特征個數(shù)]。 # 此處整個數(shù)據(jù)集送入,送入樣本數(shù)為x_train.shape[0]即2066組數(shù)據(jù);輸入60個開盤價,預(yù)測出第61天的開盤價,循環(huán)核時間展開步數(shù)為60; 每個時間步送入的特征是某一天的開盤價,只有1個數(shù)據(jù),故每個時間步輸入特征個數(shù)為1 x_train = np.reshape(x_train, (x_train.shape[0], 60, 1)) # 測試集:csv表格中后300天數(shù)據(jù) # 利用for循環(huán),遍歷整個測試集,提取測試集中連續(xù)60天的開盤價作為輸入特征x_train,第61天的數(shù)據(jù)作為標(biāo)簽,for循環(huán)共構(gòu)建300-60=240組數(shù)據(jù)。 for i in range(60, len(test_set)):x_test.append(test_set[i - 60:i, 0])y_test.append(test_set[i, 0]) # 測試集變array并reshape為符合RNN輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù), 每個時間步輸入特征個數(shù)] x_test, y_test = np.array(x_test), np.array(y_test) x_test = np.reshape(x_test, (x_test.shape[0], 60, 1))model = tf.keras.Sequential([LSTM(80, return_sequences=True),Dropout(0.2),LSTM(100),Dropout(0.2),Dense(1) ])model.compile(optimizer=tf.keras.optimizers.Adam(0.001),loss='mean_squared_error') # 損失函數(shù)用均方誤差 # 該應(yīng)用只觀測loss數(shù)值,不觀測準(zhǔn)確率,所以刪去metrics選項,一會在每個epoch迭代顯示時只顯示loss值checkpoint_save_path = "./checkpoint/LSTM_stock.ckpt"if os.path.exists(checkpoint_save_path + '.index'):print('-------------load the model-----------------')model.load_weights(checkpoint_save_path)cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,save_weights_only=True,save_best_only=True,monitor='val_loss')history = model.fit(x_train, y_train, batch_size=64, epochs=50, validation_data=(x_test, y_test), validation_freq=1,callbacks=[cp_callback])model.summary()file = open('./weights.txt', 'w') # 參數(shù)提取 for v in model.trainable_variables:file.write(str(v.name) + '\n')file.write(str(v.shape) + '\n')file.write(str(v.numpy()) + '\n') file.close()loss = history.history['loss'] val_loss = history.history['val_loss']plt.plot(loss, label='Training Loss') plt.plot(val_loss, label='Validation Loss') plt.title('Training and Validation Loss') plt.legend() plt.show()################## predict ###################### # 測試集輸入模型進(jìn)行預(yù)測 predicted_stock_price = model.predict(x_test) # 對預(yù)測數(shù)據(jù)還原---從(0,1)反歸一化到原始范圍 predicted_stock_price = sc.inverse_transform(predicted_stock_price) # 對真實數(shù)據(jù)還原---從(0,1)反歸一化到原始范圍 real_stock_price = sc.inverse_transform(test_set[60:]) # 畫出真實數(shù)據(jù)和預(yù)測數(shù)據(jù)的對比曲線 plt.plot(real_stock_price, color='red', label='MaoTai Stock Price') plt.plot(predicted_stock_price, color='blue', label='Predicted MaoTai Stock Price') plt.title('MaoTai Stock Price Prediction') plt.xlabel('Time') plt.ylabel('MaoTai Stock Price') plt.legend() plt.show()##########evaluate############## # calculate MSE 均方誤差 ---> E[(預(yù)測值-真實值)^2] (預(yù)測值減真實值求平方后求均值) mse = mean_squared_error(predicted_stock_price, real_stock_price) # calculate RMSE 均方根誤差--->sqrt[MSE] (對均方誤差開方) rmse = math.sqrt(mean_squared_error(predicted_stock_price, real_stock_price)) # calculate MAE 平均絕對誤差----->E[|預(yù)測值-真實值|](預(yù)測值減真實值求絕對值后求均值) mae = mean_absolute_error(predicted_stock_price, real_stock_price) print('均方誤差: %.6f' % mse) print('均方根誤差: %.6f' % rmse) print('平均絕對誤差: %.6f' % mae)

GRU網(wǎng)絡(luò)

使記憶體ht融合了長期記憶和短期記憶

TensorFlow描述GRU層

tf.keras.layers.GRU(記憶體個數(shù), return_sequences=True or False #是否返回輸出默認(rèn)false僅最后輸出 true各時間步輸出 )

LSTM實現(xiàn)股票預(yù)測

import numpy as np import tensorflow as tf from tensorflow.keras.layers import Dropout, Dense, GRU import matplotlib.pyplot as plt import os import pandas as pd from sklearn.preprocessing import MinMaxScaler from sklearn.metrics import mean_squared_error, mean_absolute_error import mathmaotai = pd.read_csv('./SH600519.csv') # 讀取股票文件training_set = maotai.iloc[0:2426 - 300, 2:3].values # 前(2426-300=2126)天的開盤價作為訓(xùn)練集,表格從0開始計數(shù),2:3 是提取[2:3)列,前閉后開,故提取出C列開盤價 test_set = maotai.iloc[2426 - 300:, 2:3].values # 后300天的開盤價作為測試集# 歸一化 sc = MinMaxScaler(feature_range=(0, 1)) # 定義歸一化:歸一化到(0,1)之間 training_set_scaled = sc.fit_transform(training_set) # 求得訓(xùn)練集的最大值,最小值這些訓(xùn)練集固有的屬性,并在訓(xùn)練集上進(jìn)行歸一化 test_set = sc.transform(test_set) # 利用訓(xùn)練集的屬性對測試集進(jìn)行歸一化x_train = [] y_train = []x_test = [] y_test = []# 測試集:csv表格中前2426-300=2126天數(shù)據(jù) # 利用for循環(huán),遍歷整個訓(xùn)練集,提取訓(xùn)練集中連續(xù)60天的開盤價作為輸入特征x_train,第61天的數(shù)據(jù)作為標(biāo)簽,for循環(huán)共構(gòu)建2426-300-60=2066組數(shù)據(jù)。 for i in range(60, len(training_set_scaled)):x_train.append(training_set_scaled[i - 60:i, 0])y_train.append(training_set_scaled[i, 0]) # 對訓(xùn)練集進(jìn)行打亂 np.random.seed(7) np.random.shuffle(x_train) np.random.seed(7) np.random.shuffle(y_train) tf.random.set_seed(7) # 將訓(xùn)練集由list格式變?yōu)閍rray格式 x_train, y_train = np.array(x_train), np.array(y_train)# 使x_train符合RNN輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù), 每個時間步輸入特征個數(shù)]。 # 此處整個數(shù)據(jù)集送入,送入樣本數(shù)為x_train.shape[0]即2066組數(shù)據(jù);輸入60個開盤價,預(yù)測出第61天的開盤價,循環(huán)核時間展開步數(shù)為60; 每個時間步送入的特征是某一天的開盤價,只有1個數(shù)據(jù),故每個時間步輸入特征個數(shù)為1 x_train = np.reshape(x_train, (x_train.shape[0], 60, 1)) # 測試集:csv表格中后300天數(shù)據(jù) # 利用for循環(huán),遍歷整個測試集,提取測試集中連續(xù)60天的開盤價作為輸入特征x_train,第61天的數(shù)據(jù)作為標(biāo)簽,for循環(huán)共構(gòu)建300-60=240組數(shù)據(jù)。 for i in range(60, len(test_set)):x_test.append(test_set[i - 60:i, 0])y_test.append(test_set[i, 0]) # 測試集變array并reshape為符合RNN輸入要求:[送入樣本數(shù), 循環(huán)核時間展開步數(shù), 每個時間步輸入特征個數(shù)] x_test, y_test = np.array(x_test), np.array(y_test) x_test = np.reshape(x_test, (x_test.shape[0], 60, 1))model = tf.keras.Sequential([GRU(80, return_sequences=True),Dropout(0.2),GRU(100),Dropout(0.2),Dense(1) ])model.compile(optimizer=tf.keras.optimizers.Adam(0.001),loss='mean_squared_error') # 損失函數(shù)用均方誤差 # 該應(yīng)用只觀測loss數(shù)值,不觀測準(zhǔn)確率,所以刪去metrics選項,一會在每個epoch迭代顯示時只顯示loss值checkpoint_save_path = "./checkpoint/stock.ckpt"if os.path.exists(checkpoint_save_path + '.index'):print('-------------load the model-----------------')model.load_weights(checkpoint_save_path)cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path,save_weights_only=True,save_best_only=True,monitor='val_loss')history = model.fit(x_train, y_train, batch_size=64, epochs=50, validation_data=(x_test, y_test), validation_freq=1,callbacks=[cp_callback])model.summary()file = open('./weights.txt', 'w') # 參數(shù)提取 for v in model.trainable_variables:file.write(str(v.name) + '\n')file.write(str(v.shape) + '\n')file.write(str(v.numpy()) + '\n') file.close()loss = history.history['loss'] val_loss = history.history['val_loss']plt.plot(loss, label='Training Loss') plt.plot(val_loss, label='Validation Loss') plt.title('Training and Validation Loss') plt.legend() plt.show()################## predict ###################### # 測試集輸入模型進(jìn)行預(yù)測 predicted_stock_price = model.predict(x_test) # 對預(yù)測數(shù)據(jù)還原---從(0,1)反歸一化到原始范圍 predicted_stock_price = sc.inverse_transform(predicted_stock_price) # 對真實數(shù)據(jù)還原---從(0,1)反歸一化到原始范圍 real_stock_price = sc.inverse_transform(test_set[60:]) # 畫出真實數(shù)據(jù)和預(yù)測數(shù)據(jù)的對比曲線 plt.plot(real_stock_price, color='red', label='MaoTai Stock Price') plt.plot(predicted_stock_price, color='blue', label='Predicted MaoTai Stock Price') plt.title('MaoTai Stock Price Prediction') plt.xlabel('Time') plt.ylabel('MaoTai Stock Price') plt.legend() plt.show()##########evaluate############## # calculate MSE 均方誤差 ---> E[(預(yù)測值-真實值)^2] (預(yù)測值減真實值求平方后求均值) mse = mean_squared_error(predicted_stock_price, real_stock_price) # calculate RMSE 均方根誤差--->sqrt[MSE] (對均方誤差開方) rmse = math.sqrt(mean_squared_error(predicted_stock_price, real_stock_price)) # calculate MAE 平均絕對誤差----->E[|預(yù)測值-真實值|](預(yù)測值減真實值求絕對值后求均值) mae = mean_absolute_error(predicted_stock_price, real_stock_price) print('均方誤差: %.6f' % mse) print('均方根誤差: %.6f' % rmse) print('平均絕對誤差: %.6f' % mae)

總結(jié)

以上是生活随笔為你收集整理的循环神经网络RNN、LSTM、GRU实现股票预测的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。