RNN は、これまでの 1 から N までの値から N+1 の値を予測します。(LSTM は RNN セルを実装する 1 つの方法にすぎません。)
簡単な答えは次のとおりです。
- 完全なシーケンス [[x 1 1 x 1 2 ,x 1 3 ],...,[x m 1 x m 2 ,x m 3 ]]で逆伝播を使用してモデルをトレーニングします。
- シーケンス [x 1 1 x 1 2 ,x 1 3 ,...]の開始時にトレーニング済みモデルを順方向に実行し、モデルからサンプリングして、シーケンス [x m 1 x m 2 ,x m 3、...]。
より長い答えは次のとおりです。
あなたの例は、モデルの初期化を示しています。逆伝播を実行するためのトレーニング関数と、結果を予測するサンプル関数も実装する必要があります。
次のコード スニペットは組み合わせたものであり、説明のみを目的としています...
トレーニングの場合は、データ イテレータに start + rest を含む完全なシーケンスを入力するだけです。
たとえば、サンプル コード tensorflow/models/rnn/ptb_word_lm.py では、トレーニング ループはターゲット (1 タイムステップ分シフトされた input_data) に対する input_data のバッチのコスト関数を計算します。
# compute a learning rate decay
session.run(tf.assign(self.learning_rate_variable, learning_rate))
logger.info("Epoch: %d Learning rate: %.3f" % (i + 1, session.run(self.learning_rate_variable)))
"""Runs the model on the given data."""
epoch_size = ((len(training_data) // self.batch_size) - 1) // self.num_steps
costs = 0.0
iters = 0
state = self.initial_state.eval()
for step, (x, y) in enumerate(self.data_iterator(training_data, self.batch_size, self.num_steps)):
# x and y should have shape [batch_size, num_steps]
cost, state, _ = session.run([self.cost_function, self.final_state, self.train_op],
{self.input_data: x,
self.targets: y,
self.initial_state: state})
costs += cost
iters += self.num_steps
tensorflow/models/rnn/reader.py のデータ反復子は、入力データを「x」として返し、ターゲットを「y」として返します。これは、x から 1 ステップ前方にシフトされたものです。(一連のトレーニング シーケンスをパッケージ化する、このようなデータ イテレータを作成する必要があります。)
def ptb_iterator(raw_data, batch_size, num_steps):
raw_data = np.array(raw_data, dtype=np.int32)
data_len = len(raw_data)
batch_len = data_len // batch_size
data = np.zeros([batch_size, batch_len], dtype=np.int32)
for i in range(batch_size):
data[i] = raw_data[batch_len * i:batch_len * (i + 1)]
epoch_size = (batch_len - 1) // num_steps
if epoch_size == 0:
raise ValueError("epoch_size == 0, decrease batch_size or num_steps")
for i in range(epoch_size):
x = data[:, i*num_steps:(i+1)*num_steps]
y = data[:, i*num_steps+1:(i+1)*num_steps+1]
yield (x, y)
トレーニングの後、モデルを順方向に実行して、シーケンスの開始をフィードすることでシーケンスの予測を行います start_x=[X1, X2, X3,...]...このスニペットは、クラスを表すバイナリ値を想定しています。 float 値のサンプリング関数を調整します。
def sample(self, sess, num=25, start_x):
# return state tensor with batch size 1 set to zeros, eval
state = self.rnn_layers.zero_state(1, tf.float32).eval()
# run model forward through the start of the sequence
for char in start_x:
# create a 1,1 tensor/scalar set to zero
x = np.zeros((1, 1))
# set to the vocab index
x[0, 0] = char
# fetch: final_state
# input_data = x, initial_state = state
[state] = sess.run([self.final_state], {self.input_data: x, self.initial_state:state})
def weighted_pick(weights):
# an array of cummulative sum of weights
t = np.cumsum(weights)
# scalar sum of tensor
s = np.sum(weights)
# randomly selects a value from the probability distribution
return(int(np.searchsorted(t, np.random.rand(1)*s)))
# PREDICT REST OF SEQUENCE
rest_x = []
# get last character in init
char = start_x[-1]
# sample next num chars in the sequence after init
score = 0.0
for n in xrange(num):
# init input to zeros
x = np.zeros((1, 1))
# lookup character index
x[0, 0] = char
# probs = tf.nn.softmax(self.logits)
# fetch: probs, final_state
# input_data = x, initial_state = state
[probs, state] = sess.run([self.output_layer, self.final_state], {self.input_data: x, self.initial_state:state})
p = probs[0]
logger.info("output=%s" % np.shape(p))
# sample = int(np.random.choice(len(p), p=p))
# select a random value from the probability distribution
sample = weighted_pick(p)
score += p[sample]
# look up the key with the index
logger.debug("sample[%d]=%d" % (n, sample))
pred = self.vocabulary[sample]
logger.debug("pred=%s" % pred)
# add the car to the output
rest_x.append(pred)
# set the next input character
char = pred
return rest_x, score