num_sequence.py
""" 数字序列化方法 """ class NumSequence: """ input : intintint output :[int,int,int] """ PAD_TAG = "<PAD>" UNK_TAG = "<UNK>" SOS_TAG = "<SOS>" EOS_TAG = "<EOS>" PAD = 0 UNK = 1 SOS = 2 EOS = 3 def __init__(self): self.dict = { self.PAD_TAG:self.PAD, self.UNK_TAG: self.UNK, self.SOS_TAG: self.SOS, self.EOS_TAG: self.EOS } #0--》int ,1--->int,2--->int for i in range(0,10): self.dict[str(i)] = len(self.dict) self.inverse_dict = dict(zip(self.dict.values(),self.dict.keys())) def transform(self,sentence,max_len=None,add_eos=False): """ 实现转化为数字序列 :param sentence: list() ,["1","2","5"...str] :param max_len: int :param add_eos: 是否要添加结束符 :return: [int,int,int] """ if add_eos : #不是必须的,仅仅是为了最终句子的长度=设置的max;如果没有,最终的句子长度= max_len+1 max_len = max_len - 1 if max_len is not None: if len(sentence)> max_len: sentence = sentence[:max_len] else: sentence = sentence + [self.PAD_TAG]*(max_len-len(sentence)) if add_eos: if sentence[-1] == self.PAD_TAG: #句子中有PAD,在PAD之前添加EOS pad_index = sentence.index(self.PAD_TAG) sentence.insert(pad_index,self.EOS_TAG) else:#句子中没有PAD,在最后添加EOS sentence += [self.EOS_TAG] return [self.dict.get(i,self.UNK) for i in sentence] def inverse_transform(self,incides): """ 把序列转化为数字 :param incides:[1,3,4,5,2,] :return: "12312312" """ result = [] for i in incides: temp = self.inverse_dict.get(i, self.UNK_TAG) if temp != self.EOS_TAG: #把EOS之后的内容删除,123---》1230EOS,predict 1230EOS123 result.append(temp) else: break return "".join(result) def __len__(self): return len(self.dict) if __name__ == '__main__': num_Sequence = NumSequence() print(num_Sequence.dict) s = list("123123") ret = num_Sequence.transform(s) print(ret) ret = num_Sequence.inverse_transform(ret) print(ret)
dataset.py
""" 准备数据集 """ from torch.utils.data import DataLoader,Dataset import numpy as np import config import torch class NumDataset(Dataset): def __init__(self,train=True): np.random.seed(9) if train else np.random.seed(10) self.size = 400000 if train else 100000 self.data = np.random.randint(1,1e8,size=self.size) def __len__(self): return self.size def __getitem__(self, idx): input = list(str(self.data[idx])) target = input+["0"] return input,target,len(input),len(target) def collate_fn(batch): """ :param batch:[(一个getitem的结果),(一个getitem的结果),(一个getitem的结果)、、、、] :return: """ #把batch中的数据按照input的长度降序排序 batch = sorted(batch,key=lambda x:x[-2],reverse=True) input,target,input_len,target_len = zip(*batch) input = torch.LongTensor([config.ns.transform(i,max_len=config.max_len) for i in input]) target = torch.LongTensor([config.ns.transform(i,max_len=config.max_len,add_eos=True) for i in target]) input_len = torch.LongTensor(input_len) target_len = torch.LongTensor(target_len) return input,target,input_len,target_len def get_dataloader(train=True): batch_size = config.train_batchsize if train else config.test_batch_size return DataLoader(NumDataset(train),batch_size=batch_size,shuffle=False,collate_fn=collate_fn) if __name__ == '__main__': loader = get_dataloader(train=False) for idx,(input,target,input_len,target_len) in enumerate(loader): print(idx) print(input) print(target) print(input_len) print(target_len) break
config.py
""" 配置文件 """ from num_sequence import NumSequence import torch device= torch.device("cpu") # device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") train_batchsize = 256 test_batch_size = 1000 ns = NumSequence() max_len = 10
encoder.py
""" 进行编码 """ import torch.nn as nn from torch.nn.utils.rnn import pad_packed_sequence,pack_padded_sequence import config class Encoder(nn.Module): def __init__(self): super(Encoder,self).__init__() self.embedding = nn.Embedding(num_embeddings=len(config.ns), embedding_dim=50, padding_idx=config.ns.PAD ) self.gru = nn.GRU(input_size=50, hidden_size=64, num_layers=1, batch_first=True, bidirectional=False, dropout=0) def forward(self, input,input_len): input_embeded = self.embedding(input) #对输入进行打包 input_packed = pack_padded_sequence(input_embeded,input_len,batch_first=True) #经过GRU处理 output,hidden = self.gru(input_packed) # print("encoder gru hidden:",hidden.size()) #进行解包 output_paded,seq_len = pad_packed_sequence(output,batch_first=True,padding_value=config.ns.PAD) return output_paded,hidden #[1,batch_size,encoder_hidden_size]
decoder.py
""" 实现解码器 """ import torch.nn as nn import config import torch import torch.nn.functional as F import numpy as np class Decoder(nn.Module): def __init__(self): super(Decoder,self).__init__() self.embedding = nn.Embedding(num_embeddings=len(config.ns), embedding_dim=50, padding_idx=config.ns.PAD) #需要的hidden_state形状:[1,batch_size,64] self.gru = nn.GRU(input_size=50, hidden_size=64, num_layers=1, bidirectional=False, batch_first=True, dropout=0) #假如encoder的hidden_size=64,num_layer=1 encoder_hidden :[2,batch_sizee,64] self.fc = nn.Linear(64,len(config.ns)) def forward(self, encoder_hidden): #第一个时间步的输入的hidden_state decoder_hidden = encoder_hidden #[1,batch_size,encoder_hidden_size] #第一个时间步的输入的input batch_size = encoder_hidden.size(1) decoder_input = torch.LongTensor([[config.ns.SOS]]*batch_size).to(config.device) #[batch_size,1] # print("decoder_input:",decoder_input.size()) #使用全为0的数组保存数据,[batch_size,max_len,vocab_size] decoder_outputs = torch.zeros([batch_size,config.max_len,len(config.ns)]).to(config.device) for t in range(config.max_len): decoder_output_t,decoder_hidden = self.forward_step(decoder_input,decoder_hidden) decoder_outputs[:,t,:] = decoder_output_t #获取当前时间步的预测值 value,index = decoder_output_t.max(dim=-1) decoder_input = index.unsqueeze(-1) #[batch_size,1] # print("decoder_input:",decoder_input.size()) return decoder_outputs,decoder_hidden def forward_step(self,decoder_input,decoder_hidden): ''' 计算一个时间步的结果 :param decoder_input: [batch_size,1] :param decoder_hidden: [batch_size,encoder_hidden_size] :return: ''' decoder_input_embeded = self.embedding(decoder_input) # print("decoder_input_embeded:",decoder_input_embeded.size()) out,decoder_hidden = self.gru(decoder_input_embeded,decoder_hidden) #out :【batch_size,1,hidden_size】 out_squeezed = out.squeeze(dim=1) #去掉为1的维度 out_fc = F.log_softmax(self.fc(out_squeezed),dim=-1) #[bathc_size,vocab_size] # out_fc.unsqueeze_(dim=1) #[bathc_size,1,vocab_size] # print("out_fc:",out_fc.size()) return out_fc,decoder_hidden def evaluate(self,encoder_hidden): # 第一个时间步的输入的hidden_state decoder_hidden = encoder_hidden # [1,batch_size,encoder_hidden_size] # 第一个时间步的输入的input batch_size = encoder_hidden.size(1) decoder_input = torch.LongTensor([[config.ns.SOS]] * batch_size).to(config.device) # [batch_size,1] # print("decoder_input:",decoder_input.size()) # 使用全为0的数组保存数据,[batch_size,max_len,vocab_size] decoder_outputs = torch.zeros([batch_size, config.max_len, len(config.ns)]).to(config.device) decoder_predict = [] #[[],[],[]] #123456 ,targe:123456EOS,predict:123456EOS123 for t in range(config.max_len): decoder_output_t, decoder_hidden = self.forward_step(decoder_input, decoder_hidden) decoder_outputs[:, t, :] = decoder_output_t # 获取当前时间步的预测值 value, index = decoder_output_t.max(dim=-1) decoder_input = index.unsqueeze(-1) # [batch_size,1] # print("decoder_input:",decoder_input.size()) decoder_predict.append(index.cpu().detach().numpy()) #返回预测值 decoder_predict = np.array(decoder_predict).transpose() #[batch_size,max_len] return decoder_outputs, decoder_predict
seq2seq.py
""" 完成seq2seq模型 """ import torch.nn as nn from encoder import Encoder from decoder import Decoder class Seq2Seq(nn.Module): def __init__(self): super(Seq2Seq,self).__init__() self.encoder = Encoder() self.decoder = Decoder() def forward(self, input,input_len): encoder_outputs,encoder_hidden = self.encoder(input,input_len) decoder_outputs,decoder_hidden = self.decoder(encoder_hidden) return decoder_outputs def evaluate(self,input,input_len): encoder_outputs, encoder_hidden = self.encoder(input, input_len) decoder_outputs, decoder_predict = self.decoder.evaluate(encoder_hidden) return decoder_outputs,decoder_predict
train.py
""" 进行模型的训练 """ import torch import torch.nn.functional as F from seq2seq import Seq2Seq from torch.optim import Adam from dataset import get_dataloader from tqdm import tqdm import config import numpy as np import pickle from matplotlib import pyplot as plt from eval import eval import os model = Seq2Seq().to(config.device) optimizer = Adam(model.parameters()) if os.path.exists("./models/model.pkl"): model.load_state_dict(torch.load("./models/model.pkl")) optimizer.load_state_dict(torch.load("./models/optimizer.pkl")) loss_list = [] def train(epoch): data_loader = get_dataloader(train=True) bar = tqdm(data_loader,total=len(data_loader)) for idx,(input,target,input_len,target_len) in enumerate(bar): input = input.to(config.device) target = target.to(config.device) input_len = input_len.to(config.device) optimizer.zero_grad() decoder_outputs = model(input,input_len) #[batch_Size,max_len,vocab_size] predict = decoder_outputs.view(-1,len(config.ns)) target = target.view(-1) loss = F.nll_loss(predict,target,ignore_index=config.ns.PAD) loss.backward() optimizer.step() loss_list.append(loss.item()) bar.set_description("epoch:{} idx:{} loss:{:.6f}".format(epoch,idx,np.mean(loss_list))) if idx%100 == 0: torch.save(model.state_dict(),"./models/model.pkl") torch.save(optimizer.state_dict(),"./models/optimizer.pkl") pickle.dump(loss_list,open("./models/loss_list.pkl","wb")) if __name__ == '__main__': for i in range(5): train(i) eval() plt.figure(figsize=(50,8)) plt.plot(range(len(loss_list)),loss_list) plt.show()
eval.py
""" 进行模型的评估 """ import torch import torch.nn.functional as F from seq2seq import Seq2Seq from torch.optim import Adam from dataset import get_dataloader from tqdm import tqdm import config import numpy as np import pickle from matplotlib import pyplot as plt def eval(): model = Seq2Seq().to(config.device) model.load_state_dict(torch.load("./models/model.pkl")) loss_list = [] acc_list = [] data_loader = get_dataloader(train=False) #获取测试集 with torch.no_grad(): for idx,(input,target,input_len,target_len) in enumerate(data_loader): input = input.to(config.device) # target = target #[batch_size,max_len] input_len = input_len.to(config.device) #decoder_predict:[batch_size,max_len] decoder_outputs,decoder_predict = model.evaluate(input,input_len) #[batch_Size,max_len,vocab_size] loss = F.nll_loss(decoder_outputs.view(-1,len(config.ns)),target.to(config.device).view(-1),ignore_index=config.ns.PAD) loss_list.append(loss.item()) #把traget 和 decoder_predict进行inverse_transform target_inverse_tranformed = [config.ns.inverse_transform(i) for i in target.numpy()] predict_inverse_tranformed = [config.ns.inverse_transform(i)for i in decoder_predict] cur_eq =[1 if target_inverse_tranformed[i] == predict_inverse_tranformed[i] else 0 for i in range(len(target_inverse_tranformed))] acc_list.extend(cur_eq) # print(np.mean(cur_eq)) print("mean acc:{} mean loss:{:.6f}".format(np.mean(acc_list),np.mean(loss_list))) def interface(_input): #进行预测 model = Seq2Seq().to(config.device) model.load_state_dict(torch.load("./models/model.pkl")) input = list(str(_input)) input_len = torch.LongTensor([len(input)]) #[1] input = torch.LongTensor([config.ns.transform(input)]) #[1,max_len] with torch.no_grad(): input = input.to(config.device) input_len = input_len.to(config.device) _, decoder_predict = model.evaluate(input, input_len) # [batch_Size,max_len,vocab_size] # decoder_predict进行inverse_transform pred = [config.ns.inverse_transform(i) for i in decoder_predict] print(_input,"---->",pred[0]) if __name__ == '__main__': interface("89767678")