简介

在自然语言处理(NLP)领域,Transformer 是一种深度学习模型架构,最早由 Vaswani 等人于 2017 年提出,并且迅速成为了现代 NLP 任务中最为重要和广泛应用的模型架构之一。Transformer 模型的核心优势在于它能够有效地并行处理序列数据,解决了传统的 RNN 和 LSTM 模型中的长期依赖问题,并显著提升了训练效率和性能。

Transformer 的背景

在 Transformer 出现之前,循环神经网络(RNN)和长短时记忆网络(LSTM)在 NLP 任务中占据主导地位。然而,它们存在以下问题:

  • 长距离依赖问题:RNN 和 LSTM 在处理长序列时,信息的传递会随着时间的推移逐渐消失,导致难以捕捉长距离依赖。

  • 并行化困难:RNN 和 LSTM 是按时间步逐个处理数据的,这使得训练过程不容易并行化。

Transformer 提供了一种新的思路,通过自注意力机制(self-attention)来捕捉输入序列中不同位置之间的关系,同时可以并行处理整个序列。

模型架构

1. 自注意力机制(Self-Attention)

自注意力是 Transformer 的核心,旨在通过学习输入序列中不同部分之间的关系来生成更丰富的表示。自注意力机制允许模型在处理每个单词时,查看输入序列中其他所有单词,以决定哪个部分的上下文对当前单词最重要。

计算过程:

对于给定的输入向量,模型通过以下步骤计算自注意力:

  • 计算 查询(Query)、(Key)和 (Value)矩阵,它们是通过与输入向量进行线性变换获得的。

  • 然后计算查询与键的相似度(通常通过点积)并通过 Softmax 归一化,得到注意力权重。

  • 最后,根据注意力权重加权值矩阵,从而得到每个词的加权表示。

公式:

  • Attention(Q, K, V) = Softmax(QK^T / √d_k) * V

    • Q 是查询(Query)矩阵。

    • K 是键(Key)矩阵。

    • V 是值(Value)矩阵。

    • d_k 是键向量的维度,用于缩放。

2. 多头注意力机制(Multi-Head Attention)

为了让模型能够从多个子空间中学习不同的表示,Transformer 使用了多头注意力机制。具体来说,模型将查询、键和值映射到多个子空间,并且独立计算每个子空间的注意力,最终将它们的结果进行拼接和线性变换。

3. 位置编码(Positional Encoding)

由于 Transformer 模型不依赖于递归结构,它没有天然的处理序列顺序的能力。因此,必须显式地将位置编码添加到输入中,以便模型能够理解单词在序列中的相对或绝对位置。位置编码通常采用正弦和余弦函数来生成。

4. 前馈神经网络(Feed-Forward Network)

在每个 Transformer 层中,除了多头注意力之外,还包含一个全连接的前馈神经网络。这个网络对每个位置的输出进行独立的非线性变换。

5. 层归一化和残差连接(Layer Normalization and Residual Connections)

Transformer 使用了层归一化来稳定训练,并且在每个子层(如自注意力、前馈网络)中加入了残差连接,帮助避免梯度消失问题,并加速训练过程。

6、编码器和解码器

两个部分都由多个相同的层堆叠而成

编码器(Encoder):

每个编码器层包含两个主要组件:

  • 多头自注意力机制

  • 前馈神经网络

每个组件之后都有残差连接和层归一化。

解码器(Decoder):

解码器的每一层包含三个主要组件:

  1. Masked Multi-Head Self-Attention:这是自注意力的变种,确保解码器只能关注已经生成的部分,而不能看到未来的部分。

  2. Multi-Head Attention:这是标准的多头注意力,允许解码器通过编码器的输出与其自身的输入进行交互。

  3. 前馈神经网络:与编码器相同。

结构图

Input Embedding
     |
    +-------------------------+
    |      Encoder            |  (多个编码器层)
    +-------------------------+
     |
    +-------------------------+
    |      Decoder            |  (多个解码器层)
    +-------------------------+
     |
   Output

Transformer 的变种和应用

随着时间的推移,基于 Transformer 架构的模型被广泛地扩展和应用,主要包括以下几种变种:

  1. BERT (Bidirectional Encoder Representations from Transformers)

    • BERT 是一种预训练的语言表示模型,专注于编码器部分,并通过双向训练(即同时从左到右和从右到左)来捕获上下文信息。它在很多 NLP 任务中都表现出色。

  2. GPT (Generative Pre-trained Transformer)

    • GPT 是一种基于 Transformer 的生成模型,专注于解码器部分,利用单向(从左到右)训练方式进行文本生成。

  3. T5 (Text-to-Text Transfer Transformer)

    • T5 将所有 NLP 任务转化为文本生成任务,因此它可以同时处理多种类型的任务,包括翻译、问答、摘要等。

  4. Transformer-XL (Transformer with Extra Long Context)

    • Transformer-XL 通过引入相对位置编码和记忆机制,克服了 Transformer 在处理长序列时的限制,能够处理比传统 Transformer 更长的上下文。

  5. XLNet

    • XLNet 是结合了自回归模型和自编码模型的 Transformer 变种,突破了 BERT 的局限性,能够捕获更全面的上下文信息。

代码示例

前期准备

首先我们需要安装基本库,如transformersdatasets,需要注意的是,数据清洗所使用的pandas、numpy本文中暂不过多赘述。仅做针对transformers的一些代码样例

pip install transformers datasets torch 
#主要核心库

模型训练

#导入库
import json
import os
#减少显存压力
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
os.environ["TOKENIZERS_PARALLELISM"] = "false"
from dataclasses import dataclass, field
from typing import Optional
from transformers import AutoTokenizer, T5ForConditionalGeneration, TrainerCallback
from transformers import HfArgumentParser, TrainingArguments, Trainer, set_seed
from datasets import load_dataset, Dataset
from sklearn.metrics import accuracy_score
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
import torch
import global_value
import CustomCallback
import gc

def train(train_path, output_dir, model_name_or_path, max_len, batch_size, gradient_accumulation_steps, learning_rate, warmup_steps, eval_steps, save_steps,
          num_epochs, max_steps, do_train, do_eval, fp16, weight_decay):
#开始训练前清理缓存,开启expandable_segments,减少显存占用
    torch.cuda.empty_cache()
    os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
#设置基本训练参数
    args_dict = {
        "model_name_or_path": model_name_or_path,   #模型路径
        "max_len": max_len,   #输入最大长度
        "output_dir": output_dir,   #输出地址
        "overwrite_output_dir": True,   #是否进行覆盖,如果选择true,则多次训练结果会覆盖
        "per_device_train_batch_size": batch_size,   #训练批次
        "per_device_eval_batch_size": batch_size,   #评估批次
        "gradient_accumulation_steps": gradient_accumulation_steps,   #训练梯度累积
        "eval_accumulation_steps":4,   #评估梯度累积
        "learning_rate": learning_rate,   #初始学习率
        "lr_scheduler_type": "linear",   #线性学习率调度器
        "warmup_steps": warmup_steps,  #学习率预热
        "logging_steps": 100,   #日志保存步数
        "eval_strategy": "steps",   #验证标准,如选择steps,则eval_steps设置生效,如选择epoch,则每个轮次后自动评估
        "save_total_limit": 3,   #最大保存的检查点数量
        "eval_steps": eval_steps,   #评估步数
        "save_steps": save_steps,   #保存步数
        "num_train_epochs": num_epochs,   #训练总轮次,与max_steps互斥,如果想要生效,需要选择max_steps为-1
        "do_train": do_train,
        "do_eval": do_eval,
        "fp16": fp16,   #启动半精度计算,减少显存压力用的,全精度是32
        "max_steps":max_steps,   #设置为-1则默认启动epochs的设置,如果设置了2000这种具体值,则按照steps训练
        "load_best_model_at_end":True,   #保存最好的模型,需要搭配metric_for_best_model、greater_is_better使用
        "metric_for_best_model":"eval_loss",   #设置评估标准,eval_loss指的是按照验证损失来判断最好的是哪个
        "greater_is_better": False,   #false指的是这个值越小越好,跟load_best_model_at_end、greater_is_better绑定
        "weight_decay":weight_decay,  #权重衰减,也叫权重惩罚,防止过拟合的
        # "logging_dir": "./logs",  # 设置 TensorBoard 日志目录
    }

    # 解析参数
    parser = HfArgumentParser((ModelArguments, DataTrainingArguments, TrainingArguments))
    model_args, data_args, training_args = parser.parse_dict(args_dict)

    # 设置随机种子
    set_seed(training_args.seed)

    # 加载数据集
    dataset = load_dataset('text', data_files={'train': train_path})
    full_train_dataset = dataset['train']
    # 转换为 DataFrame
    df = pd.DataFrame(full_train_dataset)

    # 从训练集中裁剪出 20% 作为测试集
    train_df, valid_df = train_test_split(df, test_size=0.15, random_state=training_args.seed)

    # 将 DataFrame 转换回 Dataset
    train_dataset = Dataset.from_pandas(train_df)
    valid_dataset = Dataset.from_pandas(valid_df)

    # 加载模型和分词器,这边以T5模型为例,也可以选择bert等等,如果选择bert模型加载方法需要变更BertForMaskedLM、BertTokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_args.model_name_or_path, max_length=128)
    model = T5ForConditionalGeneration.from_pretrained(model_args.model_name_or_path)

    # 设置最大长度
    tokenizer.model_max_length = data_args.max_len
    model.config.max_length = data_args.max_len

    # 数据集标记化,这里以tsv格式的分类问题进行示例
    def tokenize_dataset(dataset):
        def convert_to_features(example_batch):
            src_texts, trg_texts = [], []
            for example in example_batch['text']:
                if isinstance(example, str) and '\t' in example:
                
                    terms = example.split('\t', 1)
                    src_texts.append(terms[0])
                    trg_texts.append(terms[1])
                else:
                    # 如果格式不对,可能返回默认值
                    src_texts.append(example)
                    trg_texts.append('')  # 或者使用其他的默认值
            input_encodings = tokenizer.batch_encode_plus(src_texts, truncation=True, padding='max_length',
                                                          max_length=data_args.max_len)
            target_encodings = tokenizer.batch_encode_plus(trg_texts, truncation=True, padding='max_length',
                                                           max_length=data_args.max_len)
            return {
                'input_ids': input_encodings['input_ids'],
                'attention_mask': input_encodings['attention_mask'],
                'labels': target_encodings['input_ids'],
                'decoder_attention_mask': target_encodings['attention_mask']
            }

        dataset = dataset.map(convert_to_features, batched=True)
        return dataset

    train_dataset = tokenize_dataset(train_dataset)
    valid_dataset = tokenize_dataset(valid_dataset)

    custom_callback = CustomCallback.CustomCallback()

#调用训练函数,其中包含自定义的callbacks函数,可以省略,或者自定义
    trainer = Trainer(model=model, args=training_args, train_dataset=train_dataset,
                    callbacks=[custom_callback, TerminateOnFlagCallback()], eval_dataset=valid_dataset)

    trainer.train()
    # 评估
    eval_loss = trainer.evaluate(valid_dataset)['eval_loss']
    # 打印最终评估损失
    print(f"Final Validation Loss: {eval_loss:.6f}")
    torch.cuda.empty_cache()
#手动调用gc垃圾回收,也是为了尽可能减少显存压力
    gc.collect()

    trainer.save_model()
    tokenizer.save_pretrained(training_args.output_dir)

模型评估

这里仅提供配合的评估代码,具体评估讲解请参考~

# 评估函数
def eval_model(input_file: str, correct_fn, verbose=True, **kwargs):
    """
    评估模型的性能,计算准确率、精确率、召回率、F1分数
    :param input_file: 输入的验证集文件路径,格式为 TSV,每行是一个源句子和目标句子的对
    :param correct_fn: 纠错函数(输入源句子,输出纠正后的句子)
    :return: dict, 包含 accuracy, precision, recall, f1 等评估指标
    """
    start_time = time.time()



# 读取文件的前1000个字节来检测编码
    with open(input_file, 'rb') as f:
        raw_data = f.read(1000)
        result = chardet.detect(raw_data)
        encoding = result['encoding']

# 使用检测到的编码来读取文件
    df = pd.read_csv(input_file, sep='\t', header=None, names=["src", "tgt"], encoding=encoding)


    # # 读取输入文件
    # df = pd.read_csv(input_file, sep='\t', header=None, names=["src", "tgt"])

    # 获取句子对
    srcs = df["src"].tolist()
    tgts = df["tgt"].tolist()

    srcs = replace_punctuation_in_list(srcs)
    tgts = replace_punctuation_in_list(tgts)
    

    print(f"Total samples: {len(srcs)}")

    # 使用模型进行预测(这个需要自己写一个方法,输出结果)
    predictions = correct_fn(srcs, **kwargs)

    TP = FP = FN = TN = 0
    total_num = len(srcs)

    # 逐个句子进行评估
    for src, tgt, pred in zip(srcs, tgts, predictions):
        if isinstance(pred, dict):
            tgt_pred = pred['target']
            pred_detail = pred['errors']
        else:
            tgt_pred = pred
            pred_detail = ""

        if verbose:
            print()
            print(f"Input: {src}")
            print(f"Truth: {tgt}")
            print(f"Prediction: {tgt_pred}")
            print(f"Prediction Details: {pred_detail}")

        # 负样本:原句等于目标句
        if src == tgt:
            # 预测正确(负样本预测为负)
            if tgt == tgt_pred:
                TN += 1
                print('Correct prediction (True Negative)')
            # 预测错误(负样本预测为正)
            else:
                FP += 1
                print('Wrong prediction (False Positive)')
        # 正样本:原句与目标句不同
        else:
            # 预测正确(正样本预测为正)
            if tgt == tgt_pred:
                TP += 1
                print('Correct prediction (True Positive)')
            # 预测错误(正样本预测为负)
            else:
                FN += 1
                print('Wrong prediction (False Negative)')

        total_num += 1

    # 计算评估指标
    acc = (TP + TN) / total_num
    precision = TP / (TP + FP) if TP + FP > 0 else 0.0
    recall = TP / (TP + FN) if TP + FN > 0 else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0.0

    # 输出评估结果
    spend_time = time.time() - start_time
    print(f"\nEvaluation results:")
    print(f"Accuracy: {acc:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1: {f1:.4f}")
    print(f"Total samples: {total_num}")
    print(f"Time spent: {spend_time:.2f} seconds")

    # 返回评估结果
    eval_results = {
        'accuracy': acc,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'total_samples': total_num,
        'time_spent': spend_time
    }

    return eval_results

模型调用预测

调用示例,还是以T5的文本纠错也就是分类任务为例

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained("models/models--shibing624--mengzi-t5-base-chinese-correction")
model = T5ForConditionalGeneration.from_pretrained("models/models--shibing624--mengzi-t5-base-chinese-correction").to(device)
    # Split the long text into manageable segments
text = "他等了二十分钟,公车才来,而且今天的路上的车子必较多,公车碰到塞车了。"
input_ids = tokenizer.encode(text, return_tensors='pt').to(device)
print("Input IDs:", input_ids)

output = model.generate(
    input_ids,
    max_length=128,       
    num_beams=5,          
    early_stopping=True,
    no_repeat_ngram_size=2,
    do_sample=True,
    temperature=0.7)       

print("Output IDs:", output)

predicted_text = tokenizer.decode(output[0], skip_special_tokens=True)
print("Predicted Text:", predicted_text)