关于自编码器的原理见另一篇博客 : 编码器AE & VAE

这里谈谈对于变分自编码器(Variational auto-encoder)即VAE的实现。

 

1. 稀疏编码

首先介绍一下“稀疏编码”这一概念。

       早期学者在黑白风景照片中可以提取到许多16*16像素的图像碎片。而这些图像碎片几乎都可由64种正交的边组合得到。而且组合出一张碎片所需的边的数目很少,即稀疏的。同时在音频中大多数声音也可由几种基本结构组合得到。这其实就是特征的稀疏表达。即使用少量的基本特征来组合更加高层抽象的特征。在神经网络中即体现出前一层是未加工的像素,而后一层就是对这些像素的非线性组合。

       有监督情况下可以利用深层卷积网络来提取特征,而自编码器就是无监督情况下根据自身的高阶特征编码自己。自编码器是输入输出相同的神经网络。其特点是利用稀疏的高阶特征来重构自己。一般而言自编码器的中间隐层节点的数量要小于输入节点的数量,即实现降维过程。因为对于少于输入节点的隐藏层来说无法将输入的全部信息保留,只能优先选择部分重要的特征,而后利用这些特征来复原。此外我们可以给隐层的权重加上L2正则,正则项惩罚因子越大,接近于0的系数越多,从而特征更加稀疏!   

       关于自编码器我们可以加入一些限制使其实现不同的功能,例如去噪自编码(Denoising AutoEncoder)。输入是加了噪声的数据,而输出是原始数据,在学习过程中,只有学到更鲁棒、更频繁的特征模式才能将噪声略去,回复原始数据。如果自编码器的隐层只有一层,那么原理类似于主成分分析PCA。

        HInton提出的DBN模型有多个隐含层,每个隐含层都是限制玻尔兹曼机RBM。DBN训练时需先对每两层间进行无监督的预训练,这一过程实为一个多层的自编码器,可以将每整个网络的权重初始化到一个理想的分布。最后通过反向传播算法调整模型权重,这个步骤会使用经过标注的信息来做监督性的分类训练。当年DBN给训练深度神经网络提供了可能性,它解决了网络过深带来的深度弥散。简言之:先用自编码器的方法进行无监督的预训练,提取特征并初始化权重,然后使用标注信息进行监督式的训练。

 

2.  VAE工作流程

先看下图:

Pytorch入门之VAE

        AE的工作其实是实现了    图片->向量->图片   这一过程。就是说给定一张图片编码后得到一个向量,然后将这一向量进行解码后就得到了原始的图片。这个解码后的图片和之前的原图一样吗?不完全一样。因为一般而言,如前所述是从低维隐层中恢复原图。但是AE另我们现在能训练任意多的图片,如果我们把这些图片的编码向量存在来,那以后就能通过这些编码向量来重构我们的图像,称之为标准自编码器。可这还不够,如果现在我随机拿出一个很离谱的向量直接另其解码,那解码出来的东西十有八九是无意义的东西。

       所以我们希望AE编码出的code符合一种分布(eg:高斯混合模型),那么我们就可以从这个高斯分布任意采样出一个code,给这个code解码那么就会生成一张原图类似的图。而这个强迫分布就是VAE与AE的不同之处了。VAE的编码器输出包括两部分:m和σ。其中e是正态分布, c为编码结果。m、e、σ、c的形状一样,都为(batch_size,latent_code_num) 。这个latent_code_num就相当于高斯混合分布的高斯数量。每个高斯都有自己的均值、方差。所以共有latent_code_num个均值、方差。

       接下来是VAE的损失函数:由两部分的和组成(bce_loss、kld_loss)。bce_loss即为binary_cross_entropy(二分类交叉熵)损失,即用于衡量原图与生成图片的像素误差。kld_loss即为KL-divergence(KL散度),用来衡量潜在变量的分布和单位高斯分布的差异。

 

 3. Pytorch实现 

  1 #!/usr/bin/env python3
  2 # -*- coding: utf-8 -*-
  3 """
  4 Created on Sat Mar 10 20:48:03 2018
  5 
  6 @author: lps
  7 """
  8 
  9 import torch
 10 import torch.nn as nn
 11 import torch.optim as optim
 12 import torch.nn.functional as F
 13 from torch.autograd import Variable
 14 from torchvision import transforms
 15 import torchvision.datasets as dst
 16 from torchvision.utils import save_image
 17 
 18 
 19 EPOCH = 15
 20 BATCH_SIZE = 64
 21 n = 2   # num_workers
 22 LATENT_CODE_NUM = 32   
 23 log_interval = 10
 24 
 25 
 26 transform=transforms.Compose([transforms.ToTensor()])
 27 data_train = dst.MNIST('MNIST_data/', train=True, transform=transform, download=False)
 28 data_test = dst.MNIST('MNIST_data/', train=False, transform=transform)
 29 train_loader = torch.utils.data.DataLoader(dataset=data_train, num_workers=n,batch_size=BATCH_SIZE, shuffle=True)
 30 test_loader = torch.utils.data.DataLoader(dataset=data_test, num_workers=n,batch_size=BATCH_SIZE, shuffle=True)
 31 
 32 
 33 class VAE(nn.Module):
 34       def __init__(self):
 35             super(VAE, self).__init__()
 36       
 37             self.encoder = nn.Sequential(
 38                   nn.Conv2d(1, 64, kernel_size=4, stride=2, padding=1),
 39                   nn.BatchNorm2d(64),
 40                   nn.LeakyReLU(0.2, inplace=True),
 41                   
 42                   nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
 43                   nn.BatchNorm2d(128),
 44                   nn.LeakyReLU(0.2, inplace=True),
 45                       
 46                   nn.Conv2d(128, 128, kernel_size=3 ,stride=1, padding=1),
 47                   nn.BatchNorm2d(128),
 48                   nn.LeakyReLU(0.2, inplace=True),                  
 49                   )
 50             
 51             self.fc11 = nn.Linear(128 * 7 * 7, LATENT_CODE_NUM)
 52             self.fc12 = nn.Linear(128 * 7 * 7, LATENT_CODE_NUM)
 53             self.fc2 = nn.Linear(LATENT_CODE_NUM, 128 * 7 * 7)
 54             
 55             self.decoder = nn.Sequential(                
 56                   nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
 57                   nn.ReLU(inplace=True),
 58                   
 59                   nn.ConvTranspose2d(64, 1, kernel_size=4, stride=2, padding=1),
 60                   nn.Sigmoid()
 61                   )
 62 
 63       def reparameterize(self, mu, logvar):
 64             eps = Variable(torch.randn(mu.size(0), mu.size(1))).cuda()
 65             z = mu + eps * torch.exp(logvar/2)            
 66             
 67             return z
 68       
 69       def forward(self, x):
 70              out1, out2 = self.encoder(x), self.encoder(x)  # batch_s, 8, 7, 7
 71              mu = self.fc11(out1.view(out1.size(0),-1))     # batch_s, latent
 72              logvar = self.fc12(out2.view(out2.size(0),-1)) # batch_s, latent
 73              z = self.reparameterize(mu, logvar)      # batch_s, latent      
 74              out3 = self.fc2(z).view(z.size(0), 128, 7, 7)    # batch_s, 8, 7, 7
 75              
 76              return self.decoder(out3), mu, logvar
 77 
 78 
 79 def loss_func(recon_x, x, mu, logvar):
 80       BCE = F.binary_cross_entropy(recon_x, x,  size_average=False)
 81       KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
 82       
 83       return BCE+KLD
 84 
 85 
 86 vae = VAE().cuda()
 87 optimizer =  optim.Adam(vae.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0)
 88 
 89 
 90 def train(EPOCH):
 91       vae.train()
 92       total_loss = 0
 93       for i, (data, _) in enumerate(train_loader, 0):
 94             data = Variable(data).cuda()
 95             optimizer.zero_grad()
 96             recon_x, mu, logvar = vae.forward(data)
 97             loss = loss_func(recon_x, data, mu, logvar)
 98             loss.backward()
 99             total_loss += loss.data[0]
100             optimizer.step()
101             
102             if i % log_interval == 0:
103                   sample = Variable(torch.randn(64, LATENT_CODE_NUM)).cuda()
104                   sample = vae.decoder(vae.fc2(sample).view(64, 128, 7, 7)).cpu()
105                   save_image(sample.data.view(64, 1, 28, 28),
106                'result/sample_' + str(epoch) + '.png')
107                   print('Train Epoch:{} -- [{}/{} ({:.0f}%)] -- Loss:{:.6f}'.format(
108                               epoch, i*len(data), len(train_loader.dataset), 
109                               100.*i/len(train_loader), loss.data[0]/len(data)))
110                   
111       print('====> Epoch: {} Average loss: {:.4f}'.format(
112           epoch, total_loss / len(train_loader.dataset)))
113       
114 for epoch in range(1, EPOCH):
115     train(epoch)
116       

main.py