cut_sentence.py
"""Sentence tokenization utilities.

Features:
1. character-level tokenization (runs of ascii letters are kept whole)
2. word-level tokenization via jieba
   2.1 with a user dictionary loaded from the corpus
3. optional stop-word removal
"""
import string
import jieba
import jieba.posseg as psg
import logging

stopwords_path = "../corpus/stopwords.txt"
# FIX: use a context manager so the stop-words file is closed after loading.
with open(stopwords_path, encoding="utf-8") as f:
    stopwords = [i.strip() for i in f.readlines()]

# silence jieba's default log output
jieba.setLogLevel(logging.INFO)
# load the domain-specific user dictionary
jieba.load_userdict("../corpus/keywords.txt")

# ascii letters are merged into one token so e.g. "python" stays whole
continue_words = string.ascii_lowercase


def _cut_sentence_by_word(sentence):
    """Tokenize character by character, keeping ascii-letter runs together.

    e.g. "python可以做人工智能么?" -> ["python", "可", "以", ...]

    :param sentence: str (expected lower-cased by the caller)
    :return: [str, str, str]
    """
    temp = ""
    result = []
    for word in sentence:
        if word in continue_words:
            # accumulate consecutive ascii letters into one token
            temp += word
        else:
            if len(temp) > 0:
                result.append(temp)
                temp = ""
            result.append(word)
    if len(temp) > 0:  # flush a trailing ascii run
        result.append(temp)
    return result


def _cut_sentence(sentence, use_stopwords, use_seg):
    """Tokenize with jieba at word granularity.

    :param sentence: str
    :param use_stopwords: bool, drop stop words from the result
    :param use_seg: bool, also return part-of-speech flags
    :return: [str, ...] or [(str, str), ...] when use_seg is True
    """
    if not use_seg:
        result = jieba.lcut(sentence)
    else:
        result = [(i.word, i.flag) for i in psg.cut(sentence)]
    if use_stopwords:
        if not use_seg:
            result = [i for i in result if i not in stopwords]
        else:
            # with POS tags, filter on the word part of the tuple
            result = [i for i in result if i[0] not in stopwords]
    return result


def cut(sentence, by_word=False, use_stopwords=False, use_seg=False):
    """Public tokenization entry point.

    :param sentence: str
    :param by_word: bool, tokenize character by character
    :param use_stopwords: bool, remove stop words (word mode only)
    :param use_seg: bool, return (word, pos) tuples (word mode only)
    :return: [(str, seg), ...] or [str, ...]
    """
    sentence = sentence.lower()
    if by_word:
        return _cut_sentence_by_word(sentence)
    else:
        return _cut_sentence(sentence, use_stopwords, use_seg)
word_sequence.py
"""Text serialization: map tokens to integer ids and back."""


class WordSequence:
    UNK_TAG = "<UNK>"  # unknown token
    PAD_TAG = "<PAD>"  # padding token
    SOS_TAG = "<SOS>"  # start of sequence
    EOS_TAG = "<EOS>"  # end of sequence
    PAD = 0
    UNK = 1
    SOS = 2
    EOS = 3

    def __init__(self):
        # token -> id; the four special tokens get fixed ids
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD,
            self.SOS_TAG: self.SOS,
            self.EOS_TAG: self.EOS,
        }
        self.count = {}  # token -> frequency, accumulated by fit()

    def fit(self, sentence):
        """Accumulate token frequencies from one tokenized sentence.

        :param sentence: [str, str, str]
        :return: None
        """
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1
        # after fitting all sentences, self.count holds the full frequency table

    def build_vocab(self, min_count=5, max_count=None, max_features=None):
        """Build the vocabulary from the accumulated counts.

        :param min_count: keep tokens with frequency >= min_count
        :param max_count: keep tokens with frequency <= max_count
        :param max_features: keep only the max_features most frequent tokens
        :return: None
        """
        if min_count is not None:
            self.count = {word: count for word, count in self.count.items() if count >= min_count}
        if max_count is not None:
            self.count = {word: count for word, count in self.count.items() if count <= max_count}
        if max_features is not None:
            # FIX: sorted() takes its comparison function as the keyword-only
            # argument `key`; passing it positionally raises TypeError on py3.
            # [(k,v),(k,v)....] ---> {k:v, k:v}
            self.count = dict(sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_features])
        for word in self.count:
            self.dict[word] = len(self.dict)  # each word gets the next free id
        # reverse mapping: id -> token
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None, add_eos=False):
        """Convert a tokenized sentence into a list of ids.

        :param sentence: [str, str, str]
        :param max_len: pad/truncate to this length; None keeps the length as is
        :param add_eos: insert an EOS token (before the first PAD, or at the end)
        :return: [int, int, int]
        """
        # FIX: guard the pad/truncate logic so max_len=None (the documented
        # default) no longer raises "'>' not supported ... int and NoneType".
        if max_len is not None:
            if add_eos:
                max_len = max_len - 1  # reserve one slot for EOS
            if len(sentence) > max_len:
                sentence = sentence[:max_len]
            else:
                sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))  # pad with PAD
        if add_eos:
            if self.PAD_TAG in sentence:
                # insert EOS just before the padding starts
                index = sentence.index(self.PAD_TAG)
                sentence.insert(index, self.EOS_TAG)
            else:
                sentence = sentence + [self.EOS_TAG]
        # unknown tokens map to UNK (named constant instead of magic 1)
        return [self.dict.get(i, self.UNK) for i in sentence]

    def inverse_transform(self, incides):
        """Convert a list of ids back into a string, stopping at EOS.

        :param incides: [int, int, int]
        :return: str
        """
        result = []
        for i in incides:
            temp = self.inverse_dict.get(i, self.UNK_TAG)
            if temp != self.EOS_TAG:
                result.append(temp)
            else:
                break
        return "".join(result)

    def __len__(self):
        return len(self.dict)


if __name__ == '__main__':
    sentences = [["今天", "天气", "很", "好"],
                 ["今天", "去", "吃", "什么"]]
    ws = WordSequence()
    for sentence in sentences:
        ws.fit(sentence)
    ws.build_vocab(min_count=1)
    print(ws.dict)
    ret = ws.transform(["好", "好", "好", "好", "好", "好", "好", "热", "呀"], max_len=3)
    print(ret)
    ret = ws.inverse_transform(ret)
    print(ret)
dataset.py
"""Dataset preparation for the chatbot."""
import random
from tqdm import tqdm
import config
import torch
from torch.utils.data import DataLoader, Dataset


# 1. split the corpus into train/test sets
def chatbot_data_split():
    """Randomly split input/target line pairs into train (~80%) and test (~20%).

    NOTE(review): the output files are opened in append mode, so running this
    twice duplicates the data — delete the four output files before re-running.
    """
    # FIX: read and write all corpus files through context managers so the
    # handles are flushed and closed (the originals were never closed).
    with open("../corpus/chatbot/input.txt", encoding="utf-8") as f:
        input_lines = f.readlines()
    with open("../corpus/chatbot/target.txt", encoding="utf-8") as f:
        target_lines = f.readlines()
    with open("../corpus/chatbot/train_input.txt", "a", encoding="utf-8") as f_train_input, \
         open("../corpus/chatbot/train_target.txt", "a", encoding="utf-8") as f_train_target, \
         open("../corpus/chatbot/test_input.txt", "a", encoding="utf-8") as f_test_input, \
         open("../corpus/chatbot/test_target.txt", "a", encoding="utf-8") as f_test_target:
        # renamed loop vars: the originals shadowed the builtins input/target
        for inp, tgt in tqdm(zip(input_lines, target_lines), total=len(input_lines)):
            if random.random() > 0.8:
                # ~20% goes to the test set
                f_test_input.write(inp)
                f_test_target.write(tgt)
            else:
                f_train_input.write(inp)
                f_train_target.write(tgt)


# 2. dataset
class ChatDataset(Dataset):
    def __init__(self, train=True):
        """Load the (already split) input/target files for train or test."""
        input_path = "../corpus/chatbot/train_input.txt" if train else "../corpus/chatbot/test_input.txt"
        target_path = "../corpus/chatbot/train_target.txt" if train else "../corpus/chatbot/test_target.txt"
        with open(input_path, encoding="utf-8") as f:
            self.input_data = f.readlines()
        with open(target_path, encoding="utf-8") as f:
            self.target_data = f.readlines()
        assert len(self.input_data) == len(self.target_data), "input target长度不一致!!!"

    def __getitem__(self, idx):
        """Return one (input_ids, target_ids, input_len, target_len) tuple."""
        input = self.input_data[idx].strip().split()
        target = self.target_data[idx].strip().split()
        # true (pre-padding) lengths, capped at the configured maxima
        input_len = len(input) if len(input) < config.chatbot_input_max_len else config.chatbot_input_max_len
        target_len = len(target) if len(target) < config.chatbot_target_max_len else config.chatbot_target_max_len
        input = config.input_ws.transform(input, max_len=config.chatbot_input_max_len)
        target = config.target_ws.transform(target, max_len=config.chatbot_target_max_len, add_eos=True)
        return input, target, input_len, target_len

    def __len__(self):
        return len(self.input_data)


# 3. dataloader
def collate_fn(batch):
    """Collate a list of __getitem__ results into LongTensors.

    :param batch: [(input, target, input_len, target_len), ...]
    :return: (input, target, input_len, target_len) LongTensors
    """
    # 1. sort by input_len descending, as required by pack_padded_sequence
    batch = sorted(batch, key=lambda x: x[-2], reverse=True)
    # 2. transpose the batch
    input, target, input_len, target_len = zip(*batch)
    # 3. convert everything to LongTensor
    input = torch.LongTensor(input)
    target = torch.LongTensor(target)
    input_len = torch.LongTensor(input_len)
    target_len = torch.LongTensor(target_len)
    return input, target, input_len, target_len


def get_dataloader(train=True):
    """Build a shuffled DataLoader over the train or test split."""
    batch_size = config.chatbot_train_batch_size if train else config.chatbot_test_batch_size
    return DataLoader(ChatDataset(train), batch_size=batch_size, collate_fn=collate_fn, shuffle=True)


if __name__ == '__main__':
    loader = get_dataloader()
    for idx, (input, target, input_len, target_len) in enumerate(loader):
        print(idx)
        print(input)
        print(target)
        print(input_len)
        print(target_len)
        break
config.py
"""Project configuration."""
import pickle
import torch

# use the first GPU when available, otherwise fall back to CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# device = ("cpu")

################# classify settings ###############
predict_ratio = 0.98  # prediction probability threshold

################# chatbot settings #################
chatbot_train_batch_size = 400
chatbot_test_batch_size = 500

# FIX: open the pickled WordSequence vocabularies via context managers so the
# file handles are closed after loading (they were leaked before).
with open("../chatbot/models/ws_input.pkl", "rb") as f:
    input_ws = pickle.load(f)
with open("../chatbot/models/ws_target.pkl", "rb") as f:
    target_ws = pickle.load(f)

chatbot_input_max_len = 20
chatbot_target_max_len = 30

# encoder: 2-layer bidirectional GRU, hidden size 128 per direction
chatbot_encoder_embedding_dim = 300
chatbot_encoder_hidden_size = 128
chatbot_encoder_number_layer = 2
chatbot_encoder_bidirectional = True
chatbot_encoder_dropout = 0.3

# decoder: hidden size matches the encoder's concatenated
# forward+backward final hidden state (128*2)
chatbot_decoder_embedding_dim = 300
chatbot_decoder_hidden_size = 128 * 2
chatbot_decoder_number_layer = 1
chatbot_decoder_dropout = 0
encoder.py
"""Encoder: embed the input sentence and summarise it with a GRU."""
import torch.nn as nn
from torch.nn.utils.rnn import pad_packed_sequence, pack_padded_sequence
import config
import torch


class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=len(config.input_ws),
                                      embedding_dim=config.chatbot_encoder_embedding_dim,
                                      padding_idx=config.input_ws.PAD
                                      )
        # 2 bidirectional layers, hidden_size 128 per direction
        self.gru = nn.GRU(input_size=config.chatbot_encoder_embedding_dim,
                          hidden_size=config.chatbot_encoder_hidden_size,
                          num_layers=config.chatbot_encoder_number_layer,
                          batch_first=True,
                          bidirectional=config.chatbot_encoder_bidirectional,
                          dropout=config.chatbot_encoder_dropout)

    def forward(self, input, input_len):
        """
        :param input: [batch_size, seq_len] LongTensor of token ids
        :param input_len: [batch_size] true lengths, sorted descending
                          (collate_fn guarantees the ordering)
        :return: (output_paded [batch_size, seq_len, 128*2],
                  encoder_hidden [1, batch_size, 128*2])
        """
        input_embeded = self.embedding(input)
        # pack the padded batch so the GRU skips the padding.
        # FIX: recent PyTorch requires the lengths tensor on CPU even when
        # the model runs on GPU (train.py moves input_len to the device).
        input_packed = pack_padded_sequence(input_embeded, input_len.cpu(), batch_first=True)
        output, hidden = self.gru(input_packed)
        # unpack back to a padded tensor
        output_paded, seq_len = pad_packed_sequence(output, batch_first=True,
                                                    padding_value=config.input_ws.PAD)
        # concat the top layer's forward (hidden[-2]) and backward (hidden[-1])
        # final states to summarise the whole sentence
        encoder_hidden = torch.cat([hidden[-2], hidden[-1]], dim=-1).unsqueeze(0)  # [1,batch_size,128*2]
        return output_paded, encoder_hidden
decoder.py
"""Decoder: autoregressive GRU that generates the reply token by token."""
import torch.nn as nn
import config
import torch
import torch.nn.functional as F
import numpy as np
import random


class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        vocab_size = len(config.target_ws)
        self.embedding = nn.Embedding(num_embeddings=vocab_size,
                                      embedding_dim=config.chatbot_decoder_embedding_dim,
                                      padding_idx=config.target_ws.PAD)
        # consumes a hidden state of shape [1, batch_size, hidden_size]
        self.gru = nn.GRU(input_size=config.chatbot_decoder_embedding_dim,
                          hidden_size=config.chatbot_decoder_hidden_size,
                          num_layers=config.chatbot_decoder_number_layer,
                          bidirectional=False,
                          batch_first=True,
                          dropout=config.chatbot_decoder_dropout)
        # projects each GRU output onto the target vocabulary
        self.fc = nn.Linear(config.chatbot_decoder_hidden_size, len(config.target_ws))

    def forward(self, encoder_hidden, target):
        """Decode a whole batch for training.

        :param encoder_hidden: [1, batch_size, 128*2] sentence summary
        :param target: [batch_size, max_len] ground-truth token ids
        :return: (decoder_outputs [batch_size, max_len, vocab_size],
                  decoder_hidden)
        """
        batch_size = encoder_hidden.size(1)
        # initial hidden state comes straight from the encoder
        decoder_hidden = encoder_hidden  # [1,batch_size,128*2]
        # first input token is SOS for every sequence in the batch
        decoder_input = torch.LongTensor([[config.target_ws.SOS]] * batch_size).to(config.device)  # [batch_size,1]
        # per-step log-probs accumulate here: [batch_size, max_len, vocab_size]
        decoder_outputs = torch.zeros([batch_size,
                                       config.chatbot_target_max_len,
                                       len(config.target_ws)]).to(config.device)
        # one coin flip per batch: feed back our own predictions,
        # or teacher-force with the ground truth
        free_running = random.random() > 0.5
        for t in range(config.chatbot_target_max_len):
            step_output, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)
            decoder_outputs[:, t, :] = step_output
            if free_running:
                # next input = most likely token predicted at step t
                decoder_input = step_output.max(dim=-1)[1].unsqueeze(-1)  # [batch_size,1]
            else:
                # next input = ground-truth token at step t
                decoder_input = target[:, t].unsqueeze(-1)
        return decoder_outputs, decoder_hidden

    def forward_step(self, decoder_input, decoder_hidden):
        """Run a single decoding time step.

        :param decoder_input: [batch_size, 1]
        :param decoder_hidden: [1, batch_size, 128*2]
        :return: (log-probs [batch_size, vocab_size], new hidden state)
        """
        embedded = self.embedding(decoder_input)  # [batch_size, 1, emb_dim]
        # out: [batch_size, 1, hidden_size]; decoder_hidden: [1, batch_size, hidden_size]
        out, decoder_hidden = self.gru(embedded, decoder_hidden)
        squeezed = out.squeeze(dim=1)  # drop the length-1 time axis
        log_probs = F.log_softmax(self.fc(squeezed), dim=-1)  # [batch_size, vocab_size]
        return log_probs, decoder_hidden

    def evaluate(self, encoder_hidden):
        """Greedy decoding for inference (always feeds back predictions).

        :param encoder_hidden: [1, batch_size, 128*2]
        :return: (decoder_outputs, predict_result ndarray where each row is
                  one sequence's predicted ids)
        """
        batch_size = encoder_hidden.size(1)
        decoder_hidden = encoder_hidden  # [1,batch_size,128*2]
        decoder_input = torch.LongTensor([[config.target_ws.SOS]] * batch_size).to(config.device)  # [batch_size,1]
        decoder_outputs = torch.zeros([batch_size,
                                       config.chatbot_target_max_len,
                                       len(config.target_ws)]).to(config.device)
        predictions = []  # one [batch_size] array per time step
        for t in range(config.chatbot_target_max_len):
            step_output, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)
            decoder_outputs[:, t, :] = step_output
            best = step_output.max(dim=-1)[1]
            predictions.append(best.cpu().detach().numpy())  # [[batch],[batch]...]
            decoder_input = best.unsqueeze(-1)  # [batch_size,1]
        # transpose so each row holds one sequence's prediction
        predict_result = np.array(predictions).transpose()
        return decoder_outputs, predict_result
seq2seq.py
"""Seq2seq model: ties the encoder and decoder together."""
import torch.nn as nn
from chatbot.encoder import Encoder
from chatbot.decoder import Decoder


class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper used for both training and inference."""

    def __init__(self):
        super(Seq2Seq, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, input, input_len, target):
        """Training pass: encode the batch, then decode against target."""
        _, encoder_hidden = self.encoder(input, input_len)
        decoder_outputs, _ = self.decoder(encoder_hidden, target)
        return decoder_outputs

    def evaluate(self, input, input_len):
        """Inference pass: greedy decoding from the encoded sentence."""
        _, encoder_hidden = self.encoder(input, input_len)
        return self.decoder.evaluate(encoder_hidden)
train.py
"""Model training loop."""
import torch
import torch.nn.functional as F
from chatbot.seq2seq import Seq2Seq
from torch.optim import Adam
from chatbot.dataset import get_dataloader
from tqdm import tqdm
import config
import numpy as np
import pickle
from matplotlib import pyplot as plt
# from eval import eval

model = Seq2Seq().to(config.device)
optimizer = Adam(model.parameters())
loss_list = []


def train(epoch):
    """Train for one epoch, checkpointing every 100 batches.

    :param epoch: int, current epoch number (shown in the progress bar)
    """
    data_loader = get_dataloader(train=True)
    bar = tqdm(data_loader, total=len(data_loader))
    for idx, (input, target, input_len, target_len) in enumerate(bar):
        input = input.to(config.device)
        target = target.to(config.device)
        input_len = input_len.to(config.device)
        optimizer.zero_grad()
        decoder_outputs = model(input, input_len, target)  # [batch_size, max_len, vocab_size]
        # FIX: the loss runs over the *target* vocabulary, so padding must be
        # masked with target_ws.PAD, not input_ws.PAD (they happen to both be
        # 0 today, but relying on that is fragile).
        loss = F.nll_loss(decoder_outputs.view(-1, len(config.target_ws)),
                          target.view(-1),
                          ignore_index=config.target_ws.PAD)
        loss.backward()
        optimizer.step()
        loss_list.append(loss.item())
        bar.set_description("epoch:{} idx:{} loss:{:.6f}".format(epoch, idx, np.mean(loss_list)))
        if idx % 100 == 0:
            torch.save(model.state_dict(), "../chatbot/models/model.pkl")
            torch.save(optimizer.state_dict(), "../chatbot/models/optimizer.pkl")
            # FIX: close the pickle file via a context manager (was leaked)
            with open("../chatbot/models/loss_list.pkl", "wb") as f:
                pickle.dump(loss_list, f)


if __name__ == '__main__':
    for i in range(5):
        train(i)
    # eval()
    # plt.figure(figsize=(50,8))
    # plt.plot(range(len(loss_list)),loss_list)
    # plt.show()
eval.py
"""Model evaluation and interactive chat."""
import torch
import torch.nn.functional as F
from chatbot.dataset import get_dataloader
from tqdm import tqdm
import config
import numpy as np
import pickle
from chatbot.seq2seq import Seq2Seq


def eval():
    """Compute the average NLL loss over the test split."""
    model = Seq2Seq().to(config.device)
    model.eval()
    model.load_state_dict(torch.load("./models/model.pkl"))
    loss_list = []
    data_loader = get_dataloader(train=False)
    bar = tqdm(data_loader, total=len(data_loader), desc="当前进行评估")
    with torch.no_grad():
        for idx, (input, target, input_len, target_len) in enumerate(bar):
            input = input.to(config.device)
            target = target.to(config.device)
            input_len = input_len.to(config.device)
            decoder_outputs, predict_result = model.evaluate(input, input_len)  # [batch_size, max_len, vocab_size]
            # FIX: mask padding with the *target* vocabulary's PAD id
            # (the loss is over the target vocab, not the input vocab)
            loss = F.nll_loss(decoder_outputs.view(-1, len(config.target_ws)),
                              target.view(-1),
                              ignore_index=config.target_ws.PAD)
            loss_list.append(loss.item())
            bar.set_description("idx:{} loss:{:.6f}".format(idx, np.mean(loss_list)))
    print("当前的平均损失为:", np.mean(loss_list))


def interface():
    """Interactive REPL: read a sentence, print the model's reply."""
    from chatbot.cut_sentence import cut
    import config
    # load the trained model
    model = Seq2Seq().to(config.device)
    model.eval()
    model.load_state_dict(torch.load("./models/model.pkl"))
    while True:
        origin_input = input("me>>:")
        _input = cut(origin_input, by_word=True)
        # FIX: transform() truncates to chatbot_input_max_len, so the reported
        # length must be capped too; otherwise a long user input hands
        # pack_padded_sequence a length greater than the sequence and crashes.
        input_len = torch.LongTensor([min(len(_input), config.chatbot_input_max_len)]).to(config.device)
        _input = torch.LongTensor([config.input_ws.transform(_input, max_len=config.chatbot_input_max_len)]).to(config.device)
        outputs, predict = model.evaluate(_input, input_len)
        result = config.target_ws.inverse_transform(predict[0])
        print("chatbot>>:", result)


if __name__ == '__main__':
    interface()
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:pytorch seq2seq闲聊机器人 - Python技术站