# 文本情感分类

# 目标

  1. 知道文本处理的基本方法
  2. 能够使用数据实现情感分类的

# 1. 案例介绍

为了对前面的 word embedding 这种常用的文本向量化的方法进行巩固,这里我们会完成一个文本情感分类的案例

现在我们有一个经典的数据集 IMDB 数据集,地址: http://ai.stanford.edu/~amaas/data/sentiment/ ,这是一份包含了 5 万条流行电影的评论数据,其中训练集 25000 条,测试集 25000 条。数据格式如下:

下图左边为名称,其中名称包含两部分,分别是序号和情感评分,(1-4 为 neg,5-10 为 pos),右边为评论内容

根据上述的样本,需要使用 pytorch 完成模型,实现对评论情感进行预测

# 2. 思路分析

首先可以把上述问题定义为分类问题,情感评分分为 1-10,10 个类别(也可以理解为回归问题,这里当做分类问题考虑)。那么根据之前的经验,我们的大致流程如下:

  1. 准备数据集
  2. 构建模型
  3. 模型训练
  4. 模型评估

知道思路之后,那么我们一步步来完成上述步骤

# 3. 准备数据集

准备数据集和之前的方法一样,实例化 dataset,准备 dataloader,最终我们的数据可以处理成如下格式:

其中有两点需要注意:

  1. 如何完成基础打 Dataset 的构建和 Dataloader 的准备
  2. 每个 batch 中文本的长度不一致的问题如何解决
  3. 每个 batch 中的文本如何转化为数字序列

# 3.1 基础 Dataset 的准备

import torch
from torch.utils.data import DataLoader,Dataset
import os
import re
data_base_path = r"data\aclImdb"  # 字符串前加 r 表示 raw string,\ 不再表示转义
#1. 定义 tokenize 的方法
def tokenize(text):
    # fileters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
    fileters = ['!','"','#','$','%','&','\(','\)','\*','\+',',','-','\.','/',':',';','<','=','>','\?','@'
        ,'\[','\\','\]','^','_','`','\{','\|','\}','~','\t','\n','\x97','\x96','”','“',]
    text = re.sub("<.*?>"," ",text,flags=re.S)
    text = re.sub("|".join(fileters)," ",text,flags=re.S)
    return [i.strip() for i in text.split()]
#2. 准备 dataset
class ImdbDataset(Dataset):
    def __init__(self,mode):
        super(ImdbDataset,self).__init__()
        if mode=="train":
            text_path = [os.path.join(data_base_path,i)  for i in ["train/neg","train/pos"]]
        else:
            text_path =  [os.path.join(data_base_path,i)  for i in ["test/neg","test/pos"]]
        self.total_file_path_list = []
        for i in text_path:
            self.total_file_path_list.extend([os.path.join(i,j) for j in os.listdir(i)])
    def __getitem__(self, idx):
        cur_path = self.total_file_path_list[idx]
        cur_filename = os.path.basename(cur_path)
        label = int(cur_filename.split("_")[-1].split(".")[0]) -1 #处理标题,获取 label,转化为从 [0-9]
        text = tokenize(open(cur_path).read().strip()) #直接按照空格进行分词
        return label,text
    def __len__(self):
        return len(self.total_file_path_list)
  
 # 2. 实例化,准备 dataloader
dataset = ImdbDataset(mode="train")
dataloader = DataLoader(dataset=dataset,batch_size=2,shuffle=True)
#3. 观察数据输出结果
for idx,(label,text) in enumerate(dataloader):
    print("idx:",idx)
    print("table:",label)
    print("text:",text)
    break

输出如下:

idx: 0
table: tensor([3, 1])
text: [('I', 'Want'), ('thought', 'a'), ('this', 'great'), ('was', 'recipe'), ('a', 'for'), ('great', 'failure'), ('idea', 'Take'), ('but', 'a'), ('boy', 's'), ('was', 'y'), ('it', 'plot'), ('poorly', 'add'), ('executed', 'in'), ('We', 'some'), ('do', 'weak'), ('get', 'completely'), ('a', 'undeveloped'), ('broad', 'characters'), ('sense', 'and'), ('of', 'than'), ('how', 'throw'), ('complex', 'in'), ('and', 'the'), ('challenging', 'worst'), ('the', 'special'), ('backstage', 'effects'), ('operations', 'a'), ('of', 'horror'), ('a', 'movie'), ('show', 'has'), ('are', 'known'), ('but', 'Let'), ('virtually', 'stew'), ('no', 'for'), ...('show', 'somehow'), ('rather', 'destroy'), ('than', 'every'), ('anything', 'copy'), ('worth', 'of'), ('watching', 'this'), ('for', 'film'), ('its', 'so'), ('own', 'it'), ('merit', 'will')]

现在运行出现报错 RuntimeError: each element in list of batch should be of equal size ,应该是处理每个文件的文本后,得到的单词的数量不一致造成的。

明显,其中的 text 内容出现对应,和想象的不太相似,出现问题的原因在于 Dataloader 中的参数 collate_fn

collate_fn 的默认值为 torch 自定义的 default_collate , collate_fn 的作用就是对每个 batch 进行处理,而默认的 default_collate 处理出错。

解决问题的思路:

手段 1:考虑先把数据转化为数字序列,观察其结果是否符合要求,之前使用 DataLoader 并未出现类似错误

手段 2:考虑自定义一个 collate_fn ,观察结果

这里使用方式 2,自定义一个 collate_fn , 然后观察结果:

def collate_fn(batch):
	#batch 是 list,其中是一个一个元组,每个元组是 dataset 中__getitem__的结果
    batch = list(zip(*batch))  # 将标签和单词列表分别放到一起
    labes = torch.tensor(batch[0],dtype=torch.int32)
    texts = batch[1]
    del batch
    return labes,texts
dataloader = DataLoader(dataset=dataset,batch_size=2,shuffle=True,collate_fn=collate_fn)
#此时输出正常
for idx,(label,text) in enumerate(dataloader):
    print("idx:",idx)
    print("table:",label)
    print("text:",text)
    break

# 3.2 文本序列化

再介绍 word embedding 的时候,我们说过,不会直接把文本转化为向量,而是先转化为数字,再把数字转化为向量,那么这个过程该如何实现呢?

这里我们可以考虑把文本中的每个词语和其对应的数字,使用字典保存,同时实现方法把句子通过字典映射为包含数字的列表

实现文本序列化之前,考虑以下几点:

  1. 如何使用字典把词语和数字进行对应
  2. 不同的词语出现的次数不尽相同,是否需要对高频或者低频词语进行过滤,以及总的词语数量是否需要进行限制
  3. 得到词典之后,如何把句子转化为数字序列,如何把数字序列转化为句子
  4. 不同句子长度不相同,每个 batch 的句子如何构造成相同的长度(可以对短句子进行填充,填充特殊字符)
  5. 对于新出现的词语在词典中没有出现怎么办(可以使用特殊字符代理)

思路分析:

  1. 对所有句子进行分词
  2. 词语存入字典,根据次数对词语进行过滤,并统计次数
  3. 实现文本转数字序列的方法
  4. 实现数字序列转文本方法
import numpy as np
class Word2Sequence():
    UNK_TAG = "UNK"
    PAD_TAG = "PAD"
    UNK = 0  # 低频词或未在词表中的词
    PAD = 1  # 补全字符
    def __init__(self):
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.fited = False
    def to_index(self, word):
        """word -> index"""
        assert self.fited == True, "必须先进行fit操作"
        return self.dict.get(word, self.UNK)
    def to_word(self, index):
        """index -> word"""
        assert self.fited, "必须先进行fit操作"
        if index in self.inversed_dict:
            return self.inversed_dict[index]
        return self.UNK_TAG
    def __len__(self):
        return self(self.dict)
    def fit(self, sentences, min_count=1, max_count=None, max_feature=None):
        """
        :param sentences:[[word1,word2,word3],[word1,word3,wordn..],...]
        :param min_count: 最小出现的次数
        :param max_count: 最大出现的次数
        :param max_feature: 总词语的最大数量
        :return:
        """
        count = {}
        for sentence in sentences:
            for a in sentence:
                if a not in count:
                    count[a] = 0
                count[a] += 1
        # 比最小的数量大和比最大的数量小的需要
        if min_count is not None:
            count = {k: v for k, v in count.items() if v >= min_count}
        if max_count is not None:
            count = {k: v for k, v in count.items() if v <= max_count}
        # 限制最大的数量
        if isinstance(max_feature, int):
            # 按照 value 值进行从小到大排序
            count = sorted(list(count.items()), key=lambda x: x[1])
            print(count)
            # 如果超出,则只取频率高的
            if max_feature is not None and len(count) > max_feature:
                count = count[-int(max_feature):]
            for w, _ in count:
                self.dict[w] = len(self.dict)
        else:
            # 没有超出,不用排序
            for w in sorted(count.keys()):
                self.dict[w] = len(self.dict)
        self.fited = True
        # 准备一个 index->word 的字典
        self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))
    def transform(self, sentence, max_len=None):
        """
        实现吧句子转化为数组(向量)
        :param sentence:
        :param max_len:
        :return:
        """
        assert self.fited, "必须先进行fit操作"
        if max_len is not None:
            r = [self.PAD] * max_len
        else:
            r = [self.PAD] * len(sentence)
        # 得到的 r 为相应长度的全为 1 的向量
      
        # 若句子过长,则只取前 max_len 个字符
        if max_len is not None and len(sentence) > max_len:
            sentence = sentence[:max_len]
      
        # 将字符转换为数字
        for index, word in enumerate(sentence):
            r[index] = self.to_index(word)
      
        # 将列表转换为向量
        return np.array(r, dtype=np.int64)
    def inverse_transform(self, indices):
        """
        实现从数组 转化为文字
        :param indices: [1,2,3....]
        :return:[word1,word2.....]
        """
        sentence = []
        for i in indices:
            word = self.to_word(i)
            sentence.append(word)
        return sentence
if __name__ == '__main__':
    w2s = Word2Sequence()
    w2s.fit([
        ["你", "好", "么"],
        ["你", "好", "哦"]])
    print(w2s.dict)
    print(w2s.fited)
    print(w2s.transform(["你", "好", "嘛"]))
    print(w2s.transform(["你好嘛"], max_len=10))

完成了 wordsequence 之后,接下来就是保存现有样本中的数据字典,方便后续的使用。

实现对 IMDB 数据的处理和保存

#1. 对 IMDB 的数据记性 fit 操作
def fit_save_word_sequence():
    from wordSequence import Word2Sequence
    ws = Word2Sequence()
    train_path = [os.path.join(data_base_path,i)  for i in ["train/neg","train/pos"]]
    total_file_path_list = []
    for i in train_path:
        total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
    for cur_path in tqdm(total_file_path_list,ascii=True,desc="fitting"):
        ws.fit(tokenize(open(cur_path).read().strip()))
    ws.build_vocab()
    # 对 wordSequesnce 进行保存
    pickle.dump(ws,open("./model/ws.pkl","wb"))
#2. 在 dataset 中使用 wordsequence
ws = pickle.load(open("./model/ws.pkl","rb"))
def collate_fn(batch):
    MAX_LEN = 500 
    #MAX_LEN = max ([len (i) for i in texts]) #取当前 batch 的最大值作为 batch 的最大长度
    batch = list(zip(*batch))
    labes = torch.tensor(batch[0],dtype=torch.int)
    texts = batch[1]
    #获取每个文本的长度
    lengths = [len(i) if len(i)<MAX_LEN else MAX_LEN for i in texts]
    texts = torch.tensor([ws.transform(i, MAX_LEN) for i in texts])
    del batch
    return labes,texts,lengths
#3. 获取输出
dataset = ImdbDataset(ws,mode="train")
    dataloader = DataLoader(dataset=dataset,batch_size=20,shuffle=True,collate_fn=collate_fn)
    for idx,(label,text,length) in enumerate(dataloader):
        print("idx:",idx)
        print("table:",label)
        print("text:",text)
        print("length:",length)
        break

输出如下

idx: 0
table: tensor([ 7,  4,  3,  8,  1, 10,  7, 10,  7,  2,  1,  8,  1,  2,  2,  4,  7, 10,
         1,  4], dtype=torch.int32)
text: tensor([[ 50983,  77480,  82366,  ...,      1,      1,      1],
        [ 54702,  57262, 102035,  ...,  80474,  56457,  63180],
        [ 26991,  57693,  88450,  ...,      1,      1,      1],
        ...,
        [ 51138,  73263,  80428,  ...,      1,      1,      1],
        [  7022,  78114,  83498,  ...,      1,      1,      1],
        [  5353, 101803,  99148,  ...,      1,      1,      1]])
length: [296, 500, 221, 132, 74, 407, 500, 130, 54, 217, 80, 322, 72, 156, 94, 270, 317, 117, 200, 379]

思考:前面我们自定义了 MAX_LEN 作为句子的最大长度,如果我们需要把每个 batch 中的最长的句子长度作为当前 batch 的最大长度,该如何实现?

# 4. 构建模型

这里我们只练习使用 word embedding,所以模型只有一层,即:

  1. 数据经过 word embedding
  2. 数据通过全连接层返回结果,计算 log_softmax
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from build_dataset import get_dataloader,ws,MAX_LEN
class IMDBModel(nn.Module):
    def __init__(self,max_len):
        super(IMDBModel,self).__init__()
        self.embedding = nn.Embedding(len(ws),300,padding_idx=ws.PAD) #[N,300]
        self.fc = nn.Linear(max_len*300,10)  #[max_len*300,10]
    def forward(self, x):
        embed = self.embedding(x) #[batch_size,max_len,300]
        embed = embed.view(x.size(0),-1)
        out = self.fc(embed)
        return F.log_softmax(out,dim=-1)

# 5. 模型的训练和评估

训练流程和之前相同

  1. 实例化模型,损失函数,优化器
  2. 遍历 dataset_loader,梯度置为 0,进行向前计算
  3. 计算损失,反向传播优化损失,更新参数
train_batch_size = 128
test_batch_size = 1000
imdb_model = IMDBModel(MAX_LEN)
optimizer = optim.Adam(imdb_model.parameters())
criterion = nn.CrossEntropyLoss()
def train(epoch):
    mode = True
    imdb_model.train(mode)
    train_dataloader =get_dataloader(mode,train_batch_size)
    for idx,(target,input,input_lenght) in enumerate(train_dataloader):
        optimizer.zero_grad()
        output = imdb_model(input)
        loss = F.nll_loss(output,target) #traget 需要是 [0,9],不能是 [1-10]
        loss.backward()
        optimizer.step()
        if idx %10 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, idx * len(input), len(train_dataloader.dataset),
                       100. * idx / len(train_dataloader), loss.item()))
            torch.save(imdb_model.state_dict(), "model/mnist_net.pkl")
            torch.save(optimizer.state_dict(), 'model/mnist_optimizer.pkl')
          
 def test():
    test_loss = 0
    correct = 0
    mode = False
    imdb_model.eval()
    test_dataloader = get_dataloader(mode, test_batch_size)
    with torch.no_grad():
        for target, input, input_lenght in test_dataloader:
            output = imdb_model(input)
            test_loss  += F.nll_loss(output, target,reduction="sum")
            pred = torch.max(output,dim=-1,keepdim=False)[-1]
            correct = pred.eq(target.data).sum()
        test_loss = test_loss/len(test_dataloader.dataset)
        print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
            test_loss, correct, len(test_dataloader.dataset),
            100. * correct / len(test_dataloader.dataset)))
if __name__ == '__main__':
    test()
    for i in range(3):
        train(i)
        test()

这里我们仅仅使用了一层全连接层,其分类效果不会很好,这里重点是理解常见的模型流程和 word embedding 的使用方法