1. 关键词
BPTT
2. 简介
人们在开始思考时,不是每次都从零开始。比如你读这篇文章,因为你曾经看过相似的文字,所以也能理解这里的文字。你不是从头开始学,你的知识是逐渐积累的。
在多层感知器中隐含层之间依次次连接。当把隐含层折叠起来,就可以得到一个递归网络。如下图:
公式表示:
式中:
- 是输入层;
- 是输入层到隐含层权重;
- 是到上一隐含层到隐含层权重;
- 是隐含层;
- 是隐含层到输出层权重;
- 是输出层;
- tanh 和 softmax 是**函数。
RNN 适用于输出有相互联系的问题,比如自然语言处理(NLP),同一句话中,后面的单词依赖于前面的。
3. 损失函数
用交叉熵定义 t 时刻损失函数:
其中: 是真实值, 是预测值。
则总误差:
4. 反向传播
基于链式法则,通过时间的反向传播(BPTT),权重梯度是所有时刻累加:
因为 U 和 W 影响所有过程,以 为例,误差传播过程:
W 的梯度:
V 不需要其他时刻,梯度如下:
当使用 BPTT 算法时,需要设定最大传播时间。以上图为例,当设定为4时,t=5时刻,只需要计算t=1。这也有其物理含义,时间间隔越大,相互的影响越小,如果一直保持,会导致大量计算。
5. 例子
基于已知的语言,随机生成一句话。参考附录1的代码,并修改了其中的错误,汇总为一个可执行脚本。执行过程:
- 从 Google BigQuery 下载数据;
- 使用 NLTK 对数据进行预处理,拆分为句子,句子开头添加
SENTENCE_START
作为输入,句子结尾添加SENTENCE_END
作为输出,以此建立了一句话词的前后关系; - 统计每个词出现的频率,只取前8000个,其他词用
UNKNOWN_TOKEN
替代,对词进行编号,句子中的文字用数字代替; - 创建一个 RNN 类,词汇表维度(输入、输出)=8000,隐含层神经元=100,反向传播的最大时间跨度=4,以此得到权重维度,U=V=100x8000,W=100x100,随机初始化权重;
- 开始迭代;
- 前向传播,保留所有隐含层和输出层值;
- 反向传播,得到权重梯度;
- 求解新的权重,计算损失;
- 迭代到收敛;
- 利用训练后的模型,生成语句。
完整代码如下
import csv as csv
import itertools
import nltk
import numpy as np
import datetime
import sys
def softmax(z):
return np.exp(z) / sum(np.exp(z))
class RNNNumpy:
def __init__(self, word_dim, hidden_dim=100, bptt_truncate=4):
# Assign instance variables
self.word_dim = word_dim
self.hidden_dim = hidden_dim
self.bptt_truncate = bptt_truncate
# Randomly initialize the network parameters
self.U = np.random.uniform(-np.sqrt(1. / word_dim), np.sqrt(1. / word_dim), (hidden_dim, word_dim))
self.V = np.random.uniform(-np.sqrt(1. / hidden_dim), np.sqrt(1. / hidden_dim), (word_dim, hidden_dim))
self.W = np.random.uniform(-np.sqrt(1. / hidden_dim), np.sqrt(1. / hidden_dim), (hidden_dim, hidden_dim))
def forward_propagation(self, x):
# The total number of time steps
T = len(x)
# During forward propagation we save all hidden states in s because need them later.
# We add one additional element for the initial hidden, which we set to 0
s = np.zeros((T + 1, self.hidden_dim))
s[-1] = np.zeros(self.hidden_dim)
# The outputs at each time step. Again, we save them for later.
o = np.zeros((T, self.word_dim))
# For each time step...
for t in np.arange(T):
# Note that we are indxing U by x[t]. This is the same as multiplying U with a one-hot vector.
s[t] = np.tanh(self.U[:, x[t]] + self.W.dot(s[t - 1]))
o[t] = softmax(self.V.dot(s[t]))
return [o, s]
def predict(self, x):
# Perform forward propagation and return index of the highest score
o, s = self.forward_propagation(x)
return np.argmax(o, axis=1)
def calculate_total_loss(self, x, y):
L = 0
# For each sentence...
for i in np.arange(len(y)):
o, s = self.forward_propagation(x[i])
# We only care about our prediction of the "correct" words
correct_word_predictions = o[np.arange(len(y[i])), y[i]]
# Add to the loss based on how off we were
L += -1 * np.sum(np.log(correct_word_predictions))
return L
def calculate_loss(self, x, y):
# Divide the total loss by the number of training examples
N = np.sum((len(y_i) for y_i in y))
return self.calculate_total_loss(x, y) / N
def bptt(self, x, y):
T = len(y)
# Perform forward propagation
o, s = self.forward_propagation(x)
# We accumulate the gradients in these variables
dLdU = np.zeros(self.U.shape)
dLdV = np.zeros(self.V.shape)
dLdW = np.zeros(self.W.shape)
delta_o = o
delta_o[np.arange(len(y)), y] -= 1.
# For each output backwards...
for t in np.arange(T)[::-1]:
dLdV += np.outer(delta_o[t], s[t].T)
# Initial delta calculation
delta_t = self.V.T.dot(delta_o[t]) * (1 - (s[t] ** 2))
# Backpropagation through time (for at most self.bptt_truncate steps)
for bptt_step in np.arange(max(0, t - self.bptt_truncate), t + 1)[::-1]:
# print ("Backpropagation step t=%d bptt step=%d" % (t, bptt_step))
dLdW += np.outer(delta_t, s[bptt_step - 1])
dLdU[:, x[bptt_step]] += delta_t
# Update delta for next step
delta_t = self.W.T.dot(delta_t) * (1 - s[bptt_step - 1] ** 2)
return [dLdU, dLdV, dLdW]
# Performs one step of SGD.
def sgd_step(self, x, y, learning_rate):
# Calculate the gradients
dLdU, dLdV, dLdW = self.bptt(x, y)
# Change parameters according to gradients and learning rate
self.U -= learning_rate * dLdU
self.V -= learning_rate * dLdV
self.W -= learning_rate * dLdW
def gradient_check(self, x, y, h=0.001, error_threshold=0.01):
# Calculate the gradients using backpropagation. We want to checker if these are correct.
bptt_gradients = self.bptt(x, y)
# List of all parameters we want to check.
model_parameters = ['U', 'V', 'W']
# Gradient check for each parameter
for pidx, pname in enumerate(model_parameters):
# Get the actual parameter value from the mode, e.g. model.W
parameter = operator.attrgetter(pname)(self)
print("Performing gradient check for parameter % s with size % d." % (pname, np.prod(parameter.shape)))
# Iterate over each element of the parameter matrix, e.g. (0,0), (0,1), ...
it = np.nditer(parameter, flags=['multi_index'], op_flags=['readwrite'])
while not it.finished:
ix = it.multi_index
# Save the original value so we can reset it later
original_value = parameter[ix]
# Estimate the gradient using (f(x+h) - f(x-h))/(2*h)
parameter[ix] = original_value + h
gradplus = self.calculate_total_loss([x], [y])
parameter[ix] = original_value - h
gradminus = self.calculate_total_loss([x], [y])
estimated_gradient = (gradplus - gradminus) / (2 * h)
# Reset parameter to original value
parameter[ix] = original_value
# The gradient for this parameter calculated using backpropagation
backprop_gradient = bptt_gradients[pidx][ix]
# calculate The relative error: (|x - y|/(|x| + |y|))
relative_error = np.abs(backprop_gradient - estimated_gradient) / (
np.abs(backprop_gradient) + np.abs(estimated_gradient))
# If the error is to large fail the gradient check
if relative_error > error_threshold:
print("Gradient Check ERROR: parameter = % s ix = % s " % (pname, ix))
print("+h Loss: % f " % gradplus)
print("-h Loss: % f " % gradminus)
print("stimated_gradient: % f" % estimated_gradient)
print("Backpropagation gradient: % f " % backprop_gradient)
print("Relative Error: % f " % relative_error)
return
it.iternext()
print(" Gradient check for parameter % s passed." % (pname))
# Outer SGD Loop
# - model: The RNN model instance
# - X_train: The training data set
# - y_train: The training data labels
# - learning_rate: Initial learning rate for SGD
# - nepoch: Number of times to iterate through the complete dataset
# - evaluate_loss_after: Evaluate the loss after this many epochs
def train_with_sgd(model, X_train, y_train, learning_rate=0.005, nepoch=100, evaluate_loss_after=5):
# We keep track of the losses so we can plot them later
losses = []
num_examples_seen = 0
for epoch in range(nepoch):
# Optionally evaluate the loss
if (epoch % evaluate_loss_after == 0):
loss = model.calculate_loss(X_train, y_train)
losses.append((num_examples_seen, loss))
time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(" % s: Loss after num_examples_seen = % d epoch = % d: % f " % (
time, num_examples_seen, epoch, loss))
# Adjust the learning rate if loss increases
if (len(losses) > 1 and losses[-1][1] > losses[-2][1]):
learning_rate = learning_rate * 0.5
print("Setting learning rate to % f " % learning_rate)
sys.stdout.flush()
# For each training example...
for i in range(len(y_train)):
# One SGD step
model.sgd_step(X_train[i], y_train[i], learning_rate)
num_examples_seen += 1
def generate_sentence(model):
# We start the sentence with the start token
new_sentence = [word_to_index[sentence_start_token]]
# Repeat until we get an end token
while not new_sentence[-1] == word_to_index[sentence_end_token]:
next_word_probs = model.forward_propagation(new_sentence)
sampled_word = word_to_index[unknown_token]
# We don't want to sample unknown words
while sampled_word == word_to_index[unknown_token]:
samples = np.random.multinomial(1, next_word_probs[-1])
sampled_word = np.argmax(samples)
new_sentence.append(sampled_word)
sentence_str = [index_to_word[x] for x in new_sentence[1:-1]]
return sentence_str
vocabulary_size = 8000
unknown_token = "UNKNOWN_TOKEN "
sentence_start_token = "SENTENCE_START "
sentence_end_token = "SENTENCE_END "
# Read the data and append SENTENCE_START and SENTENCE_END tokens
print("Reading CSV file... ")
with open('data/reddit-comments-2015-08.csv', encoding='utf-8', mode='r') as f:
reader = csv.reader(f, skipinitialspace=True)
next(reader)
# Split full comments into sentences
sentences = itertools.chain(*[nltk.sent_tokenize(x[0].lower()) for x in reader])
# Append SENTENCE_START and SENTENCE_END
sentences = [" % s % s % s " % (sentence_start_token, x, sentence_end_token)
for x in sentences]
print("Parsed % d sentences. " % (len(sentences)))
# Tokenize the sentences into words
tokenized_sentences = [nltk.word_tokenize(sent) for sent in sentences]
# Count the word frequencies
word_freq = nltk.FreqDist(itertools.chain(*tokenized_sentences))
print("Found % d unique words tokens. " % len(word_freq.items()))
# Get the most common words and build index_to_word and word_to_index vectors
vocab = word_freq.most_common(vocabulary_size - 1)
index_to_word = [x[0] for x in vocab]
index_to_word.append(unknown_token)
word_to_index = dict([(w, i) for i, w in enumerate(index_to_word)])
print("Using vocabulary size % d. " % vocabulary_size)
print("The least frequent word in our vocabulary is '%s' and appeared % d times. " % (vocab[-1][0], vocab[-1][1]))
# Replace all words not in our vocabulary with the unknown token
for i, sent in enumerate(tokenized_sentences):
tokenized_sentences[i] = [w if w in word_to_index else unknown_token for w in sent]
print("\nExample sentence: '%s' " % sentences[0])
print("\nExample sentence after Pre - processing: '%s' " % tokenized_sentences[0])
# Create the training data
X_train = np.asarray([[word_to_index[w] for w in sent[:-1]] for sent in tokenized_sentences])
y_train = np.asarray([[word_to_index[w] for w in sent[1:]] for sent in tokenized_sentences])
np.random.seed(10)
# Train on a small subset of the data to see what happens
model = RNNNumpy(vocabulary_size)
losses = train_with_sgd(model, X_train[:100], y_train[:100], nepoch=10, evaluate_loss_after=1)
num_sentences = 10
senten_min_length = 7
for i in range(num_sentences):
sent = []
# We want long sentences, not sentences with one or two words
while len(sent) < senten_min_length:
sent = generate_sentence(model)
print("".join(sent))
6. 附录
6.1 中英文对照表
英文 | 中文 | 缩写 | 数学 |
---|---|---|---|
Backpropagation Through Time | 基于时间的反向传播 | BPTT | |
Cross Entropy | 交叉熵 | ||
Long Short Term Memory Networks | 长短期记忆网络 | LSTM | |
Natural Language Processing | 自然语言处理 | NLP | |
Recurrent Neural Networks | 循环(递归)神经网络 | RNN |
6.2 扩展阅读
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:跟我学神经网络4-循环神经网络 - Python技术站