LSTM網路和程式碼示例
最近在看RNN相關的東西,這篇文章就當自己學習的筆記了吧。
RNN的背景
在深度學習大發展的背景下,神經網路在很多工中都取得了不俗的效果。於是就有人想了能不能用這些又大又深的網路讓電腦聽懂人們在講什麼,但是這些神經網路卻讓他們失望了。原因其實很簡單,一般的前饋神經網路和卷積神經網路並沒有辦法理解或者學習語言或者時間序列中的上下文邏輯。對於他們,資料中的特徵是相對獨立的。在這樣的背景下,迴圈神經網路(Recurrent Neural Network, 簡稱RNN)這類可以學習上下文邏輯的網路也就走入學界的視線。
RNN能起到如此神奇的效果的原因在於它能夠依據資料的時序關係保留特徵。來一張經典圖,就長這個模樣,這個神經元做的事就是不斷地把算出來的東西加個權重又傳回去。
因為這個特性,深度迴圈神經網路被廣泛應用於時序分析,語言模型和音訊分析,例如文字翻譯、影片分析和金融領域的一些任務。這些任務都十分依賴資料在時間上的關係。當然RNN有著一些致命的缺點。最有代表性的就是梯度爆炸和梯度彌散的問題,這直接導致了傳統的RNN網路十分難以訓練。其中的原因在於W在不斷的迴圈中,一些引數被不斷的傳遞,結果在不斷地乘法下以指數級的速度增長或者下降,就好像一個貪心的胖子不斷地吃吃吃,結果把自己吃出了一身毛病。還有的是長時期的依賴關係(“long-term dependencies”)的問題。導致這個的原因在於當依賴資訊與當前學習的位置間隔較遠的時候,RNN很難去把他們關聯起來。所以為了解決這些問題,也就有了本文的主體LSTM和GRU,以及各種各樣不同的RNN變種。
LSTM和GRU
LSTM,長短時記憶神經網路
LSTM,即Long Short Term Memory Network,中文叫做長短時記憶神經網路。LSTM所解決的問題是長時期的依賴關係。他的網路結構見圖:
這個圖看上去很複雜,但其實描述的東西並不多,包括了每個LSTM單元的輸入輸出,遺忘門,傳入門和輸出門。其中的黑色線條是資料的走向,各個小塊對應了各種計算。對於一個LSTM層,只需要拆解這個圖,就可以解決了。在接下來的拆解中將涉及到幾個重要的變數:
:t 時刻的輸入
: t 時刻的輸出
: t 時刻的LSTM層狀態
:a 位置的引數
:a 位置的偏置項
首先是遺忘門,對應下圖:
這個函式的計算是透過一個sigmoid函式來對輸入
和上一個迴圈的輸出
的資訊進行提取,使其投影到一個0至1的區間中,然後對上一個迴圈的狀態
進行資訊的過濾。
生成了一個維度和
一致的向量,其中的值就代表了過濾中所保留和去除的比例的。在語言翻譯中舉個例子。當一個小段結束,翻譯下一個小段的時候,有些東西就不需要了,比方說為了活躍氣氛的段子,而有一些東西就一直需要,比方說特別好笑的段子……emmm,錯了,是整個翻譯的主題或者涉及到的一些重要名詞。那麼遺忘門所做的就是決定要留下什麼,要去掉什麼。
然後就是傳入門,傳入門的作用就是把t時刻的輸入轉化成t時刻的狀態。這個步驟對應兩步,第一步見下圖:
這個步驟有兩部分。第一部分
是不是很熟悉,把當中的i換成f就是遺忘門的算式。這時候你一定要說了,哇,這是不是偷懶啊,就像這個作者在湊字數一樣!那我要說了,並不是!
算式中的
,
和
中的
和
的是兩組不同的數。所以他代表的作用是使用和遺忘門同樣的思路對
和
計算出的狀態值進行篩選。 這一步的意義在於對於同一組資料,可能會有不同的意義,比方說“吃了嗎?”可以只是個問候,也有可能是真的在問這個問題,而這個引數決定的就是這個“吃了嗎“可能是什麼意思。第二部分
則是利用一個tanh函式以及引數
和
計算出當前時刻的資料對於狀態
的影響。他的作用就是計算出當前這組資料對於這個單元的狀態變化的貢獻。利用前面的例子,這個就是計算“吃了嗎”有哪幾個意思。
第二個步驟就是結合遺忘門的輸出組合出
。
這一步驟就是利用之前算出的各個變數組合出當前的單元的狀態。
最後,當前狀態和各種輸入準備完畢,就可以透過輸出門把這個單元的結果輸出了。見下圖:
有了之前的經驗,到了這一步就很簡單了。和、的作用是一樣,他負責對於最後的輸出進行篩選,然後利用之前算好的和一個tanh函式一起組合出最後的輸出。
2。GRU,Gated Recurrent Unit
LSTM有著非常多的變種,其中最著名的變種就是GRU。GRU相比LSTM做了很多簡化的東西。具體的結構見下圖:
GRU只有重置門和更新門,對應了LSTM的遺忘門和傳入門。同時在每個單元的狀態上,GRU取消了
,取而代之的是將所有代表單元狀態的功能放在了
。
的作用和
相似,只不過他的作用範圍在
上。
則和
相類似,表達了在當前輸入和上一個單元的輸出對於當前輸出的影響。
可以從他在
中的作用看出他的作用。
表達了在當前的
和
下應該保留多少
和
的特徵,起到了
和
的作用。
例子
那麼到了例子時間,這裡使用的資料可以從這個網址下載:
https://
archive。ics。uci。edu/ml/
datasets/YouTube+Spam+Collection
在這個資料集下,我使用了一個很簡單的單層雙向lstm+兩個全連線層來訓練模型,在非常短的時間內達到了85%的準確率,作為一個demo已經不錯了,有興趣的同學可以試一下,新增幾曾網路,或者做一些文字處理,來提升準確率。
首先,是對於train集和val集、test集進行劃分,程式碼如下:
import pandas as pd
import glob
import os
import random
import json
if __name__==‘__main__’:
input_dir = ‘。。/data’
output_dir = ‘。。/data_for_train’
if not os。path。exists(output_dir):
os。mkdir(output_dir)
file_list = glob。glob(input_dir+‘/Youtube01-Psy。csv’)
all_data = []
all_label = []
for file in file_list:
data= pd。read_csv(file)
data = data。values
print(data[:,4])
all_data。extend(data[:,3])
all_label。extend(data[:,4])
pass
data = [ [d,all_label[idx]] for idx,d in enumerate(all_data)]
random。shuffle(data)
train_data = data[:int(0。8*len(data))]
val_data = data[int(0。8*len(data)):int(0。9*len(data))]
test_data = data[int(0。9*len(data)):]
json。dump(train_data,open(os。path。join(output_dir,‘train。json’),‘w’))
json。dump(val_data,open(os。path。join(output_dir,‘val。json’),‘w’))
json。dump(test_data,open(os。path。join(output_dir,‘test。json’),‘w’))
然後由於這是一個文字分類任務,所以接下來的第一件事是對於文字進行處理,其中包括但不限於tokenize,stem 和去除stopwords。 程式碼如下:
import json
import numpy as np
import nltk
import os
from nltk。corpus import stopwords
# download nltk data if never use the library
# nltk。download()
if __name__==‘__main__’:
input_file = ‘。。/data_for_train/train。json’
output_dir = ‘。。/data_for_train’
if not os。path。exists(output_dir):
os。mkdir(output_dir)
data = json。load(open(input_file,‘r’))
stemmer = nltk。stem。PorterStemmer()
word_list = []
max_len = 0
for d,l in data:
#tokenize text data
tokens = nltk。tokenize。word_tokenize(d)
#stem the token
tokens = [stemmer。stem(token) for token in tokens]
#drop the meaningless stopwords
tokens = [token for token in tokens if token not in stopwords。words(‘english’)]
max_len = max(max_len,len(tokens))
word_list。extend(tokens)
word_list = list(set(word_list))
json。dump(word_list,open(os。path。join(output_dir,‘word_list。json’),‘w’))
print max_len
pass
然後製作一個給TensorFlow用的data loader來進行訓練,程式碼如下:
import json
import nltk
class Data_loader():
def __init__(self,input_dir,word_list_file,max_len = 700):
self。pos= 0
self。max_len = max_len
self。data_list = json。load(open(input_dir,‘r’))
self。stemmer = nltk。stem。PorterStemmer()
self。word_list = tuple(json。load(open(word_list_file,‘r’)))
self。rebuild_data_list =[]
self。rebuild_data_mask =[]
self。rebuild_data_label =[]
#transform the text to max_len dimension vector and the mask for vector
for d,l in self。data_list:
data,mask,label=self。_rebuild_data(d,l)
self。rebuild_data_list。append(data)
self。rebuild_data_mask。append(mask)
self。rebuild_data_label。append(label)
def _rebuild_data(self,text,l):
tokens = nltk。tokenize。word_tokenize(text)
tokens = [self。stemmer。stem(token) for token in tokens]
tokens = [self。word_list。index(token) for token in tokens if token in self。word_list]
mask = len(tokens)
if len(tokens) >= self。max_len:
tokens = tokens[:self。max_len]
else:
tokens。extend([0] * (self。max_len-len(tokens)))
tmp =[0,0]
tmp[l] = 1
return tokens ,mask ,tmp
def get_data(self,batch_size):
tims = len(self。data_list)/batch_size+1
pos =0
#generate one epoch batch data
for _ in range(tims):
start = pos
if start+batch_size >len(self。data_list):
yield self。rebuild_data_list[start:],self。rebuild_data_mask[start:],self。rebuild_data_label[start:]
pos = len(self。data_list)
else:
end = start+batch_size
yield self。rebuild_data_list[start:end],self。rebuild_data_mask[start:end],self。rebuild_data_label[start:end]
pos = end
接下里就是對於訓練模型需要的引數進行設定了:
import tensorflow as tf
import os
flags = tf。app。flags
FLAGS = flags。FLAGS
flags。DEFINE_integer(‘batch_size’,64,‘the batch_size of the training procedure’)
flags。DEFINE_float(‘lr’,0。01,‘the learning rate’)
flags。DEFINE_float(‘lr_decay’,0。89,‘the learning rate decay’)
flags。DEFINE_integer(‘vocabulary_size’,4000,‘vocabulary_size’)
flags。DEFINE_integer(‘emdedding_dim’,128,‘embedding dim’)
flags。DEFINE_integer(‘hidden_neural_size’,128,‘LSTM hidden neural size’)
flags。DEFINE_integer(‘hidden_layer_num’,1,‘LSTM hidden layer num’)
flags。DEFINE_string(‘dataset_path’,‘word_data。pkl’,‘dataset path’)
flags。DEFINE_integer(‘max_len’,200,‘max_len of training sentence’)
flags。DEFINE_integer(‘valid_num’,64,‘epoch num of validation’)
flags。DEFINE_integer(‘checkpoint_num’,15,‘epoch num of checkpoint’)
flags。DEFINE_float(‘init_scale’,0。1,‘init scale’)
flags。DEFINE_integer(‘class_num’,2,‘class num’)
flags。DEFINE_float(‘keep_prob’,0。5,‘dropout rate’)
flags。DEFINE_integer(‘num_iter’,5000,‘num epoch’)
flags。DEFINE_integer(‘max_decay_epoch’,50,‘num epoch’)
flags。DEFINE_integer(‘max_grad_norm’,5,‘max_grad_norm’)
flags。DEFINE_string(‘out_dir’,os。path。abspath(os。path。join(os。path。curdir,“model”)),‘output directory’)
flags。DEFINE_integer(‘check_point_every’,15,‘checkpoint every num epoch ’)
flags。DEFINE_integer(‘max_vector_len’,200,‘max vector len ’)
class Config(object):
hidden_neural_size = FLAGS。hidden_neural_size
vocabulary_size = FLAGS。vocabulary_size
embed_dim = FLAGS。emdedding_dim
hidden_layer_num = FLAGS。hidden_layer_num
class_num = FLAGS。class_num
keep_prob = FLAGS。keep_prob
lr = FLAGS。lr
lr_decay = FLAGS。lr_decay
batch_size = FLAGS。batch_size
num_step = FLAGS。max_len
max_grad_norm = FLAGS。max_grad_norm
num_iter = FLAGS。num_iter
max_decay_epoch = FLAGS。max_decay_epoch
valid_num = FLAGS。valid_num
out_dir = FLAGS。out_dir
checkpoint_every = FLAGS。check_point_every
max_vector_len = FLAGS。max_vector_len
init_scale = FLAGS。init_scale
接下來是建模,這裡使用了一個雙向lstm和兩個全連線層構建網路,然後使用SGD對模型進行最佳化。
import tensorflow as tf
import numpy as np
class RNN_Model(object):
def __init__(self,config,is_training=0):
#set the parmeters for training
self。keep_prob = tf。placeholder(tf。float32, shape=[], name=“keep_prob”)
self。batch_size = tf。Variable(config。batch_size, dtype=tf。int32, trainable=False)
num_step = config。num_step
class_num = config。class_num
self。input_data = tf。placeholder(tf。int32, [None, num_step])
self。target = tf。placeholder(tf。int64, [None, class_num])
self。mask = tf。placeholder(tf。int32, [None])
hidden_neural_size = config。hidden_neural_size
vocabulary_size = config。vocabulary_size
embed_dim = config。embed_dim
hidden_layer_num = config。hidden_layer_num
self。new_batch_size = tf。placeholder(tf。int32, shape=[], name=“new_batch_size”)
self。_batch_size_update = tf。assign(self。batch_size, self。new_batch_size)
self。pad = tf。placeholder(tf。float32, [None, 1, embed_dim, 1], name=‘pad’)
l2_loss = tf。constant(0。01)
#generate word embedding for text vector
with tf。device(“/cpu:0”), tf。name_scope(“embedding_layer”):
embedding = tf。get_variable(“embedding”, [vocabulary_size, embed_dim],
dtype=tf。float32)
inputs = tf。nn。embedding_lookup(embedding, self。input_data)
# lstm for bidirectional lstm
lstm_cell_fw = tf。nn。rnn_cell。BasicLSTMCell(hidden_neural_size, forget_bias=1。0)
lstm_cell_bw = tf。nn。rnn_cell。BasicLSTMCell(hidden_neural_size, forget_bias=1。0)
cell_fw = tf。nn。rnn_cell。MultiRNNCell([lstm_cell_fw] * hidden_layer_num, state_is_tuple=True)
cell_bw = tf。nn。rnn_cell。MultiRNNCell([lstm_cell_bw] * hidden_layer_num, state_is_tuple=True)
out_put, _ = tf。nn。bidirectional_dynamic_rnn(cell_fw=cell_fw,
cell_bw=cell_bw,
inputs=inputs,
dtype=tf。float32,
sequence_length=self。mask)
out_put = tf。concat([out_put[0],out_put[1]],2)
#sum pooling
out_put = tf。reduce_sum(out_put,1)
dense_dim = 1024
#dense layer for predict
with tf。name_scope(“dense1”):
dense_w1 = tf。get_variable(“dense_w1”, [hidden_neural_size*2, dense_dim], dtype=tf。float32)
dense_b1 = tf。get_variable(“dense_b1”, [dense_dim], dtype=tf。float32)
out_put = tf。matmul(out_put, dense_w1) + dense_b1
out_put = tf。nn。tanh(out_put)
out_put = tf。nn。dropout(out_put, self。keep_prob)
#predict score
with tf。name_scope(“Softmax_layer_and_output”):
softmax_w = tf。get_variable(“softmax_w”, [dense_dim, class_num], dtype=tf。float32)
softmax_b = tf。get_variable(“softmax_b”, [class_num], dtype=tf。float32)
l2_loss += tf。nn。l2_loss(softmax_w)
l2_loss += tf。nn。l2_loss(softmax_b)
logits = tf。matmul(out_put, softmax_w) + softmax_b
self。logits = tf。nn。tanh(logits )
#probability
self。softmax = tf。nn。softmax(self。logits)
# softmax loss
with tf。name_scope(“loss”):
self。loss = tf。nn。softmax_cross_entropy_with_logits(logits=self。logits,
labels=self。target)
self。cost = tf。reduce_mean(self。loss) + 0。01 * l2_loss
#predict and get the accuracy
with tf。name_scope(“accuracy”):
self。prediction = tf。argmax(self。softmax, axis=1)
self。confidence = tf。reduce_max(self。softmax, axis=1)
target = tf。argmax(self。target, axis=1)
correct_prediction = tf。equal(self。prediction, target)
self。correct_num = tf。reduce_sum(tf。cast(correct_prediction, tf。float32))
self。accuracy = tf。reduce_mean(tf。cast(correct_prediction, tf。float32), name=“accuracy”)
# add summary
loss_summary = tf。summary。scalar(“loss”, self。cost)
# add summary
accuracy_summary = tf。summary。scalar(“accuracy_summary”, self。accuracy)
self。out = self。softmax
if is_training:
return
self。globle_step = tf。Variable(0, name=“globle_step”, trainable=False)
self。lr = tf。Variable(0。0, trainable=False)
#add summary
self。summary = tf。summary。merge([loss_summary, accuracy_summary])
#SGD optimizer
self。optimizer = tf。train。GradientDescentOptimizer(self。lr)。minimize(self。cost)
self。new_lr = tf。placeholder(tf。float32, shape=[], name=“new_learning_rate”)
self。_lr_update = tf。assign(self。lr, self。new_lr)
#update learning rate
def assign_new_lr(self,session,lr_value):
session。run(self。_lr_update,feed_dict={self。new_lr:lr_value})
# update data batch size
def assign_new_batch_size(self,session,batch_size_value):
session。run(self。_batch_size_update,feed_dict={self。new_batch_size:batch_size_value})
最後是訓練部分,這裡使用了summary,是的可以利用tensorflow的工具進行檢視。
from data_process import data_loader
import numpy as np
import os
import time
from lstm。rnn_model import RNN_Model
from lstm。tf_config import Config
import tensorflow as tf
def evaluate(sess,val_model, data, global_step=None,val_summary = None):
correct_num = 0
total_num = 0
for step, (x, mask_x, y) in enumerate(data。get_data(batch_size=Config。batch_size)):
fetches = [val_model。correct_num,val_model。prediction]
feed_dict = {}
feed_dict[val_model。input_data] = x
feed_dict[val_model。target] = y
feed_dict[val_model。mask] = mask_x
feed_dict[val_model。keep_prob] = 1。
val_model。assign_new_batch_size(sess, len(x))
feed_dict[val_model。pad] = np。zeros([Config。batch_size, 1, Config。embed_dim, 1])
count,pre = sess。run(fetches, feed_dict)
total_num += len(x)
correct_num += count
accuracy = float(correct_num)/total_num
dev_summary = tf。summary。scalar(‘dev_accuracy’, accuracy)
dev_summary = sess。run(dev_summary)
if val_summary:
val_summary。add_summary(dev_summary, global_step)
val_summary。flush()
return accuracy
def run_epoch(sess,model,val_model,data,data_val,global_step,train_summary,val_summary):
num = 0
accuracy_t = 0
cost_t = 0
for step, (x,mask,y) in enumerate(data。get_data(batch_size=Config。batch_size)):
feed_dict = {}
feed_dict[model。input_data] = x
feed_dict[model。target] = y
feed_dict[model。mask] = mask
feed_dict[model。keep_prob] = Config。keep_prob
feed_dict[model。pad] = np。zeros([Config。batch_size, 1, Config。embed_dim, 1])
model。assign_new_batch_size(sess, len(x))
fetches = [model。optimizer,model。cost, model。accuracy, model。summary,model。out]
optimizer,cost, accuracy, summary,out = sess。run(fetches, feed_dict)
train_summary。add_summary(summary, global_step)
train_summary。flush()
accuracy_t += len(x)*accuracy
cost_t += len(x)*(cost-cost_t)/(len(x)+num)
num+=len(x)
global_step += 1
#validate for each epoch
valid_accuracy = evaluate(sess, val_model, data_val, global_step, val_summary)
print(“the %i step, train cost is: %f and the train accuracy is %f and the valid accuracy is %f” % (
global_step, cost, accuracy, valid_accuracy))
print(“the %i step, train cost_T is: %f and the train accuracy_T is %f ” % (
global_step, cost_t, accuracy_t/num))
return global_step
def train_step():
print ‘load date’
config = Config()
eval_config = Config()
eval_config。keep_prob = 1。0
loader_train = data_loader。Data_loader(
os。path。join(os。path。dirname(os。path。dirname(__file__)), “data_for_train/train。json”),
os。path。join(os。path。dirname(os。path。dirname(__file__)), “data_for_train/word_list。json”),max_len=config。max_vector_len)
loader_val = data_loader。Data_loader(
os。path。join(os。path。dirname(os。path。dirname(__file__)), “data_for_train/val。json”),
os。path。join(os。path。dirname(os。path。dirname(__file__)), “data_for_train/word_list。json”),max_len=config。max_vector_len)
with tf。Graph()。as_default(),tf。Session() as sess:
initializer = tf。truncated_normal_initializer(stddev=config。init_scale)
with tf。variable_scope(“model”,initializer=initializer):
model = RNN_Model(config=config)
with tf。variable_scope(“model”,reuse=True):
valid_model = RNN_Model(config=eval_config,is_training=1)
train_summary_dir = os。path。join(config。out_dir, “summaries”, “train”)
train_summary_writer = tf。summary。FileWriter(train_summary_dir, sess。graph)
val_summary_dir = os。path。join(eval_config。out_dir, “summaries”, “val”)
val_summary_writer = tf。summary。FileWriter(val_summary_dir, sess。graph)
checkpoint_dir = os。path。abspath(os。path。join(config。out_dir, “checkpoints_v1”))
checkpoint_prefix = os。path。join(checkpoint_dir, “model”)
if not os。path。exists(checkpoint_dir):
os。makedirs(checkpoint_dir)
#prepare for check point
saver = tf。train。Saver(tf。global_variables())
try:
ckpt = tf。train。get_checkpoint_state(os。path。join(config。out_dir, “checkpoints_v1”))
except:
ckpt = None
if ckpt and ckpt。model_checkpoint_path:
print(“Continue training from the model {}”。format(ckpt。model_checkpoint_path))
saver。restore(sess, ckpt。model_checkpoint_path)
else:
tf。initialize_all_variables()。run()
global_steps = 1
begin_time = int(time。time())
for i in range(config。num_iter):
print(“the %d epoch training。。。”%(i+1))
lr_decay = config。lr_decay ** max(i-config。max_decay_epoch,0。0)
model。assign_new_lr(sess,config。lr*lr_decay)
#go training
global_steps=run_epoch(sess, model, valid_model, loader_train,loader_val, global_steps, train_summary_writer, val_summary_writer)
if i% config。checkpoint_every==0:
path = saver。save(sess,checkpoint_prefix,global_steps)
print(“Saved model chechpoint to{}\n”。format(path))
print(“the train is finished”)
end_time=int(time。time())
print(“training takes %d seconds already\n”%(end_time-begin_time))
test_accuracy=evaluate(valid_model, sess, loader_val)
print(“the test data accuracy is %f”%test_accuracy)
print(“program end!”)
def main(_):
train_step()
if __name__ == “__main__”:
tf。app。run()
程式碼可以在
https://
github。com/dante0shy/LS
TM_sample。git
這裡找到。
參考:
https://
archive。ics。uci。edu/ml/
datasets/YouTube+Spam+Collection
http://
colah。github。io/posts/2
015-08-Understanding-LSTMs/