在 parameters.py 中,定义了各类参数。

 1 # training data directory
 2 TRAINING_DATA_DIR = './data/'
 3 
 4 # checkpoint directory
 5 CHECKPOINT_DIR = './training_checkpoints/'
 6 
 7 # training details
 8 BATCH_SIZE = 16
 9 BUFFER_SIZE = 128
10 EPOCHS = 15

在 numpy_dataset.py 中,创建了包含 5000 个样本的训练数据集,模拟 y = x^3 + 1(并叠加少量高斯噪声),并以二进制格式写入文件。

 1 from parameters import TRAINING_DATA_DIR
 2 
 3 import numpy as np
 4 import matplotlib.pyplot as plt
 5 import os
 6 
 7 
 8 # create training data
 9 X = np.linspace(-1, 1, 5000)
10 np.random.shuffle(X)
11 y = X ** 3 + 1 + np.random.normal(0, 0.01, (5000,))
12 
13 # plot training data
14 plt.scatter(X, y)
15 plt.show()
16 
17 # save data
18 if not os.path.exists(TRAINING_DATA_DIR):
19     os.makedirs(TRAINING_DATA_DIR)
20 
21 X.tofile(os.path.join(TRAINING_DATA_DIR + 'training_data_X.bin'))
22 y.tofile(os.path.join(TRAINING_DATA_DIR + 'training_data_y.bin'))

[Tensorflow] 使用 model.save_weights() 保存 / 加载 Keras Subclassed Model


在 subclassed_model.py 中,通过对 tf.keras.models.Model 进行子类化,设计了两个自定义模型。

 1 import tensorflow as tf
 2 tf.enable_eager_execution()
 3 
 4 
 5 # model definition
 6 class Encoder(tf.keras.models.Model):
 7     def __init__(self):
 8         super(Encoder, self).__init__()
 9         self.fc1 = tf.keras.layers.Dense(units=16, activation='relu')
10         self.fc2 = tf.keras.layers.Dense(units=8, activation='relu')
11 
12     def call(self, inputs):
13         r = self.fc1(inputs)
14         return self.fc2(r)
15 
16 
17 class Decoder(tf.keras.models.Model):
18     def __init__(self):
19         super(Decoder, self).__init__()
20         self.fc = tf.keras.layers.Dense(units=1, activation=None)
21 
22     def call(self, inputs):
23         return self.fc(inputs)

在 loss_function.py 中,定义了损失函数。

import tensorflow as tf
tf.enable_eager_execution()


def loss(real, pred):
    """Return the mean squared error between targets and predictions.

    Args:
        real: ground-truth values, passed as `labels`.
        pred: model outputs, passed as `predictions`.

    Returns:
        The result of `tf.losses.mean_squared_error(labels=real,
        predictions=pred)`.
    """
    return tf.losses.mean_squared_error(labels=real, predictions=pred)

在 training.py 中,使用 numpy_dataset.py 创建的数据集训练模型,之后使用 model.save_weights() 保存 Keras Subclassed Model 的权重,并另外生成一组验证数据来验证模型。

 1 from parameters import TRAINING_DATA_DIR, CHECKPOINT_DIR, BATCH_SIZE, BUFFER_SIZE, EPOCHS
 2 from subclassed_model import *
 3 from loss_function import loss
 4 
 5 import os
 6 import numpy as np
 7 import matplotlib.pyplot as plt
 8 
 9 
10 # load training data
11 training_X = np.fromfile(os.path.join(TRAINING_DATA_DIR, 'training_data_X.bin'), dtype=np.float64)
12 training_y = np.fromfile(os.path.join(TRAINING_DATA_DIR, 'training_data_y.bin'), dtype=np.float64)
13 
14 # plot training data
15 plt.scatter(training_X, training_y)
16 plt.show()
17 
18 # training dataset
19 training_dataset = tf.data.Dataset.from_tensor_slices((training_X, training_y)).batch(BATCH_SIZE).shuffle(BUFFER_SIZE)
20 
21 # model instance
22 encoder = Encoder()
23 decoder = Decoder()
24 
25 # optimizer
26 optimizer = tf.train.AdamOptimizer()
27 
28 # checkpoint
29 checkpoint_prefix_encoder = os.path.join(CHECKPOINT_DIR, 'encoder/', 'ckpt')
30 checkpoint_prefix_decoder = os.path.join(CHECKPOINT_DIR, 'decoder/', 'ckpt')
31 
32 if not os.path.exists(os.path.dirname(checkpoint_prefix_encoder)):
33     os.makedirs(os.path.dirname(checkpoint_prefix_encoder))
34 if not os.path.exists(os.path.dirname(checkpoint_prefix_decoder)):
35     os.makedirs(os.path.dirname(checkpoint_prefix_decoder))
36 
37 # training step
38 for epoch in range(EPOCHS):
39     epoch_loss = 0
40 
41     for (batch, (tx, ty)) in enumerate(training_dataset):
42         x = tf.cast(tx, tf.float32)
43         y = tf.cast(ty, tf.float32)
44         x = tf.expand_dims(x, axis=1)   # tf.Tensor([...], shape=(BATCH_SIZE, 1), dtype=float32)
45         y = tf.expand_dims(y, axis=1)   # tf.Tensor([...], shape=(BATCH_SIZE, 1), dtype=float32)
46 
47         with tf.GradientTape() as tape:
48             y_ = encoder(x)             # tf.Tensor([...], shape=(BATCH_SIZE, 8), dtype=float32)
49             prediction = decoder(y_)    # tf.Tensor([...], shape=(BATCH_SIZE, 1), dtype=float32)
50             batch_loss = loss(real=y, pred=prediction)
51 
52         variables = encoder.variables + decoder.variables
53         grads = tape.gradient(batch_loss, variables)
54         optimizer.apply_gradients(zip(grads, variables), global_step=tf.train.get_or_create_global_step())
55 
56         epoch_loss += batch_loss
57 
58         if (batch + 1) % 100 == 0:
59             print('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1,
60                                                          batch + 1,
61                                                          batch_loss.numpy()))
62 
63     print('Epoch {} Loss {:.4f}'.format(epoch + 1,
64                                         epoch_loss / len(training_X)))
65 
66     if (epoch + 1) % 5 == 0:
67         encoder.save_weights(checkpoint_prefix_encoder)
68         decoder.save_weights(checkpoint_prefix_decoder)
69 
70 # create evaluation data
71 X = np.linspace(-1, 1, 3000)
72 np.random.shuffle(X)
73 
74 evaluation_X = tf.data.Dataset.from_tensor_slices(X).batch(BATCH_SIZE)
75 ey = []
76 
77 for (batch, ex) in enumerate(evaluation_X):
78     x = tf.cast(ex, tf.float32)
79     x = tf.expand_dims(x, axis=1)
80     prediction = decoder(encoder(x))
81     for i in range(len(prediction.numpy())):
82         ey.append(prediction.numpy()[i])
83 
84 plt.scatter(X, ey)
85 plt.show()
86 
87 # evaluate
88 eval_x = [[0.5]]
89 tensor_x = tf.convert_to_tensor(eval_x)
90 print(decoder(encoder(tensor_x)))

验证集评价结果如下图所示。

[Tensorflow] 使用 model.save_weights() 保存 / 加载 Keras Subclassed Model

使用测试样例 eval_x 进行测试,测试结果如下。

tf.Tensor([[1.122567]], shape=(1, 1), dtype=float32)

在 evaluate.py 中,使用 model.load_weights() 恢复 Keras Subclassed Model 模型,并在验证集上进行验证,验证结果如下图所示。

 1 from parameters import CHECKPOINT_DIR, BATCH_SIZE
 2 from subclassed_model import *
 3 
 4 import os
 5 import numpy as np
 6 import matplotlib.pyplot as plt
 7 
 8 
 9 # load model
10 enc = Encoder()
11 dec = Decoder()
12 
13 enc.load_weights(tf.train.latest_checkpoint(os.path.join(CHECKPOINT_DIR, 'encoder/')))
14 dec.load_weights(tf.train.latest_checkpoint(os.path.join(CHECKPOINT_DIR, 'decoder/')))
15 
16 # create evaluation data
17 X = np.linspace(-1, 1, 3000)
18 np.random.shuffle(X)
19 
20 evaluation_X = tf.data.Dataset.from_tensor_slices(X).batch(BATCH_SIZE)
21 ey = []
22 
23 for (batch, ex) in enumerate(evaluation_X):
24     x = tf.cast(ex, tf.float32)
25     x = tf.expand_dims(x, axis=1)
26     prediction = dec(enc(x))
27     for i in range(len(prediction.numpy())):
28         ey.append(prediction.numpy()[i])
29 
30 plt.scatter(X, ey)
31 plt.show()
32 
33 # evaluate
34 eval_x = [[0.5]]
35 tensor_x = tf.convert_to_tensor(eval_x)
36 print(dec(enc(tensor_x)))
37 
38 # model summary
39 enc.summary()
40 dec.summary()

[Tensorflow] 使用 model.save_weights() 保存 / 加载 Keras Subclassed Model

使用测试样例 eval_x 进行测试,测试结果如下。

tf.Tensor([[1.122567]], shape=(1, 1), dtype=float32)

恢复模型的测试结果,与训练后模型的测试结果一致,且无需事先 build 模型:以 TensorFlow checkpoint 格式保存的权重,会在模型首次被调用、变量被创建时才真正恢复。


版权声明:本文为博主原创文章,欢迎转载,转载请注明作者及原文出处!