Transformer 一起动手编码学原理 - 阿里技术

学习Transformer,快来跟着作者动手写一个。

作为工程同学,学习Transformer中,不动手写一个,总感觉理解不扎实。纸上得来终觉浅,绝知此事要躬行,抽空多debug几遍!

注:不涉及算法解释,仅是从工程代码实现去加强理解。

一、准备知识

以预测二手房价格的模型,先来理解机器学习中的核心概念。

1.1、手撸版

1、准备训练数据

# 样本数 
num_examples = 1000 
# 特征数:房屋大小、新旧 
num_input = 2 
# 线性回归模型,2个W参数值:2,-3.4 
true_w = [2, -3.4] 
# 1个b参数值 
true_b = 4.2 
# 随机生成样本:特征值 
features = torch.randn(num_examples, num_input, dtype=torch.float32) 
# print(features[0])  
# 随机生成样本:y值 
labels = true_w[0] * features[:, 0] + true_w[1] * features[:, 1] + true_b 
# print(labels[0]) 
# 随机生成一批数据,给y值再随机化下 
_temp = torch.tensor(np.random.normal(0, 0.01, size=labels.size()), dtype=torch.float32) 
labels = labels + _temp 
# print(labels[0])

2、定义模型

# 模型参数 w:w1、w2,b:b,设了初始值 
w = torch.tensor(np.random.normal(0, 0.01, (num_input, 1)), dtype=torch.float32) 
b = torch.zeros(1, dtype=torch.float32) 
# 设置要求计算梯度 
w.requires_grad_(requires_grad=True) 
b.requires_grad_(requires_grad=True) 
# 线性回归模型:定义 torch.mm(X, w) + b,mm是矩阵乘法(m:multiply缩写) 
def linreg(X, w, b): 
    return torch.mm(X, w) + b 
# 定义损失函数:y_pred=预测值、y=真实值,最简单的 平方误差 
def squared_loss(y_pred, y): 
    return (y_pred - y.view(y_pred.size())) ** 2 / 2 
# 定义梯度下降算法:sgd 
# params是参数(data 是参数值、grad 是梯度值),lr是学习率,batch是数量 
# 为什么要除batch?因为这个梯度值,是在batch个数据上累加算的 
def sgd(params, lr, batch): 
    for param in params: 
        # 注意这里更改param时用的param.data 
        param.data -= lr * param.grad / batch

3、训练模型

# 初始学习率 
lr = 0.03 
# 训练轮次 
epoch = 5 
# 网络:一层,2个输入(2个参数 w1、w2),1个b参数 
net = linreg 
# 损失函数 
loss = squared_loss 
# 1个批次大小 
batch_size = 10 
# 训练模型一共需要epoch个迭代周期 
for epoch in range(epoch): 
    # 在每一个迭代周期中,所有样本都要跑,从 0~1000,每个batch=10 
    for X, y in data_iter(batch_size, features, labels): 
        # ls是这个batch的损失求和 
        ls = loss(net(X, w, b), y).sum() 
        # 求梯度值 
        ls.backward() 
        # 更新参数 
        sgd([w, b], lr, batch_size) 
        # 不要忘了梯度清零,下一次要用 
        w.grad.data.zero_() 
        b.grad.data.zero_() 
    # 在计算这轮训练完后,打印 loss,可以观察在不断变小 
    train_l = loss(net(features, w, b), labels) 
    print('epoch %d, loss %f' % (epoch + 1, train_l.mean().item()))

辅助函数:by batch & 随机,读取样本数据

# 定义随机读取数据 by batch 
def data_iter(batch, features, labels): 
nums = len(features) 
# 生成 0-1000 数组 
indices = list(range(nums)) 
# 打乱下标的读取顺序 
random.shuffle(indices) 
# 生成:start=0,stop=1000,step=batch=10 
for i in range(0, nums, batch): 
# 截取一段下标,size=batch 
t_ind = indices[i: min(i + batch, num_examples)]  # 最后一次可能不足一个batch 
# 转换为torch要求格式:tensor 
j = torch.LongTensor(t_ind) 
# 按照下标从向量中找 
# yield:生成一个迭代器返回,有点类似闭包,每调用一次返回一次,且下次调用时会从上次暂停的位置继续 
yield features.index_select(0, j), labels.index_select(0, j)

1.2、pytorch版

1、准备训练数据

同上

2、定义模型

# 定义模型:线性回归 
class LinearNet(nn.Module): 
    def __init__(self, n_feature): 
        super(LinearNet, self).__init__() 
        # 线性规划:n_feature=几个w,1=1个b 
        self.linear = nn.Linear(n_feature, 1) 
    # forward 定义【前向传播】 
    def forward(self, x): 
        y = self.linear(x) 
        return y 
# 实例化模型,初始化参数值 
net = LinearNet(num_input) 
init.normal_(net[0].weight, mean=0, std=0.01) 
init.constant_(net[0].bias, val=0) 
# 定义损失函数 
loss = nn.MSELoss() 
# 定义梯度下降算法(lr是学习率) 
optimizer = optim.SGD(net.parameters(), lr=0.03)

3、训练模型

num_epochs = 3 
for epoch in range(1, num_epochs + 1): 
    for X, y in data_iter: 
        output = net(X) 
        # 算损失值 
        l_sum = loss(output, y.view(-1, 1)) 
        # 更新梯度 
        l_sum.backward() 
        # 更新w和b 
        optimizer.step() 
        # 梯度清零 
        optimizer.zero_grad() 
    print('epoch %d, loss: %f' % (epoch, l_sum.item()))

batch 读取数据

# 读取数据 by batch & 随机 
batch_size = 10 
data_set = Data.TensorDataset(features, labels) 
data_iter = Data.DataLoader(data_set, batch_size, shuffle=True)

在用torch框架,进一步要理解:

a、nn.Module 和 forward() 函数:表示神经网络中的一个层,覆写 init 和 forward 方法(提问:实现2层网络怎么做?)

b、debug 观察下数据变化,和 手撸版 对比着去理解

二、手写Transformer

上面这是一个"逻辑"示意图。印象中,第1次看时,以为 "inputs & outputs" 是并行计算、但上面又有依赖,很糊涂。

这不是一个技术架构图,上图有很多概念,attention、multi-head等。先尝试转换为面向对象的类图,如下:



2.1、例子:翻译

# 定义词典 
source_vocab = {'E': 0, '我': 1, '吃': 2, '肉': 3} 
target_vocab = {'E': 0, 'I': 1, 'eat': 2, 'meat': 3, 'S': 4} 
# 样本数据 
encoder_input = torch.LongTensor([[1, 2, 3, 0]]).to(device)  # 我 吃 肉 E, E代表结束词 
decoder_input = torch.LongTensor([[4, 1, 2, 3]]).to(device)  # S I eat meat, S代表开始词, 并右移一位,用于并行训练 
target = torch.LongTensor([[1, 2, 3, 0]]).to(device)  # I eat meat E, 翻译目标

2.2、定义模型

按照上面类图,我们来一点点实现

1、Attention

class ScaledDotProductAttention(nn.Module): 
    def __init__(self): 
        super(ScaledDotProductAttention, self).__init__() 
    def forward(self, Q, K, V, attn_mask): 
        # Q、K、V,此时是已经乘过 W(q)、W(k)、W(v) 矩阵 
        # 如下图,但不用一个个算,矩阵乘法一次搞定 
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) 
        # 遮盖区的值设为近0,表示E结尾 or decoder 自我顺序遮盖,注意力丢弃 
        scores.masked_fill_(attn_mask, -1e9) 
        # softmax后(遮盖区变为0) 
        attn = nn.Softmax(dim=-1)(scores) 
        # 乘积意义:给V带上了注意力信息。prob就是下图z(矩阵计算不用在v1+v2)。 
        prob = torch.matmul(attn, V) 
        return prob



2、MultiHeadAttention

通用变量定义

d_model = 6  # embedding size 
# d_model = 3  # embedding size 
d_ff = 12  # feedforward nerual network  dimension 
d_k = d_v = 3  # dimension of k(same as q) and v 
n_heads = 2  # number of heads in multihead attention 
# n_heads = 1  # number of heads in multihead attention【注:为debug更简单,可以先改为1个head】 
p_drop = 0.1  # propability of dropout 
device = "cpu"

注1:按惯性会想,会有多个head、串行循环计算,不是,多个head是一个张量输入

注2:FF 全连接、残差连接、归一化,35、38 行业代码,pytorch框架带来的简化

class MultiHeadAttention(nn.Module): 
def __init__(self): 
super(MultiHeadAttention, self).__init__() 
self.n_heads = n_heads 
self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False) 
self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False) 
self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False) 
self.fc = nn.Linear(d_v * n_heads, d_model, bias=False)  # ff 全连接 
self.layer_norm = nn.LayerNorm(d_model)  # normal 归一化 
def forward(self, input_Q, input_K, input_V, attn_mask): 
# input_Q:1*4*6,每批1句 * 每句4个词 * 每词6长度编码 
# residual 先临时保存下:原始值,后面做残差连接加法 
residual, batch = input_Q, input_Q.size(0) 
# 乘上 W 矩阵。注:W 就是要训练的参数 
# 注意:维度从2维变成3维,增加 head 维度,也是一次性并行计算 
Q = self.W_Q(input_Q)  # 乘以 W(6*6) 变为 1*4*6 
Q = Q.view(batch, -1, n_heads, d_k).transpose(1, 2)  # 切开为2个Head 变为 1*2*4*3 1批 2个Head 4词 3编码 
K = self.W_K(input_K).view(batch, -1, n_heads, d_k).transpose(1, 2) 
V = self.W_V(input_V).view(batch, -1, n_heads, d_v).transpose(1, 2) 
# 1*2*4*4,2个Head的4*4,最后一列为true 
# 因为最后一列是 E 结束符 
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1) 
# 返回1*2*4*3,2个头,4*3为带上关注关系的4词 
prob = ScaledDotProductAttention()(Q, K, V, attn_mask) 
# 把2头重新拼接起来,变为 1*4*6 
prob = prob.transpose(1, 2).contiguous() 
prob = prob.view(batch, -1, n_heads * d_v).contiguous() 
# 全连接层:对多头注意力的输出进行线性变换,从而更好地提取信息 
output = self.fc(prob) 
# 残差连接 & 归一化 
res = self.layer_norm(residual + output) # return 1*4*6 
return res

3、Encoder

在 attention 概念中,有很关键的 "遮盖" 概念,先不细究,你debug一遍会更理解

def get_attn_pad_mask(seq_q, seq_k):  # 本质是结尾E做注意力遮盖,返回 1*4*4,最后一列为True 
batch, len_q = seq_q.size()  # 1, 4 
batch, len_k = seq_k.size()  # 1, 4 
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)  # 为0则为true,变为f,f,f,true,意思是把0这个结尾标志为true 
return pad_attn_mask.expand(batch, len_q, len_k)  # 扩展为1*4*4,最后一列为true,表示抹掉结尾对应的注意力 
def get_attn_subsequent_mask(seq):  # decoder的自我顺序注意力遮盖,右上三角形区为true的遮盖 
attn_shape = [seq.size(0), seq.size(1), seq.size(1)] 
subsequent_mask = np.triu(np.ones(attn_shape), k=1) 
subsequent_mask = torch.from_numpy(subsequent_mask) 
return subsequent_mask
class Encoder(nn.Module): 
def __init__(self): 
super(Encoder, self).__init__() 
self.source_embedding = nn.Embedding(len(source_vocab), d_model) 
self.attention = MultiHeadAttention() 
def forward(self, encoder_input): 
# input 1 * 4,1句话4个单词 
# 1 * 4 * 6,将每个单词的整数字编码扩展到6个浮点数编码 
embedded = self.source_embedding(encoder_input) 
# 1 * 4 * 4 矩阵,最后一列为true,表示忽略结尾词的注意力机制 
mask = get_attn_pad_mask(encoder_input, encoder_input) 
# 1*4*6,带上关注力的4个词矩阵 
encoder_output = self.attention(embedded, embedded, embedded, mask) 
return encoder_output

4、Decoder

class Decoder(nn.Module): 
def __init__(self): 
super(Decoder, self).__init__() 
self.target_embedding = nn.Embedding(len(target_vocab), d_model) 
self.attention = MultiHeadAttention() 
# 三入参形状分别为 1*4, 1*4, 1*4*6,前两者未被embedding,注意后面这个是 encoder_output 
def forward(self, decoder_input, encoder_input, encoder_output): 
# 编码为1*4*6 
decoder_embedded = self.target_embedding(decoder_input) 
# 1*4*4 全为false,表示没有结尾词 
decoder_self_attn_mask = get_attn_pad_mask(decoder_input, decoder_input) 
# 1*4*4 右上三角区为1,其余为0 
decoder_subsequent_mask = get_attn_subsequent_mask(decoder_input) 
# 1*4*4 右上三角区为true,其余为false 
decoder_self_mask = torch.gt(decoder_self_attn_mask + decoder_subsequent_mask, 0) 
# 1*4*6 带上注意力的4词矩阵【注:decoder里面,第1个attention】 
decoder_output = self.attention(decoder_embedded, decoder_embedded, decoder_embedded, decoder_self_mask) 
# 1*4*4 最后一列为true,表示E结尾词 
decoder_encoder_attn_mask = get_attn_pad_mask(decoder_input, encoder_input) 
# 输入均为 1*4*6,Q表示"S I eat meat"、K表示"我吃肉E"、V表示 "我吃肉E" 
#【注:decoder里面,第2个attention】 
decoder_output = self.attention(decoder_output, encoder_output, encoder_output, decoder_encoder_attn_mask) 
return decoder_output

5、Transformer

class Transformer(nn.Module): 
    def __init__(self): 
        super(Transformer, self).__init__() 
        self.encoder = Encoder() 
        self.decoder = Decoder() 
        self.fc = nn.Linear(d_model, len(target_vocab), bias=False) 
    def forward(self, encoder_input, decoder_input): 
        # 入 1*4,出 1*4*6,作用:"我吃肉E",并带上三词间的关注力信息 
        encoder_output = self.encoder(encoder_input) 
        # 入 1*4, 1*4, 1*4*6=encoder_output 
        decoder_output = self.decoder(decoder_input, encoder_input, encoder_output) 
        # 预测出4个词,每个词对应到词典中5个词的概率,如下 
        # tensor([[[ 0.0755, -0.2646,  0.1279, -0.3735, -0.2351],[-1.2789,  0.6237, -0.6452,  1.1632,  0.6479]]] 
        decoder_logits = self.fc(decoder_output) 
        res = decoder_logits.view(-1, decoder_logits.size(-1)) 
        return res

2.3、训练模型

model = Transformer().to(device) 
criterion = nn.CrossEntropyLoss() 
optimizer = optim.Adam(model.parameters(), lr=1e-1) 
for epoch in range(10): 
    # 输出4*5,代表预测出4个词,每个词对应到词典中5个词的概率 
    output = model(encoder_input, decoder_input)   
    # 和目标词 I eat meat E做差异计算 
    loss = criterion(output, target.view(-1))   
    print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss)) 
    # 这个3个操作:清零梯度、算法梯度、更新参数 
    optimizer.zero_grad() 
    loss.backward() 
    optimizer.step()

2.4、使用模型

# 预测目标是5个单词 
target_len = len(target_vocab)   
# 1*4*6 输入"我吃肉E",先算【自注意力】 
encoder_output = model.encoder(encoder_input)   
# 1*5 全是0,表示EEEEE 
decoder_input = torch.zeros(1, target_len).type_as(encoder_input.data)   
# 表示S开始字符 
next_symbol = 4   
# 5个单词逐个预测【注意:是一个个追加词,不断往后预测的】 
for i in range(target_len):   
    # 譬如i=0第一轮,decoder输入为SEEEE,第二轮为S I EEE,把预测 I 给拼上去,继续循环 
    decoder_input[0][i] = next_symbol   
    # decoder 输出 
    decoder_output = model.decoder(decoder_input, encoder_input, encoder_output) 
    # 负责将解码器的输出映射到目标词汇表,每个元素表示对应目标词汇的分数 
    # 取出最大的五个词的下标,譬如[1, 3, 3, 3, 3] 表示 i,meat,meat,meat,meat 
    logits = model.fc(decoder_output).squeeze(0) 
    prob = logits.max(dim=1, keepdim=False)[1] 
    next_symbol = prob.data[i].item()  # 只取当前i 
    for k, v in target_vocab.items(): 
        if v == next_symbol: 
            print('第', i, '轮:', k) 
            break 
    if next_symbol == 0:  # 遇到结尾了,那就完成翻译 
        break

参考资料:

1、https://jalammar.github.io/illustrated-transformer/
2、http://nlp.seas.harvard.edu/annotated-transformer/
3、gpt4:学习过程中有很多疑惑,真是一个好老师

1