原文链接:http://www.one2know.cn/keras5/

CNN 卷积神经网络

from keras.layers import Dense,Activation,Conv2D,MaxPooling2D,Flatten
from keras.models import Model,Sequential
from keras.datasets import mnist
from keras.utils import np_utils

# 构建数据集
(x_train,y_train),(x_test,y_test) = mnist.load_data()
x_train = x_train.reshape(x_train.shape[0],1,28,28)/255
x_test = x_test.reshape(x_test.shape[0],1,28,28)/255
y_train = np_utils.to_categorical(y_train,num_classes=10)
y_test = np_utils.to_categorical(y_test,num_classes=10)
print(x_train[0].shape)
print(y_train[:3])

## 构建模型
model = Sequential()

# 第一层 卷积层
model.add(Conv2D(
    # input_shape=(60000,1,28,28),
    batch_input_shape=(32,1,28,28), # 输入数据的shape
    filters=32, # 滤波器数量为32
    kernel_size=5,
    strides=1,
    padding='same', # same即不改变原来数据的长度和宽度
    data_format='channels_first'
))
model.add(Activation('relu')) # 激励函数为relu

# 第二层 池化层
model.add(MaxPooling2D(
    pool_size=2, # 分辨率长宽各降低一半,输出数据shape为(32,14,14)
    strides=2,
    padding='same',
    data_format='channels_first'
))

# 再加一遍卷积层和池化层 输出数据shape为(64,7,7)
model.add(Conv2D(64, 5, strides=1, padding='same', data_format='channels_first'))
model.add(Activation('relu'))
model.add(MaxPooling2D(2, 2, 'same', data_format='channels_first'))

# 将数据抹平 再加一层全连接层
model.add(Flatten())
model.add(Dense(1024))
model.add(Activation('relu'))

# 再加一层全连接层 作为输出层
model.add(Dense(10))
model.add(Activation('softmax'))

# 设置adam优化方法,loss函数, metrics方法来观察输出结果
model.compile(optimizer='adam',loss='categorical_crossentropy',metrics=['accuracy'])

# 训练模型
model.fit(x_train, y_train, epochs=1, batch_size=32)

# 预测
loss,accuracy = model.evaluate(x_test,y_test)
print('test loss:',loss)
print('test accuracy:',accuracy)

输出:

Epoch 1/1

   32/60000 [..............................] - ETA: 31:05 - loss: 2.2981 - acc: 0.1562
   64/60000 [..............................] - ETA: 19:05 - loss: 2.2658 - acc: 0.2344

   32/10000 [..............................] - ETA: 35s
   96/10000 [..............................] - ETA: 21s
   
test loss: 0.03328929296457209
test accuracy: 0.9897

RNN 循环神经网络

  • 序列数据
    Keras(四)CNN 卷积神经网络 RNN 循环神经网络 原理及实例
    我们想象现在有一组序列数据 data 0,1,2,3. 在当预测 result0 的时候,我们基于的是 data0, 同样在预测其他数据的时候, 我们也都只单单基于单个的数据. 每次使用的神经网络都是同一个 NN. 不过这些数据是有关联 顺序的 , 就像在厨房做菜, 酱料 A要比酱料 B 早放, 不然就串味了. 所以普通的神经网络结构并不能让 NN 了解这些数据之间的关联
  • 处理序列数据的神经网路
    最基本的方式,就是记住之前发生的事情. 那我们让神经网络也具备这种记住之前发生的事的能力. 再分析 Data0 的时候, 我们把分析结果存入记忆. 然后当分析 data1的时候, NN会产生新的记忆, 但是新记忆和老记忆是没有联系的. 我们就简单的把老记忆调用过来, 一起分析. 如果继续分析更多的有序数据 , RNN就会把之前的记忆都累积起来, 一起分析
    Keras(四)CNN 卷积神经网络 RNN 循环神经网络 原理及实例
    每次 RNN 运算完之后都会产生一个对于当前状态的描述 , state. 我们用简写 S( t) 代替, 然后这个 RNN开始分析 x(t+1) , 他会根据 x(t+1)产生s(t+1), 不过此时 y(t+1) 是由 s(t) 和 s(t+1) 共同创造的. 所以我们通常看到的 RNN 也可以表达成这种样子

RNN Classifier 实例

  • 依然使用MNIST数据集
import numpy as np
np.random.seed(1)

from keras.datasets import mnist
from keras.utils import np_utils
from keras.models import Sequential
from keras.layers import SimpleRNN, Activation, Dense
from keras.optimizers import Adam

# 超参数
TIME_STEPS = 28
INPUT_SIZE = 28
BATCH_INDEX = 0 # 从第0个开始训练
BATCH_SIZE = 50 # 一个batch50个数据
CELL_SIZE = 50 # 输出50个神经元
OUTPUT_SIZE = 10 # 输出10个类:0~9
LR = 0.001 # 学习速度

(x_train, y_train), (x_test, y_test) = mnist.load_data()

# data pre-processing
x_train = x_train.reshape(-1, 28, 28) / 255.      # 标准化
x_test = x_test.reshape(-1, 28, 28) / 255.
y_train = np_utils.to_categorical(y_train, num_classes=10)
y_test = np_utils.to_categorical(y_test, num_classes=10)

## 搭建模型
model = Sequential()

# 添加RNN层
model.add(SimpleRNN(
    batch_input_shape=(None, TIME_STEPS, INPUT_SIZE),
    output_dim=CELL_SIZE,
    unroll=True,
))

# 添加输出层
model.add(Dense(OUTPUT_SIZE))
model.add(Activation('softmax'))

# 设置优化器
adam = Adam(LR)
model.compile(optimizer=adam,loss='categorical_crossentropy',metrics=['accuracy'])

# 训练
for step in range(40001):
    X_batch = x_train[BATCH_INDEX: BATCH_INDEX+BATCH_SIZE, :, :]
    Y_batch = y_train[BATCH_INDEX: BATCH_INDEX+BATCH_SIZE, :]
    cost = model.train_on_batch(X_batch, Y_batch)
    BATCH_INDEX += BATCH_SIZE
    BATCH_INDEX = 0 if BATCH_INDEX >= x_train.shape[0] else BATCH_INDEX

    if step % 500 == 0: # 每训练500进行一次测试
        cost, accuracy = model.evaluate(x_test, y_test, batch_size=y_test.shape[0], verbose=False)
        print('test cost: ', cost, 'test accuracy: ', accuracy)

输出:

test cost:  2.3316211700439453 test accuracy:  0.12210000306367874
test cost:  0.5586103200912476 test accuracy:  0.8342999815940857
test cost:  0.4080776870250702 test accuracy:  0.8806999921798706
。。。。。。
test cost:  0.12420056015253067 test accuracy:  0.9653000235557556
test cost:  0.13435833156108856 test accuracy:  0.9632999897003174
test cost:  0.12595564126968384 test accuracy:  0.9653000235557556

RNN Regressor 实例

import numpy as np
np.random.seed(1)
from keras.models import Sequential
from keras.layers import Dense,TimeDistributed,SimpleRNN
from keras.optimizers import Adam
import matplotlib.pyplot as plt

# 超参数
BATCH_START = 0
TIME_STEPS = 20 # 时间步长 前面20个数据对下一个有影响
BATCH_SIZE = 50
INPUT_SIZE = 1
OUTPUT_SIZE = 1
CELL_SIZE = 20
LR = 0.01

# 生成数据
def get_batch():
    global BATCH_START, TIME_STEPS
    xs = np.arange(BATCH_START, BATCH_START+TIME_STEPS*BATCH_SIZE).reshape((BATCH_SIZE, TIME_STEPS)) / (10*np.pi)
    seq = np.sin(xs)
    res = np.cos(xs)
    BATCH_START += TIME_STEPS
    return [seq[:, :, np.newaxis], res[:, :, np.newaxis], xs]

# 查看数据
# get_batch()
# exit()

## 搭建网络
model = Sequential()
# 添加RNN层
model.add(SimpleRNN(
    batch_input_shape=(BATCH_SIZE, TIME_STEPS, INPUT_SIZE),
    output_dim=CELL_SIZE,
    return_sequences=True,  # 对于每一个时间点需不需要输出对应的output,True每个时刻都输出,False最后的输出output
    stateful=True,  # batch与batch之间是否有联系,需不需要将状态进行传递
))
# 添加输出层
model.add(TimeDistributed(Dense(OUTPUT_SIZE))) # TimeDistributed:对每一个output进行全连接的计算

# 优化器
adam = Adam()
model.compile(
    optimizer=adam,
    loss='mse',
)

# 训练
print('Training ------------')
for step in range(501):
    # data shape = (batch_num, steps, inputs/outputs)
    X_batch, Y_batch, xs = get_batch()
    cost = model.train_on_batch(X_batch, Y_batch)
    pred = model.predict(X_batch, BATCH_SIZE)
    plt.plot(xs[0, :], Y_batch[0].flatten(), 'r', xs[0, :], pred.flatten()[:TIME_STEPS], 'b--')
    plt.ylim((-1.2, 1.2))
    plt.draw()
    plt.pause(0.1)
    if step % 10 == 0:
        print('train cost: ', cost)