cut_sentence.py

"""
实现句子的分词
注意点:
1. 实现单个字分词

2. 实现按照词语分词
2.1 加载词典

3. 使用停用词
"""

import string
import jieba
import jieba.posseg as psg
import logging

stopwords_path = "../corpus/stopwords.txt"

stopwords = [i.strip() for i in open(stopwords_path,encoding="utf-8").readlines()]

#关闭jieba日志
jieba.setLogLevel(logging.INFO)

#加载词典
jieba.load_userdict("../corpus/keywords.txt")


continue_words = string.ascii_lowercase

def _cut_sentence_by_word(sentence):
    """
    按照单个字进行分词,eg:python 可 以 做 人 工 智 能 么 ? jave
    :param sentence:str
    :return: [str,str,str]
    """
    temp = ""
    result = []
    for word in sentence:
        if word in continue_words:
            temp += word
        else:
            if len(temp)>0:
                result.append(temp)
                temp = ""
            result.append(word)

    if len(temp)>0:
        result.append(temp)
    return result


def _cut_sentence(sentence,use_stopwords,use_seg):
    """
    按照词语进行分词
    :param sentence:str
    :return: 【str,str,str】
    """
    if not use_seg:
        result = jieba.lcut(sentence)
    else:
        result = [(i.word,i.flag) for i in psg.cut(sentence)]
    if use_stopwords:
        if not use_seg:
            result = [i for i in result if i not in stopwords]
        else:
            result = [i for i in result if i[0] not in stopwords]
    return result

def cut(sentence,by_word=False,use_stopwords=False,use_seg=False):
    """
    封装上述的方法
    :param sentence:str
    :param by_word: bool,是否按照单个字分词
    :param use_stopwords: 是否使用停用词
    :param use_seg: 是否返回词性
    :return: [(str,seg),str]
    """
    sentence = sentence.lower()
    if by_word:
        return _cut_sentence_by_word(sentence)
    else:
        return _cut_sentence(sentence,use_stopwords,use_seg)

word_sequence.py

"""
文本序列化
"""

class WordSequence:
    UNK_TAG = "<UNK>"  #表示未知字符
    PAD_TAG = "<PAD>"  #填充符
    SOS_TAG = "<SOS>"
    EOS_TAG = "<EOS>"
    PAD = 0
    UNK = 1
    SOS = 2
    EOS = 3

    def __init__(self):
        self.dict = {   #保存词语和对应的数字
            self.UNK_TAG:self.UNK,
            self.PAD_TAG:self.PAD,
            self.SOS_TAG:self.SOS,
            self.EOS_TAG:self.EOS
        }
        self.count = {}  #统计词频的


    def fit(self,sentence):
        """
        接受句子,统计词频
        :param sentence:[str,str,str]
        :return:None
        """
        for word in sentence:
            self.count[word] = self.count.get(word,0)  + 1  #所有的句子fit之后,self.count就有了所有词语的词频

    def build_vocab(self,min_count=5,max_count=None,max_features=None):
        """
        根据条件构造 词典
        :param min_count:最小词频
        :param max_count: 最大词频
        :param max_features: 最大词语数
        :return:
        """
        if min_count is not None:
            self.count = {word:count for word,count in self.count.items() if count >= min_count}
        if max_count is not None:
            self.count = {word:count for word,count in self.count.items() if count <= max_count}
        if max_features is not None:
            #[(k,v),(k,v)....] --->{k:v,k:v}
            self.count = dict(sorted(self.count.items(),lambda x:x[-1],reverse=True)[:max_features])

        for word in self.count:
            self.dict[word]  = len(self.dict)  #每次word对应一个数字

        #把dict进行翻转
        self.inverse_dict = dict(zip(self.dict.values(),self.dict.keys()))

    def transform(self,sentence,max_len=None,add_eos=False):
        """
        把句子转化为数字序列
        :param sentence:[str,str,str]
        :return: [int,int,int]
        """
        if add_eos and max_len is not None:
            max_len = max_len-1

        if len(sentence) > max_len:
            sentence = sentence[:max_len]
        else:
            sentence = sentence + [self.PAD_TAG] *(max_len- len(sentence))  #填充PAD

        if add_eos:
            if self.PAD_TAG in sentence:
                index = sentence.index(self.PAD_TAG)
                sentence.insert(index,self.EOS_TAG)
            else:
                sentence += [self.EOS_TAG]

        return [self.dict.get(i,1) for i in sentence]

    def inverse_transform(self,incides):
        """
        把数字序列转化为字符
        :param incides: [int,int,int]
        :return: [str,str,str]
        """
        result = []
        for i in incides:
            temp = self.inverse_dict.get(i, "<UNK>")
            if temp != self.EOS_TAG:
                result.append(temp)
            else:
                break
        return "".join(result)

    def __len__(self):
        return len(self.dict)

if __name__ == '__main__':
    sentences  = [["今天","天气","很","好"],
                  ["今天","去","吃","什么"]]
    ws = WordSequence()
    for sentence in sentences:
        ws.fit(sentence)
    ws.build_vocab(min_count=1)
    print(ws.dict)
    ret = ws.transform(["好","好","好","好","好","好","好","热","呀"],max_len=3)
    print(ret)
    ret = ws.inverse_transform(ret)
    print(ret)
    pass

  

dataset.py

"""
准备数据集
"""
import random
from tqdm import tqdm
import config
import torch
from torch.utils.data import DataLoader,Dataset



#1. 进行数据集的切分
def chatbot_data_split():
    input = open("../corpus/chatbot/input.txt",encoding="utf-8").readlines()
    target = open("../corpus/chatbot/target.txt",encoding="utf-8").readlines()
    f_train_input = open("../corpus/chatbot/train_input.txt","a",encoding="utf-8")
    f_train_target = open("../corpus/chatbot/train_target.txt","a",encoding="utf-8")
    f_test_input = open("../corpus/chatbot/test_input.txt","a",encoding="utf-8")
    f_test_target = open("../corpus/chatbot/test_target.txt","a",encoding="utf-8")
    for input,target in tqdm(zip(input,target),total=len(input)):
        if random.random()>0.8:
            #放入test
            f_test_input.write(input)
            f_test_target.write(target)
        else:
            f_train_input.write(input)
            f_train_target.write(target)
    f_train_input.close()
    f_train_target.close()
    f_test_input.close()
    f_test_target.close()



#2. 准备dataset

class ChatDataset(Dataset):
    def __init__(self,train=True):
        input_path = "../corpus/chatbot/train_input.txt" if train else "../corpus/chatbot/test_input.txt"
        target_path = "../corpus/chatbot/train_target.txt" if train else  "../corpus/chatbot/test_target.txt"
        self.input_data = open(input_path,encoding="utf-8").readlines()
        self.target_data = open(target_path,encoding="utf-8").readlines()
        assert len(self.input_data) == len(self.target_data),"input target长度不一致!!!"

    def __getitem__(self, idx):
        input = self.input_data[idx].strip().split()
        target = self.target_data[idx].strip().split()
        #获取真实长度
        input_len = len(input) if len(input)<config.chatbot_input_max_len else config.chatbot_input_max_len
        target_len = len(target) if len(target)<config.chatbot_target_max_len else config.chatbot_target_max_len

        input = config.input_ws.transform(input,max_len=config.chatbot_input_max_len)
        target = config.target_ws.transform(target,max_len=config.chatbot_target_max_len,add_eos=True)
        return input,target,input_len,target_len


    def __len__(self):
        return len(self.input_data)



# 3. 准备dataloader
def collate_fn(batch):
    """
    :param batch:【(input,target,input_len,target_len),(),(一个getitem的结果)】
    :return:
    """
    #1. 对batch按照input的长度进行排序
    batch = sorted(batch,key=lambda x:x[-2],reverse=True)
    #2. 进行batch操作
    input, target, input_len, target_len = zip(*batch)
    #3. 把输入处理成LongTensor
    input = torch.LongTensor(input)
    target = torch.LongTensor(target)
    input_len = torch.LongTensor(input_len)
    target_len = torch.LongTensor(target_len)
    return input, target, input_len, target_len


def get_dataloader(train=True):
    batch_size = config.chatbot_train_batch_size if train else config.chatbot_test_batch_size
    return DataLoader(ChatDataset(train),batch_size=batch_size,collate_fn=collate_fn,shuffle=True)

if __name__ == '__main__':
    loader = get_dataloader()
    for idx,(input, target, input_len, target_len) in enumerate(loader):
        print(idx)
        print(input)
        print(target)
        print(input_len)
        print(target_len)
        break

config.py

"""
项目配置
"""
import pickle
import torch

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# device = ("cpu")

################# classify 相关的配置 ###############
predict_ratio = 0.98  #预测可能性的阈值

################# chatbot相关的配置 #################
chatbot_train_batch_size = 400
chatbot_test_batch_size = 500

input_ws = pickle.load(open("../chatbot/models/ws_input.pkl","rb"))
target_ws = pickle.load(open("../chatbot/models/ws_target.pkl","rb"))
chatbot_input_max_len = 20
chatbot_target_max_len = 30

chatbot_encoder_embedding_dim = 300
chatbot_encoder_hidden_size = 128
chatbot_encoder_number_layer = 2
chatbot_encoder_bidirectional = True
chatbot_encoder_dropout = 0.3

chatbot_decoder_embedding_dim = 300
chatbot_decoder_hidden_size = 128*2
chatbot_decoder_number_layer = 1
chatbot_decoder_dropout = 0

  

  

encoder.py

"""
进行编码
"""

import torch.nn as nn
from torch.nn.utils.rnn import pad_packed_sequence,pack_padded_sequence
import config
import torch


class Encoder(nn.Module):
    def __init__(self):
        super(Encoder,self).__init__()
        self.embedding = nn.Embedding(num_embeddings=len(config.input_ws),
                                     embedding_dim=config.chatbot_encoder_embedding_dim,
                                     padding_idx=config.input_ws.PAD
                                     )
        # 2层双向,每层hidden_size 128
        self.gru = nn.GRU(input_size=config.chatbot_encoder_embedding_dim,
                          hidden_size=config.chatbot_encoder_hidden_size,
                          num_layers=config.chatbot_encoder_number_layer,
                          batch_first=True,
                          bidirectional=config.chatbot_encoder_bidirectional,
                          dropout=config.chatbot_encoder_dropout)


    def forward(self, input,input_len):
        input_embeded = self.embedding(input)

        #对输入进行打包
        input_packed = pack_padded_sequence(input_embeded,input_len,batch_first=True)
        #经过GRU处理
        output,hidden = self.gru(input_packed)
        # print("encoder gru hidden:",hidden.size())
        #进行解包
        output_paded,seq_len = pad_packed_sequence(output,batch_first=True,padding_value=config.input_ws.PAD)
        #获取最上层的正向和反向最后一个时间步的输出,表示整个句子
        encoder_hidden = torch.cat([hidden[-2],hidden[-1]],dim=-1).unsqueeze(0) #[1,batch_size,128*2]
        return output_paded,encoder_hidden  #[1,batch_size,128*2]

  decoder.py

"""
实现解码器
"""
import torch.nn as nn
import config
import torch
import torch.nn.functional as F
import numpy as np
import random


class Decoder(nn.Module):
    def __init__(self):
        super(Decoder,self).__init__()

        self.embedding = nn.Embedding(num_embeddings=len(config.target_ws),
                                      embedding_dim=config.chatbot_decoder_embedding_dim,
                                      padding_idx=config.target_ws.PAD)

        #需要的hidden_state形状:[1,batch_size,64]
        self.gru = nn.GRU(input_size=config.chatbot_decoder_embedding_dim,
                          hidden_size=config.chatbot_decoder_hidden_size,
                          num_layers=config.chatbot_decoder_number_layer,
                          bidirectional=False,
                          batch_first=True,
                          dropout=config.chatbot_decoder_dropout)

        #假如encoder的hidden_size=64,num_layer=1 encoder_hidden :[2,batch_sizee,64]

        self.fc = nn.Linear(config.chatbot_decoder_hidden_size,len(config.target_ws))

    def forward(self, encoder_hidden,target):
        # print("target size:",target.size())
        #第一个时间步的输入的hidden_state
        decoder_hidden = encoder_hidden  #[1,batch_size,128*2]
        #第一个时间步的输入的input
        batch_size = encoder_hidden.size(1)
        decoder_input = torch.LongTensor([[config.target_ws.SOS]]*batch_size).to(config.device)         #[batch_size,1]
        # print("decoder_input:",decoder_input.size())


        #使用全为0的数组保存数据,[batch_size,max_len,vocab_size]
        decoder_outputs = torch.zeros([batch_size,config.chatbot_target_max_len,len(config.target_ws)]).to(config.device)

        if random.random()>0.5:    #teacher_forcing机制

            for t in range(config.chatbot_target_max_len):
                decoder_output_t,decoder_hidden = self.forward_step(decoder_input,decoder_hidden)
                decoder_outputs[:,t,:] = decoder_output_t


                #获取当前时间步的预测值
                value,index = decoder_output_t.max(dim=-1)
                decoder_input = index.unsqueeze(-1)  #[batch_size,1]
                # print("decoder_input:",decoder_input.size())
        else:
            for t in range(config.chatbot_target_max_len):
                decoder_output_t, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)
                decoder_outputs[:, t, :] = decoder_output_t
                #把真实值作为下一步的输入
                decoder_input = target[:,t].unsqueeze(-1)
                # print("decoder_input size:",decoder_input.size())
        return decoder_outputs,decoder_hidden


    def forward_step(self,decoder_input,decoder_hidden):
        '''
        计算一个时间步的结果
        :param decoder_input: [batch_size,1]
        :param decoder_hidden: [1,batch_size,128*2]
        :return:
        '''

        decoder_input_embeded = self.embedding(decoder_input)
        # print("decoder_input_embeded:",decoder_input_embeded.size())

        #out:[batch_size,1,128*2]
        #decoder_hidden :[1,bathc_size,128*2]
        out,decoder_hidden = self.gru(decoder_input_embeded,decoder_hidden)
        # print("decoder_hidden size:",decoder_hidden.size())
        #out :【batch_size,1,hidden_size】

        out_squeezed = out.squeeze(dim=1) #去掉为1的维度
        out_fc = F.log_softmax(self.fc(out_squeezed),dim=-1) #[bathc_size,vocab_size]
        # out_fc.unsqueeze_(dim=1) #[bathc_size,1,vocab_size]
        # print("out_fc:",out_fc.size())
        return out_fc,decoder_hidden

    def evaluate(self,encoder_hidden):

        # 第一个时间步的输入的hidden_state
        decoder_hidden = encoder_hidden  # [1,batch_size,128*2]
        # 第一个时间步的输入的input
        batch_size = encoder_hidden.size(1)
        decoder_input = torch.LongTensor([[config.target_ws.SOS]] * batch_size).to(config.device)  # [batch_size,1]
        # print("decoder_input:",decoder_input.size())

        # 使用全为0的数组保存数据,[batch_size,max_len,vocab_size]
        decoder_outputs = torch.zeros([batch_size, config.chatbot_target_max_len, len(config.target_ws)]).to(
            config.device)

        predict_result = []
        for t in range(config.chatbot_target_max_len):
            decoder_output_t, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)
            decoder_outputs[:, t, :] = decoder_output_t

            # 获取当前时间步的预测值
            value, index = decoder_output_t.max(dim=-1)
            predict_result.append(index.cpu().detach().numpy()) #[[batch],[batch]...]
            decoder_input = index.unsqueeze(-1)  # [batch_size,1]
            # print("decoder_input:",decoder_input.size())
            # predict_result.append(decoder_input)
        #把结果转化为ndarray,每一行是一条预测结果
        predict_result = np.array(predict_result).transpose()
        return decoder_outputs, predict_result

  seq2seq.py

"""
完成seq2seq模型
"""
import torch.nn as nn
from chatbot.encoder import Encoder
from chatbot.decoder import Decoder


class Seq2Seq(nn.Module):
    def __init__(self):
        super(Seq2Seq,self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, input,input_len,target):
        encoder_outputs,encoder_hidden = self.encoder(input,input_len)
        decoder_outputs,decoder_hidden = self.decoder(encoder_hidden,target)
        return decoder_outputs

    def evaluate(self,input,input_len):
        encoder_outputs, encoder_hidden = self.encoder(input, input_len)
        decoder_outputs, predict_result = self.decoder.evaluate(encoder_hidden)
        return decoder_outputs,predict_result

  train.py

"""
进行模型的训练
"""
import torch
import torch.nn.functional as F
from chatbot.seq2seq import Seq2Seq
from torch.optim import Adam
from chatbot.dataset import get_dataloader
from tqdm import tqdm
import config
import numpy as np
import pickle
from matplotlib import pyplot as plt
# from eval import eval

model = Seq2Seq().to(config.device)

optimizer = Adam(model.parameters())

loss_list = []

def train(epoch):
    data_loader = get_dataloader(train=True)
    bar = tqdm(data_loader,total=len(data_loader))

    for idx,(input,target,input_len,target_len) in enumerate(bar):
        input = input.to(config.device)
        target = target.to(config.device)
        input_len = input_len.to(config.device)
        optimizer.zero_grad()
        decoder_outputs = model(input,input_len,target) #[batch_Size,max_len,vocab_size]
        loss = F.nll_loss(decoder_outputs.view(-1,len(config.target_ws)),target.view(-1),ignore_index=config.input_ws.PAD)
        loss.backward()
        optimizer.step()
        loss_list.append(loss.item())
        bar.set_description("epoch:{} idx:{} loss:{:.6f}".format(epoch,idx,np.mean(loss_list)))

        if idx%100 == 0:
            torch.save(model.state_dict(),"../chatbot/models/model.pkl")
            torch.save(optimizer.state_dict(),"../chatbot/models/optimizer.pkl")
            pickle.dump(loss_list,open("../chatbot/models/loss_list.pkl","wb"))


if __name__ == '__main__':
    for i in range(5):
        train(i)
        # eval()

    # plt.figure(figsize=(50,8))
    # plt.plot(range(len(loss_list)),loss_list)
    # plt.show()

  eval.py

"""
进行模型的评估
"""

import torch
import torch.nn.functional as F
from chatbot.dataset import get_dataloader
from tqdm import tqdm
import config
import numpy as np
import pickle
from chatbot.seq2seq import Seq2Seq

def eval():
    model = Seq2Seq().to(config.device)
    model.eval()
    model.load_state_dict(torch.load("./models/model.pkl"))

    loss_list = []
    data_loader = get_dataloader(train=False)
    bar = tqdm(data_loader,total=len(data_loader),desc="当前进行评估")
    with torch.no_grad():
        for idx,(input,target,input_len,target_len) in enumerate(bar):
            input = input.to(config.device)
            target = target.to(config.device)
            input_len = input_len.to(config.device)

            decoder_outputs,predict_result = model.evaluate(input,input_len) #[batch_Size,max_len,vocab_size]
            loss = F.nll_loss(decoder_outputs.view(-1,len(config.target_ws)),target.view(-1),ignore_index=config.input_ws.PAD)
            loss_list.append(loss.item())
            bar.set_description("idx:{} loss:{:.6f}".format(idx,np.mean(loss_list)))
    print("当前的平均损失为:",np.mean(loss_list))


def interface():
    from chatbot.cut_sentence import cut
    import config
    #加载模型
    model = Seq2Seq().to(config.device)
    model.eval()
    model.load_state_dict(torch.load("./models/model.pkl"))

    #准备待预测的数据
    while True:
        origin_input =input("me>>:")
        # if "你是谁" in origin_input or "你叫什么" in origin_input:
        #     result = "我是小智。"
        # elif "你好" in origin_input or "hello" in origin_input:
        #     result = "Hello"
        # else:
        _input = cut(origin_input, by_word=True)
        input_len = torch.LongTensor([len(_input)]).to(config.device)
        _input = torch.LongTensor([config.input_ws.transform(_input,max_len=config.chatbot_input_max_len)]).to(config.device)

        outputs,predict = model.evaluate(_input,input_len)
        result = config.target_ws.inverse_transform(predict[0])
        print("chatbot>>:",result)





if __name__ == '__main__':
    interface()