【深度学习】【python】RNN实现用于评论文本学习 中文注释版
【深度学习】【python】RNN实现用于评论文本学习 中文注释版。环境要求:python3.5、tensorflow 1.4、pytorch 0.2.0(本程序只需要tensorflow)。程序如下:读取并预处理RNN模型使用的训练数据(reddit-comments数据集)的程序:#!/usr/bin/env python # -*- coding: utf-8 -*- ...
·
【深度学习】【python】RNN实现用于评论文本学习 中文注释版
环境要求
- python3.5
- tensorflow 1.4
- pytorch 0.2.0
注意:本程序只需要 tensorflow,无需安装 pytorch。
程序如下:
读取并预处理 RNN 模型所用训练数据(reddit-comments 数据集)的程序(需保存为 input_data_rnn.py,供后面的模型脚本导入):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""读取预处理RNN模型使用的训练数据:reddit-comments数据集"""
import csv
import itertools
import operator
import numpy as np
import nltk
import sys
from datetime import datetime
def _as_sentence_array(rows):
    """Pack a list of index lists into a NumPy array, tolerating ragged rows.

    Rows of equal length (or an empty list) go through ``np.asarray`` as
    before.  Rows of differing length are stored in a 1-D ``object`` array
    whose elements are the original Python lists; recent NumPy releases
    raise ``ValueError`` when asked to build such an array implicitly.
    """
    if len({len(row) for row in rows}) <= 1:
        return np.asarray(rows)
    packed = np.empty(len(rows), dtype=object)
    for i, row in enumerate(rows):
        packed[i] = row
    return packed


def get_data(fileName='/data/reddit-comments-2015-08.csv', vocabulary_size=8000, unknown_token="UNKNOWN_TOKEN",
             sentence_start_token="SENTENCE_START", sentence_end_token="SENTENCE_END"):
    """Read the comments CSV and turn it into next-word training pairs.

    Steps: read the first column of every CSV row, lower-case it, split it
    into sentences, wrap each sentence in start/end markers, tokenize, keep
    the ``vocabulary_size - 1`` most frequent tokens (everything else becomes
    ``unknown_token``), and map tokens to integer indices.

    Args:
        fileName: Path of the CSV file, appended verbatim to ``sys.path[0]``.
        vocabulary_size: Size of the vocabulary including ``unknown_token``.
        unknown_token: Replacement for out-of-vocabulary tokens.
        sentence_start_token: Marker prepended to every sentence.
        sentence_end_token: Marker appended to every sentence.

    Returns:
        A tuple ``(X_train, y_train)``.  ``X_train[i]`` holds the indices of
        sentence ``i`` without its last token, ``y_train[i]`` the indices
        without its first token (i.e. the inputs shifted by one).
    """
    print("Reading CSV file...")
    with open(sys.path[0] + fileName, 'r', encoding='utf-8') as f:
        reader = csv.reader(f, skipinitialspace=True)
        # Skip the header row; an empty file simply yields no sentences.
        next(reader, None)
        # csv.reader yields [] for blank lines, so skip rows without a first column.
        sentences = itertools.chain.from_iterable(
            nltk.sent_tokenize(row[0].lower()) for row in reader if row)
        # Materialize inside the ``with`` block: the generator reads from the open file.
        sentences = ["%s %s %s" % (sentence_start_token, x, sentence_end_token) for x in sentences]
    print("Parsed %d sentences." % (len(sentences)))

    # Tokenize every sentence and count token frequencies.
    tokenized_sentences = [nltk.word_tokenize(sent) for sent in sentences]
    word_freq = nltk.FreqDist(itertools.chain.from_iterable(tokenized_sentences))
    print("Found %d unique words tokens." % len(word_freq))

    # Keep the most frequent tokens; the last vocabulary slot is the unknown token.
    vocab = word_freq.most_common(vocabulary_size - 1)
    index_to_word = [x[0] for x in vocab]
    index_to_word.append(unknown_token)
    word_to_index = {w: i for i, w in enumerate(index_to_word)}
    print("Using vocabulary size %d." % vocabulary_size)
    if vocab:
        print("The least frequent word in our vocabulary is '%s' and appeared %d times." % (vocab[-1][0], vocab[-1][1]))

    # Replace out-of-vocabulary tokens with the unknown token.
    tokenized_sentences = [
        [w if w in word_to_index else unknown_token for w in sent]
        for sent in tokenized_sentences
    ]
    if sentences:
        print("\nExample sentence: '%s'" % sentences[0])
        print("\nExample sentence after Pre-processing: '%s'" % tokenized_sentences[0])

    # Inputs are all tokens but the last; targets are all tokens but the first.
    X_train = _as_sentence_array([[word_to_index[w] for w in sent[:-1]] for sent in tokenized_sentences])
    y_train = _as_sentence_array([[word_to_index[w] for w in sent[1:]] for sent in tokenized_sentences])

    # Show one sample pair when the data set is large enough to have it.
    if len(X_train) > 17:
        x_example, y_example = X_train[17], y_train[17]
        print("x:\n%s\n%s" % (" ".join([index_to_word[x] for x in x_example]), x_example))
        print("\ny:\n%s\n%s" % (" ".join([index_to_word[x] for x in y_example]), y_example))
    return (X_train, y_train)
使用Tensorflow实现RNN model:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""使用Tensorflow实现RNN model"""
import timeit
from datetime import datetime
import numpy as np
import tensorflow as tf
from input_data_rnn import get_data
class RNN_tf(object):
    """Single-layer RNN language model built with the TensorFlow 1.x graph API."""

    def __init__(self, inpt=None, word_dim=8000, hidden_dim=100, bptt_truncate=4):
        """Create the parameters and build the graph.

        Args:
            inpt: Optional 1-D int32 tensor of word indices for one sentence.
                A placeholder of shape ``[None]`` is created when omitted.
            word_dim: Vocabulary size; it is the row count of ``U`` and the
                column count of ``V``.
            hidden_dim: Number of hidden units.
            bptt_truncate: Stored on the instance; not used by the graph
                built in this class.
        """
        self.word_dim = word_dim
        self.hidden_dim = hidden_dim
        self.bptt_truncate = bptt_truncate
        if inpt is None:
            inpt = tf.placeholder(tf.int32, shape=[None, ])
        self.x = inpt
        # Target word indices, one per input position.
        self.y = tf.placeholder(tf.int32, shape=[None, ])

        # Input-to-hidden weights U, uniform in +-sqrt(1 / word_dim).
        bounds = np.sqrt(1.0/self.word_dim)
        self.U = tf.Variable(tf.random_uniform([self.word_dim, self.hidden_dim], minval=-bounds, maxval=bounds),
                             name="U")
        # Hidden-to-hidden weights W and hidden-to-output weights V,
        # both uniform in +-sqrt(1 / hidden_dim).
        bounds = np.sqrt(1.0/self.hidden_dim)
        self.W = tf.Variable(tf.random_uniform([self.hidden_dim, self.hidden_dim], minval=-bounds, maxval=bounds),
                             name="W")
        self.V = tf.Variable(tf.random_uniform([self.hidden_dim, self.word_dim], minval=-bounds, maxval=bounds),
                             name="V")
        # Trainable parameters, collected for the optimizer's var_list.
        self.params = [self.U, self.W, self.V]
        self.__model_build__()

    def __model_build__(self):
        """Build the forward pass and expose ``prediction``, ``cost`` and ``loss``."""

        def forward_propagation(s_t_prv, x_t):
            # s_t = tanh(U[x_t] + s_{t-1} W); slicing row x_t of U stands in
            # for multiplying a one-hot input vector by U.
            s_t = tf.nn.tanh(tf.slice(self.U, [x_t, 0], [1, -1]) + tf.matmul(s_t_prv, self.W))
            return s_t

        # Hidden state at every time step: [seq_len, 1, hidden_dim].
        s = tf.scan(forward_propagation, self.x, initializer=tf.zeros([1, self.hidden_dim]))
        # Drop only the singleton middle axis -> [seq_len, hidden_dim]. A bare
        # squeeze would also remove the time axis for a one-token sequence.
        s = tf.squeeze(s, axis=1)
        # Unnormalized output scores (logits): [seq_len, word_dim].
        o_wx = tf.matmul(s, self.V)
        o = tf.nn.softmax(o_wx)
        # Most probable next word at each position.
        self.prediction = tf.argmax(o, axis=1)
        # Summed cross-entropy over the sequence; TF >= 1.0 requires the
        # labels/logits arguments to be passed by keyword.
        self.cost = tf.reduce_sum(
            tf.nn.sparse_softmax_cross_entropy_with_logits(labels=self.y, logits=o_wx))
        # Average cross-entropy per input token.
        self.loss = self.cost / tf.cast(tf.size(self.x), tf.float32)
def train_rnn_with_sgd(sess, model, X_train, y_train, learning_rate=0.005, n_epochs=100,
                       evaluate_loss_after=5):
    """Train ``model`` one sentence at a time with plain gradient descent.

    Before every ``evaluate_loss_after``-th epoch (starting with epoch 0) the
    mean of ``model.loss`` over all training sentences is printed together
    with a timestamp.  The total wall-clock time is printed at the end.

    Args:
        sess: TensorFlow session used to run the graph.
        model: Object exposing ``x``, ``y``, ``cost``, ``loss`` and ``params``.
        X_train: Indexable collection of input index sequences.
        y_train: Indexable collection of target index sequences.
        learning_rate: Step size for the gradient descent optimizer.
        n_epochs: Number of passes over the training data.
        evaluate_loss_after: Report the loss every this many epochs.
    """
    optimizer = tf.train.GradientDescentOptimizer(learning_rate)
    sgd_step = optimizer.minimize(model.cost, var_list=model.params)
    sample_count = len(X_train)

    def feed_for(idx):
        # Feed dictionary for the idx-th training sentence.
        return {model.x: X_train[idx], model.y: y_train[idx]}

    print("Start training...")
    began = timeit.default_timer()
    for epoch in range(n_epochs):
        if not epoch % evaluate_loss_after:
            # Report the mean per-sentence loss before this epoch's updates.
            total = sum(sess.run(model.loss, feed_dict=feed_for(idx))
                        for idx in range(sample_count))
            stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print("\t{0}:Loss after Epoch {1} is {2}".format(stamp, epoch, total/sample_count))
        # One parameter update per training sentence.
        for idx in range(sample_count):
            sess.run(sgd_step, feed_dict=feed_for(idx))
    finished = timeit.default_timer()
    print("Finished!")
    print("Time elapsed {0} minutes.".format((finished-began)/60.0))
if __name__ == "__main__":
    # Fix the random seeds so runs are repeatable.
    np.random.seed(10)
    tf.set_random_seed(1111)
    # Vocabulary size shared by the data pipeline and the model.
    vocabulary_size = 8000
    # X_train[i] is sentence i without its last token, y_train[i] the same
    # sentence without its first token, both as lists of word indices.
    X_train, y_train = get_data(vocabulary_size=vocabulary_size)
    with tf.Session() as sess:
        # word_dim must equal the vocabulary size used to index the data,
        # so it is derived from the same variable, not repeated as a literal.
        model = RNN_tf(inpt=None, word_dim=vocabulary_size, hidden_dim=100)
        sess.run(tf.global_variables_initializer())
        # Train on the first 1000 sentences, reporting the loss every epoch.
        train_rnn_with_sgd(sess, model, X_train[:1000], y_train[:1000], n_epochs=10, evaluate_loss_after=1)
更多推荐
已为社区贡献14条内容
所有评论(0)