Transformer Code Explained (Pytorch Edition)

Time:2024-4-12

preamble

Building on the previous post, "Classical Network Architecture Learning – Transformer", today we use PyTorch to build our own Transformer model in order to deepen our understanding of it. The Transformer is not only unavoidable in NLP but is also very popular in CV, where many models use the attention mechanism.

Transformer full code

Once a PyTorch development environment is installed, the code below can be run directly. You can also run the following Transformer code on the CPU: the dataset is small, so 2 GB of memory is enough.

# ======================================
# === Pytorch Handwritten Transformer Full Code
# ======================================
"""
code by Tae Hwan Jung(Jeff Jung) @graykode, Derek Miller @dmmiller612, modify by shwei
Reference: https://github.com/jadore801120/attention-is-all-you-need-pytorch
https://github.com/JayParks/transformer
"""
# ====================================================================================================
# Data construction
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# script also runs on machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Number of training epochs for the transformer (raise it, e.g. to 1000, for a
# longer run).
epochs = 100
# Instead of using some large dataset here, I manually entered two pairs of Chinese → English sentences
# And the indexes for each word were manually hard-coded by me, mainly to make the code less difficult to read #
# S: Symbol that marks the start of the decoder input
# E: Symbol that marks the end of the decoder output
# P: Padding symbol used to fill a sequence that is shorter than the maximum length
# Training set
sentences = [
    # The Chinese and English sentences need not contain the same number of tokens.
    # Tokens are separated by single spaces; '.' is a token of its own.
    # enc_input                dec_input                      dec_output
    ['我 有 一 个 好 朋 友 P', 'S I have a good friend .', 'I have a good friend . E'],
    ['我 有 零 个 女 朋 友 P', 'S I have zero girl friend .', 'I have zero girl friend . E'],
    ['我 有 一 个 男 朋 友 P', 'S I have a boy friend .', 'I have a boy friend . E'],
]
# test set (what the transformer is expected to achieve)
# Input: "I have a girlfriend" #
# Output: "i have a girlfriend"
# Separate thesaurus for Chinese and English words
# Padding Should be Zero
src_vocab = {'P': 0, '我': 1, '有': 2, '一': 3, '个': 4, '好': 5,
             '朋': 6, '友': 7, '零': 8, '女': 9, '男': 10}
# Reverse lookup (index -> token), built from the stored indices rather than
# from the dict's insertion order.
src_idx2word = {index: word for word, index in src_vocab.items()}
src_vocab_size = len(src_vocab)

tgt_vocab = {'P': 0, 'I': 1, 'have': 2, 'a': 3, 'good': 4,
             'friend': 5, 'zero': 6, 'girl': 7, 'boy': 8, 'S': 9, 'E': 10, '.': 11}
idx2word = {index: word for word, index in tgt_vocab.items()}
tgt_vocab_size = len(tgt_vocab)

src_len = 8  # enc_input max sequence length (length of a source sentence)
tgt_len = 7  # dec_input (= dec_output) max sequence length
# Transformer hyper-parameters
d_model = 512   # width of the token embedding and of the positional encoding
d_ff = 2048     # hidden width of the feed-forward net (512 -> 2048 -> 512)
d_k = 64        # per-head dimension of Q and K (the two must match)
d_v = 64        # per-head dimension of V (set equal to d_k for convenience)
n_layers = 6    # number of stacked encoder layers, and of decoder layers
n_heads = 8     # number of heads in multi-head attention
# ==============================================================================================
# Data construction
def make_data(sentences):
    """Convert whitespace-tokenised sentence triples into index tensors.

    Args:
        sentences: sequence of ``[enc_input, dec_input, dec_output]`` strings.
            The first string is looked up token by token in ``src_vocab``,
            the second and third in ``tgt_vocab``.

    Returns:
        Tuple ``(enc_inputs, dec_inputs, dec_outputs)`` of
        ``torch.LongTensor`` built from the per-sentence index lists.

    Raises:
        KeyError: if a token is missing from the corresponding vocabulary.
    """
    enc_inputs, dec_inputs, dec_outputs = [], [], []
    for sentence in sentences:
        # For the training set these become, respectively:
        # [[1, 2, 3, 4, 5, 6, 7, 0], [1, 2, 8, 4, 9, 6, 7, 0], [1, 2, 3, 4, 10, 6, 7, 0]]
        enc_inputs.append([src_vocab[tok] for tok in sentence[0].split()])
        # [[9, 1, 2, 3, 4, 5, 11], [9, 1, 2, 6, 7, 5, 11], [9, 1, 2, 3, 8, 5, 11]]
        dec_inputs.append([tgt_vocab[tok] for tok in sentence[1].split()])
        # [[1, 2, 3, 4, 5, 11, 10], [1, 2, 6, 7, 5, 11, 10], [1, 2, 3, 8, 5, 11, 10]]
        dec_outputs.append([tgt_vocab[tok] for tok in sentence[2].split()])
    return (torch.LongTensor(enc_inputs),
            torch.LongTensor(dec_inputs),
            torch.LongTensor(dec_outputs))


enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
class MyDataSet(Data.Dataset):
    """Dataset over parallel encoder-input / decoder-input / decoder-output tensors.

    Item ``idx`` is the triple of the ``idx``-th rows of the three tensors.
    """

    def __init__(self, enc_inputs, dec_inputs, dec_outputs):
        super(MyDataSet, self).__init__()
        self.enc_inputs = enc_inputs
        self.dec_inputs = dec_inputs
        self.dec_outputs = dec_outputs

    def __len__(self):
        # The number of samples is the first dimension of the encoder inputs.
        return self.enc_inputs.shape[0]

    def __getitem__(self, idx):
        return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]
# Training loader: mini-batches of 2 samples, reshuffled on every pass.
loader = Data.DataLoader(
    dataset=MyDataSet(enc_inputs, dec_inputs, dec_outputs),
    batch_size=2,
    shuffle=True,
)
# ====================================================================================================
# Transformer Model
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to embeddings, then apply dropout.

    The table is stored as the buffer ``pe`` with shape
    ``[max_len, 1, d_model]``; it is not a trainable parameter.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # One row per position, one column per (sin, cos) pair of dimensions.
        positions = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        even_dims = torch.arange(0, d_model, 2).float()
        inv_freq = torch.exp(even_dims * (-math.log(10000.0) / d_model))
        angles = positions * inv_freq

        table = torch.zeros(max_len, d_model)
        table[:, 0::2] = torch.sin(angles)  # even dimensions
        table[:, 1::2] = torch.cos(angles)  # odd dimensions
        # Insert a singleton batch axis so the table broadcasts over the batch.
        self.register_buffer('pe', table.unsqueeze(1))

    def forward(self, x):
        """Return ``dropout(x + pe[:seq_len])``.

        x: [seq_len, batch_size, d_model]
        """
        seq_len = x.size(0)
        return self.dropout(x + self.pe[:seq_len])
def get_attn_pad_mask(seq_q, seq_k):
    """Build the padding mask for attention from ``seq_q`` onto ``seq_k``.

    The mask lets attention ignore padded key positions: wherever it is True
    the corresponding score is filled with a large negative value before the
    softmax, so the attention weight there is effectively zero.

    ``seq_q`` and ``seq_k`` are two index sequences (not the Q/K projections of
    the attention mechanism). Both the encoder and the decoder call this
    function, so the two lengths may be src_len or tgt_len and may differ.

    Args:
        seq_q: [batch_size, len_q]; only its length is used, to size the mask.
        seq_k: [batch_size, len_k]; positions equal to 0 are padding.

    Returns:
        Bool tensor [batch_size, len_q, len_k]; True marks a masked position.
    """
    _, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    # Index 0 is the PAD token, e.g. seq_k = [[1, 2, 3, 4, 0], [1, 2, 3, 5, 0]].
    pad_attn_mask = seq_k.eq(0).unsqueeze(1)  # [batch_size, 1, len_k]
    # Repeat the key mask for every query position (a view, not a copy).
    return pad_attn_mask.expand(batch_size, len_q, len_k)
def get_attn_subsequence_mask(seq):
    """Return the "no peeking ahead" mask for a batch of target sequences.

    Entry [b, i, j] is 1 when j > i (position j lies in the future of
    position i) and 0 otherwise. Printing the result is the quickest way to
    see its structure.

    seq: [batch_size, tgt_len]
    returns: uint8 tensor [batch_size, tgt_len, tgt_len]
    """
    batch_size, tgt_len = seq.size(0), seq.size(1)
    all_ones = torch.ones(batch_size, tgt_len, tgt_len)
    # Keep only the entries strictly above the main diagonal of each matrix.
    return torch.triu(all_ones, diagonal=1).byte()
# ==========================================================================================
class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention: softmax(Q K^T / sqrt(d_k)) V."""

    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):
        """Compute attention weights and the weighted sum of the values.

        Q: [batch_size, n_heads, len_q, d_k]
        K: [batch_size, n_heads, len_k, d_k]
        V: [batch_size, n_heads, len_v(=len_k), d_v]
        attn_mask: [batch_size, n_heads, len_q, len_k], True where a score
            must be suppressed.

        len_q and len_k may differ (encoder-decoder attention).

        Returns:
            context: [batch_size, n_heads, len_q, d_v]
            attn: [batch_size, n_heads, len_q, len_k] (kept for visualisation)
        """
        # The scale is taken from Q's last dimension rather than from a
        # module-level constant, so the layer works for any head size.
        d_k = Q.size(-1)
        scores = torch.matmul(Q, K.transpose(-1, -2)) / math.sqrt(d_k)
        # Masked positions get a very negative score and therefore a weight
        # of (numerically) zero after the softmax.
        scores.masked_fill_(attn_mask, -1e9)
        attn = torch.softmax(scores, dim=-1)  # normalise over the key axis
        context = torch.matmul(attn, V)
        return context, attn
class MultiHeadAttention(nn.Module):
    """Multi-head attention followed by a residual connection and LayerNorm.

    The same class implements
      * the encoder's self-attention,
      * the decoder's masked self-attention,
      * the encoder-decoder attention;
    which one it is depends only on the tensors and mask given to ``forward``.

    Input:  [batch_size, seq_len, d_model]
    Output: [batch_size, seq_len, d_model]
    """

    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        # Q and K must share the same per-head dimension, otherwise their
        # dot product is undefined.
        self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
        # Projects the concatenated heads back to d_model.
        self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)
        self.attention = ScaledDotProductAttention()
        # Registered once here so its parameters are part of the model (moved
        # with .to(), trained, saved) instead of being re-created per call.
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, input_Q, input_K, input_V, attn_mask):
        """Apply multi-head attention.

        input_Q: [batch_size, len_q, d_model]
        input_K: [batch_size, len_k, d_model]
        input_V: [batch_size, len_v(=len_k), d_model]
        attn_mask: [batch_size, len_q, len_k]

        Returns:
            output: [batch_size, len_q, d_model]
            attn: [batch_size, n_heads, len_q, len_k]
        """
        residual, batch_size = input_Q, input_Q.size(0)

        # All heads are projected with one matrix and split afterwards:
        # (B, S, D) -proj-> (B, S, H * W) -split-> (B, S, H, W) -trans-> (B, H, S, W)
        Q = self.W_Q(input_Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
        K = self.W_K(input_K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
        V = self.W_V(input_V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)

        # Every head uses the same mask:
        # [batch_size, len_q, len_k] -> [batch_size, n_heads, len_q, len_k]
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)

        # context: [batch_size, n_heads, len_q, d_v]
        context, attn = self.attention(Q, K, V, attn_mask)
        # Concatenate the heads: -> [batch_size, len_q, n_heads * d_v]
        context = context.transpose(1, 2).reshape(batch_size, -1, n_heads * d_v)
        output = self.fc(context)  # [batch_size, len_q, d_model]
        return self.layer_norm(output + residual), attn
# nn.Linear only acts on the last dimension, so the same fully connected
# network is applied independently at every position, which is what we want.
class PoswiseFeedForwardNet(nn.Module):
    """Position-wise feed-forward net with a residual connection and LayerNorm."""

    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_ff, bias=False),
            nn.ReLU(),
            nn.Linear(d_ff, d_model, bias=False),
        )
        # Registered once here so its parameters are part of the model (moved
        # with .to(), trained, saved) instead of being re-created per call.
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, inputs):
        """Return ``LayerNorm(fc(inputs) + inputs)``.

        inputs: [batch_size, seq_len, d_model]
        returns: [batch_size, seq_len, d_model]
        """
        residual = inputs
        output = self.fc(inputs)
        return self.layer_norm(output + residual)
class EncoderLayer(nn.Module):
    """One encoder block: self-attention followed by the feed-forward net."""

    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.enc_self_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, enc_inputs, enc_self_attn_mask):
        """Run the block.

        enc_inputs: [batch_size, src_len, d_model]
        enc_self_attn_mask: [batch_size, src_len, src_len] mask matrix

        Returns:
            enc_outputs: [batch_size, src_len, d_model]
            attn: [batch_size, n_heads, src_len, src_len]
        """
        # Self-attention: the same tensor is the source of Q, K and V
        # (the attention module applies W_Q, W_K and W_V to it).
        attended, attn = self.enc_self_attn(
            enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask
        )
        return self.pos_ffn(attended), attn
class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, encoder-decoder attention, feed-forward."""

    def __init__(self):
        super(DecoderLayer, self).__init__()
        self.dec_self_attn = MultiHeadAttention()
        self.dec_enc_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
        """Run the block.

        dec_inputs: [batch_size, tgt_len, d_model]
        enc_outputs: [batch_size, src_len, d_model]
        dec_self_attn_mask: [batch_size, tgt_len, tgt_len]
        dec_enc_attn_mask: [batch_size, tgt_len, src_len]

        Returns:
            dec_outputs: [batch_size, tgt_len, d_model]
            dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len]
            dec_enc_attn: [batch_size, n_heads, tgt_len, src_len]
        The two attention tensors are returned for visualisation only.
        """
        # Step 1: Q, K and V all come from the decoder's own input.
        self_attended, dec_self_attn = self.dec_self_attn(
            dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask
        )
        # Step 2: Q comes from the decoder, K and V from the encoder output.
        cross_attended, dec_enc_attn = self.dec_enc_attn(
            self_attended, enc_outputs, enc_outputs, dec_enc_attn_mask
        )
        # Step 3: position-wise feed-forward.
        dec_outputs = self.pos_ffn(cross_attended)
        return dec_outputs, dec_self_attn, dec_enc_attn
class Encoder(nn.Module):
    """Token embedding + positional encoding + a stack of ``n_layers`` encoder blocks."""

    def __init__(self):
        super(Encoder, self).__init__()
        self.src_emb = nn.Embedding(src_vocab_size, d_model)  # token embedding
        # The positional encoding is fixed, not learned.
        self.pos_emb = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

    def forward(self, enc_inputs):
        """Encode a batch of source index sequences.

        enc_inputs: [batch_size, src_len]

        Returns:
            enc_outputs: [batch_size, src_len, d_model]
            enc_self_attns: list with one
                [batch_size, n_heads, src_len, src_len] tensor per layer
        """
        hidden = self.src_emb(enc_inputs)  # [batch_size, src_len, d_model]
        # PositionalEncoding expects [seq_len, batch_size, d_model], hence the
        # transpose before and after.
        hidden = self.pos_emb(hidden.transpose(0, 1)).transpose(0, 1)
        # Padding mask of the source sequence: [batch_size, src_len, src_len]
        pad_mask = get_attn_pad_mask(enc_inputs, enc_inputs)

        # The attention maps play no part in the computation; they are only
        # collected so they can be drawn (e.g. as heat maps) later.
        enc_self_attns = []
        for block in self.layers:
            # The output of one block is the input of the next.
            hidden, block_attn = block(hidden, pad_mask)
            enc_self_attns.append(block_attn)
        return hidden, enc_self_attns
class Decoder(nn.Module):
    """Token embedding + positional encoding + a stack of ``n_layers`` decoder blocks."""

    def __init__(self):
        super(Decoder, self).__init__()
        self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)  # target-side embedding
        self.pos_emb = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])

    def forward(self, dec_inputs, enc_inputs, enc_outputs):
        """Decode a batch of target index sequences against the encoder output.

        dec_inputs: [batch_size, tgt_len]
        enc_inputs: [batch_size, src_len]
        enc_outputs: [batch_size, src_len, d_model], used by the
            encoder-decoder attention

        Returns:
            dec_outputs: [batch_size, tgt_len, d_model]
            dec_self_attns: list with one
                [batch_size, n_heads, tgt_len, tgt_len] tensor per layer
            dec_enc_attns: list with one
                [batch_size, n_heads, tgt_len, src_len] tensor per layer
        """
        dec_outputs = self.tgt_emb(dec_inputs)  # [batch_size, tgt_len, d_model]
        # PositionalEncoding expects [seq_len, batch_size, d_model].
        dec_outputs = self.pos_emb(dec_outputs.transpose(0, 1)).transpose(0, 1)

        # Padding mask of the decoder input: [batch_size, tgt_len, tgt_len]
        dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs)
        # Future positions must not be visible at the current step. The mask
        # is moved to the device of the inputs instead of a module-level
        # device name, so the layer follows wherever its inputs live.
        dec_self_attn_subsequence_mask = get_attn_subsequence_mask(
            dec_inputs).to(dec_inputs.device)
        # Union of both masks: a position is masked if it is padding OR lies
        # in the future. [batch_size, tgt_len, tgt_len]
        dec_self_attn_mask = torch.gt(
            dec_self_attn_pad_mask + dec_self_attn_subsequence_mask, 0)

        # Mask for the encoder-decoder attention: hides padded *source*
        # positions (the keys/values); dec_inputs only supplies the number of
        # query rows. [batch_size, tgt_len, src_len]
        dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs)

        dec_self_attns, dec_enc_attns = [], []
        for layer in self.layers:
            # Each block receives the previous block's output (changing) and
            # the encoder output (the same for every block).
            dec_outputs, dec_self_attn, dec_enc_attn = layer(
                dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
            dec_self_attns.append(dec_self_attn)
            dec_enc_attns.append(dec_enc_attn)
        return dec_outputs, dec_self_attns, dec_enc_attns
class Transformer(nn.Module):
    """Encoder-decoder Transformer with a final projection onto the target vocabulary."""

    def __init__(self):
        super(Transformer, self).__init__()
        self.encoder = Encoder().to(device)
        self.decoder = Decoder().to(device)
        self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False).to(device)

    def forward(self, enc_inputs, dec_inputs):
        """Run the full model on a pair of index sequences.

        enc_inputs: [batch_size, src_len]
        dec_inputs: [batch_size, tgt_len]

        Returns:
            logits: [batch_size * tgt_len, tgt_vocab_size]
            enc_self_attns, dec_self_attns, dec_enc_attns: per-layer lists of
                attention maps, as returned by the encoder and the decoder
        """
        # memory: [batch_size, src_len, d_model]
        memory, enc_self_attns = self.encoder(enc_inputs)
        # decoded: [batch_size, tgt_len, d_model]
        decoded, dec_self_attns, dec_enc_attns = self.decoder(
            dec_inputs, enc_inputs, memory)
        # [batch_size, tgt_len, d_model] -> [batch_size, tgt_len, tgt_vocab_size]
        logits = self.projection(decoded)
        # Flatten batch and time so the result can be fed to the loss directly.
        flat_logits = logits.view(-1, logits.size(-1))
        return flat_logits, enc_self_attns, dec_self_attns, dec_enc_attns
model = Transformer().to(device)
# ignore_index=0: index 0 is the padding token 'P', so padded target positions
# contribute nothing to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=0)
# The original author notes that Adam did not work well for this example.
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)

# ====================================================================================================
# Training loop
for epoch in range(epochs):
    for enc_inputs, dec_inputs, dec_outputs in loader:
        # enc_inputs:  [batch_size, src_len]
        # dec_inputs:  [batch_size, tgt_len]
        # dec_outputs: [batch_size, tgt_len]
        enc_inputs = enc_inputs.to(device)
        dec_inputs = dec_inputs.to(device)
        dec_outputs = dec_outputs.to(device)

        # outputs: [batch_size * tgt_len, tgt_vocab_size]
        outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(
            enc_inputs, dec_inputs)
        # The targets are flattened to [batch_size * tgt_len] to line up with
        # the rows of `outputs`.
        loss = criterion(outputs, dec_outputs.view(-1))
        print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
def greedy_decoder(model, enc_input, start_symbol, max_len=50):
    """Greedily decode a target sequence for one source sentence.

    Greedy decoding is beam search with K=1. At inference time the target
    sequence is unknown, so it is generated one token at a time: the tokens
    produced so far are fed back into the decoder and the most probable next
    token is appended.
    Reference: http://nlp.seas.harvard.edu/2018/04/03/attention.html#greedy-decoding

    NOTE(review): this function does not switch the model to eval mode, so
    dropout stays active unless the caller has invoked ``model.eval()``.

    :param model: Transformer model
    :param enc_input: encoder input, [1, src_len]
    :param start_symbol: index of the start symbol (``tgt_vocab["S"]``)
    :param max_len: upper bound on the number of decoder input tokens, so that
        decoding terminates even if the end symbol is never predicted
    :return: the decoded indices without the start symbol, [1, n]
    """
    dev = enc_input.device
    enc_outputs, _ = model.encoder(enc_input)
    # Start from an empty [1, 0] sequence with the dtype of the encoder input.
    dec_input = torch.zeros(1, 0, dtype=enc_input.dtype, device=dev)
    next_symbol = start_symbol
    for _ in range(max_len):
        # The decoder input grows by one token per step.
        step = torch.tensor([[next_symbol]], dtype=enc_input.dtype, device=dev)
        dec_input = torch.cat([dec_input, step], -1)
        dec_outputs, _, _ = model.decoder(dec_input, enc_input, enc_outputs)
        projected = model.projection(dec_outputs)
        predicted = projected.squeeze(0).max(dim=-1, keepdim=False)[1]
        # Only the prediction at the last position is new; the earlier ones
        # repeat tokens that are already part of dec_input.
        next_symbol = predicted[-1].item()
        if next_symbol == tgt_vocab["E"]:
            break
    # Drop the leading start symbol.
    greedy_dec_predict = dec_input[:, 1:]
    return greedy_dec_predict
# ==========================================================================================
# Forecasting phase
# Test sets
sentences = [
    # enc_input                dec_input           dec_output
    # Only the source side is needed for prediction, so the two decoder
    # strings are left empty.
    ['我 有 零 个 女 朋 友 P', '', '']
]
enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
test_loader = Data.DataLoader(
    MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
enc_inputs, _, _ = next(iter(test_loader))

print()
print("=" * 30)
print("Translate the Chinese sentence 'I have zero girlfriends' into an English sentence using the trained Transformer model: ")
for enc_input in enc_inputs:
    # greedy_decoder expects a batch of one: [1, src_len]
    greedy_dec_predict = greedy_decoder(
        model, enc_input.view(1, -1).to(device), start_symbol=tgt_vocab["S"])
    print(enc_input, '->', greedy_dec_predict.squeeze())
    print([src_idx2word[t.item()] for t in enc_input], '->',
          [idx2word[n.item()] for n in greedy_dec_predict.squeeze()])

The source code is from wmathor's earlier post, "Hands-on implementation of the Transformer model in PyTorch code (super-detailed code interpretation)".

I’ve re-organized the code ideas in conjunction with the diagrams to document and learn and share them with you. Also for the convenience of future review, access. Below, step by step, we slowly unravel the true nature of the transformer!

Run Result Display

Instead of using a large data set, we constructed a few Chinese-to-English sentence translations as a dataset for testing.

  • training set

Transformer Code Explained (Pytorch Edition)

  • test set
# test set (what the transformer is expected to achieve)
# Input: "I have a girlfriend" #
# Output: "i have a girlfriend"
  • Training 100epoch Predicting results

Transformer Code Explained (Pytorch Edition)

Data set processing

Hard-coded datasets

Manual code construction of sentences

sentences = [
    # The Chinese and English sentences need not contain the same number of tokens.
    # Tokens are separated by single spaces; '.' is a token of its own.
    # enc_input                dec_input                      dec_output
    ['我 有 一 个 好 朋 友 P', 'S I have a good friend .', 'I have a good friend . E'],
    ['我 有 零 个 女 朋 友 P', 'S I have zero girl friend .', 'I have zero girl friend . E'],
    ['我 有 一 个 男 朋 友 P', 'S I have a boy friend .', 'I have a boy friend . E'],
]
# test set (what the transformer is expected to achieve)
# Input: "I have a girlfriend" #
# Output: "i have a girlfriend"
# Separate thesaurus for Chinese and English words
# Padding Should be Zero
src_vocab = {'P': 0, '我': 1, '有': 2, '一': 3, '个': 4, '好': 5,
             '朋': 6, '友': 7, '零': 8, '女': 9, '男': 10}
# Reverse lookup (index -> token), built from the stored indices rather than
# from the dict's insertion order.
src_idx2word = {index: word for word, index in src_vocab.items()}
src_vocab_size = len(src_vocab)

tgt_vocab = {'P': 0, 'I': 1, 'have': 2, 'a': 3, 'good': 4,
             'friend': 5, 'zero': 6, 'girl': 7, 'boy': 8, 'S': 9, 'E': 10, '.': 11}
idx2word = {index: word for word, index in tgt_vocab.items()}
tgt_vocab_size = len(tgt_vocab)

src_len = 8  # enc_input max sequence length (length of a source sentence)
tgt_len = 7  # dec_input (= dec_output) max sequence length

Converting sentences to sequences of numbers of words in the thesaurus to tensor

def make_data(sentences):
    """Convert whitespace-tokenised sentence triples into index tensors.

    Args:
        sentences: sequence of ``[enc_input, dec_input, dec_output]`` strings.
            The first string is looked up token by token in ``src_vocab``,
            the second and third in ``tgt_vocab``.

    Returns:
        Tuple ``(enc_inputs, dec_inputs, dec_outputs)`` of
        ``torch.LongTensor`` built from the per-sentence index lists.

    Raises:
        KeyError: if a token is missing from the corresponding vocabulary.
    """
    enc_inputs, dec_inputs, dec_outputs = [], [], []
    for sentence in sentences:
        # For the training set these become, respectively:
        # [[1, 2, 3, 4, 5, 6, 7, 0], [1, 2, 8, 4, 9, 6, 7, 0], [1, 2, 3, 4, 10, 6, 7, 0]]
        enc_inputs.append([src_vocab[tok] for tok in sentence[0].split()])
        # [[9, 1, 2, 3, 4, 5, 11], [9, 1, 2, 6, 7, 5, 11], [9, 1, 2, 3, 8, 5, 11]]
        dec_inputs.append([tgt_vocab[tok] for tok in sentence[1].split()])
        # [[1, 2, 3, 4, 5, 11, 10], [1, 2, 6, 7, 5, 11, 10], [1, 2, 3, 8, 5, 11, 10]]
        dec_outputs.append([tgt_vocab[tok] for tok in sentence[2].split()])
    return (torch.LongTensor(enc_inputs),
            torch.LongTensor(dec_inputs),
            torch.LongTensor(dec_outputs))

Customize a MyDataSet to read these sentences.

class MyDataSet(Data.Dataset):
    """Dataset over parallel encoder-input / decoder-input / decoder-output tensors.

    Item ``idx`` is the triple of the ``idx``-th rows of the three tensors.
    """

    def __init__(self, enc_inputs, dec_inputs, dec_outputs):
        super(MyDataSet, self).__init__()
        self.enc_inputs = enc_inputs
        self.dec_inputs = dec_inputs
        self.dec_outputs = dec_outputs

    def __len__(self):
        # The number of samples is the first dimension of the encoder inputs.
        return self.enc_inputs.shape[0]

    def __getitem__(self, idx):
        return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]
# Training loader: mini-batches of 2 samples, reshuffled on every pass.
loader = Data.DataLoader(
    dataset=MyDataSet(enc_inputs, dec_inputs, dec_outputs),
    batch_size=2,
    shuffle=True,
)

Transformer model code analysis

Parameters of the Transformer model

  • d_model: we need to define the dimension of embedding, set in the paper at 512
  • d_ff: number of hidden neurons in FeedForward layer
  • d_k = d_v: the dimensions of the Q, K, and V vectors, where the dimensions of Q and K must be equal, and there is no restriction on the dimension of V, which we set to 64.
  • n_layers: the number of Encoders and Decoders, i.e., Nx in the graph.
  • n_heads: number of heads in multi-head attention
# Transformer hyper-parameters
d_model = 512   # width of the token embedding and of the positional encoding
d_ff = 2048     # hidden width of the feed-forward net (512 -> 2048 -> 512)
d_k = 64        # per-head dimension of Q and K (the two must match)
d_v = 64        # per-head dimension of V (set equal to d_k for convenience)
n_layers = 6    # number of stacked encoder layers, and of decoder layers
n_heads = 8     # number of heads in multi-head attention

Regarding the comments in the code, if the value is src_len or tgt_len, I will definitely write it clearly, but there are some functions or classes that both Encoder and Decoder may call, so I can’t be sure whether it’s src_len or tgt_len, and for the ones that I’m not sure about, I’ll write it down as seq_len.

Positional Encoding. Positional encoding

Transformer is computed in parallel input and needs to know the positional information of each word in order to recognize the sequential relationships in the language.
The first thing to know is that the Transformer takes words as input, converts each word into a word embedding, and then adds the positional embedding to it (an element-wise sum of the values at corresponding positions, not a concatenation).

The location embedding should be designed to fulfill the following conditions:

  • The positional code for each word output is unique
  • The distance between any two positions should be consistent across sentences of different lengths
  • Its value should be bounded

For a more detailed introduction to Positional Encoding, see this blog article.

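The formula for Positional Encoding is as follows:

$$PE_{(pos,\,2i)} = \sin\!\left(\frac{pos}{10000^{2i/d_{model}}}\right), \qquad PE_{(pos,\,2i+1)} = \cos\!\left(\frac{pos}{10000^{2i/d_{model}}}\right)$$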

Transformer Code Explained (Pytorch Edition)

The sin and cos in the formula are applied to the even- and odd-numbered dimensions of the embedding, respectively, taken in pairs: for example, dimensions 0 and 1 form one pair and dimensions 2 and 3 form another. Each pair is processed with the sin and cos functions above and therefore varies periodically with position at its own frequency.
The period grows longer (the values change more slowly) as the dimension index increases, eventually producing a texture that contains positional information. The wavelength ranges from 2π to 10000·2π.

Transformer Code Explained (Pytorch Edition)

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import math
def get_positional_encoding(max_seq_len, embed_dim):
    """Return the sinusoidal positional-encoding table as a NumPy array.

    Entry [pos, 2i] is sin(pos / 10000^(2i / embed_dim)) and entry
    [pos, 2i + 1] is cos(pos / 10000^(2i / embed_dim)); the two dimensions of
    a pair share one frequency.

    Args:
        max_seq_len: maximum sequence length (number of rows).
        embed_dim: dimension of the word embedding (number of columns).

    Returns:
        Array of shape (max_seq_len, embed_dim) with values in [-1, 1].
    """
    positions = np.arange(max_seq_len, dtype=np.float64)[:, np.newaxis]
    # Dimensions 2i and 2i + 1 both use the exponent 2i / embed_dim, hence
    # the integer division by 2.
    pair_index = np.arange(embed_dim) // 2
    angles = positions / np.power(10000.0, 2.0 * pair_index / embed_dim)

    positional_encoding = np.zeros((max_seq_len, embed_dim))
    positional_encoding[:, 0::2] = np.sin(angles[:, 0::2])  # even dimensions
    positional_encoding[:, 1::2] = np.cos(angles[:, 1::2])  # odd dimensions
    return positional_encoding
# Build an encoding table for 100 positions and 16 dimensions and draw it as a
# heatmap (x axis: embedding dimension, y axis: position in the sequence).
positional_encoding = get_positional_encoding(max_seq_len=100, embed_dim=16)
plt.figure(figsize=(5,5))
sns.heatmap(positional_encoding)
plt.title("Sinusoidal Function")
plt.xlabel("hidden dimension")
plt.ylabel("sequence length")

Transformer Code Explained (Pytorch Edition)

# Plot the encoding value against position for embedding dimensions 1, 2 and
# 3; the [1:] slice skips the first row (position 0).
plt.figure(figsize=(8, 5))
plt.plot(positional_encoding[1:, 1], label="dimension 1")
plt.plot(positional_encoding[1:, 2], label="dimension 2")
plt.plot(positional_encoding[1:, 3], label="dimension 3")
plt.legend()
plt.xlabel("Sequence length")
plt.ylabel("Period of Positional Encoding")

Transformer Code Explained (Pytorch Edition)

Pad Mask

def get_attn_pad_mask(seq_q, seq_k):
    """Build the padding mask for attention from ``seq_q`` onto ``seq_k``.

    The mask lets attention ignore padded key positions: wherever it is True
    the corresponding score is filled with a large negative value before the
    softmax, so the attention weight there is effectively zero.

    ``seq_q`` and ``seq_k`` are two index sequences (not the Q/K projections of
    the attention mechanism). Both the encoder and the decoder call this
    function, so the two lengths may be src_len or tgt_len and may differ.

    Args:
        seq_q: [batch_size, len_q]; only its length is used, to size the mask.
        seq_k: [batch_size, len_k]; positions equal to 0 are padding.

    Returns:
        Bool tensor [batch_size, len_q, len_k]; True marks a masked position.
    """
    _, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    # Index 0 is the PAD token, e.g. seq_k = [[1, 2, 3, 4, 0], [1, 2, 3, 5, 0]].
    pad_attn_mask = seq_k.eq(0).unsqueeze(1)  # [batch_size, 1, len_k]
    # Repeat the key mask for every query position (a view, not a copy).
    return pad_attn_mask.expand(batch_size, len_q, len_k)

The core of this function is seq_k.data.eq(0), which returns a tensor of the same size as seq_k that contains only True and False values: a position is True if the value of seq_k at that position equals 0, and False otherwise. For example, if the input is seq_data = [1, 2, 3, 4, 0], then seq_data.data.eq(0) returns [False, False, False, False, True].

Mask of the masked subsequence

def get_attn_subsequence_mask(seq):
    """Build the mask that hides future positions from each decoder position.

    Printing the result is the quickest way to see what it looks like: a
    strictly upper-triangular matrix of ones per batch element.

    Args:
        seq: [batch_size, tgt_len] tensor; only its shape and device are used.

    Returns:
        uint8 tensor of shape [batch_size, tgt_len, tgt_len], created on the
        same device as `seq`; 1 marks a position that must be masked.
    """
    batch_size, tgt_len = seq.size(0), seq.size(1)
    # Build the mask directly in torch (no float64 NumPy round trip) and on
    # the device of `seq`, so no host-to-device copy is needed afterwards.
    ones = torch.ones(batch_size, tgt_len, tgt_len,
                      dtype=torch.uint8, device=seq.device)
    # diagonal=1 keeps only the entries strictly above the main diagonal.
    return torch.triu(ones, diagonal=1)  # [batch_size, tgt_len, tgt_len]

get_attn_subsequence_mask is only used by Decoder, the main purpose is to block the information of words in future moments. First, np.ones() generates a square matrix with all 1’s, and then np.triu() generates an upper triangular matrix, the following is how np.triu() works

Transformer Code Explained (Pytorch Edition)

sample code (computing)

import numpy as np
data = [1, 2, 3, 4]
# np.triu broadcasts the 1-D input to a 4x4 matrix and zeroes everything
# below the k-th diagonal:
#   k=0  -> the main diagonal (the usual upper triangle)
#   k=-1 -> the diagonal one step below the main diagonal
#   k=1  -> the diagonal one step above the main diagonal
upper_triangle, upper_triangle_down, upper_triangle_up = (
    np.triu(data, offset) for offset in (0, -1, 1)
)
for demo in (upper_triangle, upper_triangle_down, upper_triangle_up):
    print(demo)
'''Output results
[[1 2 3 4]
[0 2 3 4]
[0 0 3 4]
[0 0 0 4]]
[[1 2 3 4]
[1 2 3 4]
[0 2 3 4]
[0 0 3 4]]
[[0 2 3 4]
[0 0 3 4]
[0 0 0 4]
[0 0 0 0]]
'''

ScaledDotProductAttention

Transformer Code Explained (Pytorch Edition)

class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention: softmax(Q K^T / sqrt(d_k)) V with masking."""

    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):
        """Compute the attention weights and the resulting context vectors.

        Q: [batch_size, n_heads, len_q, d_k]
        K: [batch_size, n_heads, len_k, d_k]
        V: [batch_size, n_heads, len_v(=len_k), d_v]
        attn_mask: [batch_size, n_heads, len_q, len_k]; True (or non-zero)
            marks a position that must not be attended to.
        Note: in encoder-decoder attention len_q and len_k may differ.

        Returns (context, attn):
            context: [batch_size, n_heads, len_q, d_v]
            attn:    [batch_size, n_heads, len_q, len_k]
        """
        # Scale by the actual key dimension of the inputs instead of relying
        # on a module-level constant.
        scale = math.sqrt(Q.size(-1))
        # scores: [batch_size, n_heads, len_q, len_k]
        scores = torch.matmul(Q, K.transpose(-1, -2)) / scale
        # Masked positions get a large negative score so that softmax gives
        # them (numerically) zero weight. masked_fill_ needs a boolean mask,
        # so integer masks are converted first.
        scores.masked_fill_(attn_mask.bool(), -1e9)
        # Normalize over the key axis.
        attn = torch.softmax(scores, dim=-1)
        # Weighted sum of the value vectors.
        context = torch.matmul(attn, V)
        # attn is returned as well so that it can be visualized.
        return context, attn

What is done here is to compute the scores from Q and K, and then multiply the scores with V to get the context vector for each word

The first step is to multiply Q by the transpose of K. There is not much to say about that, but the resulting scores cannot be softmaxed right away: they first have to be combined with attn_mask so that the information that needs to be hidden is masked out (the code fills the masked positions with -1e9). attn_mask is a tensor consisting only of True and False, and its four dimensions must match those of scores (otherwise the mask cannot be applied position by position).

After masking, the scores can be softmaxed. This is then multiplied by V to get the context

Illustration of the calculation process:

Transformer Code Explained (Pytorch Edition)

Transformer Code Explained (Pytorch Edition)

Transformer Code Explained (Pytorch Edition)

The meaning of this score table is as follows:

  • When the model processes the first data in the dataset (row 1), which contains only one word (robot), it focuses 100% of its attention on that word.
  • When the model processes the 2nd data in the dataset (row 2), it contains the word (robot must). When the model processes the word must, it focuses 42% of its attention on robot and 57% on must.

Feed Forward and Add&Norm

Transformer Code Explained (Pytorch Edition)

# nn.Linear only operates on the last dimension, which is exactly what is
# needed here: the same fully connected network is applied at every position.
class PoswiseFeedForwardNet(nn.Module):
    """Position-wise feed-forward sub-layer with residual connection and LayerNorm."""

    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_ff, bias=False),
            nn.ReLU(),
            nn.Linear(d_ff, d_model, bias=False),
        )
        # Registered once as a sub-module so that its affine parameters are
        # trained, saved in the state dict and moved together with the model.
        # (Creating a new LayerNorm inside forward() would reset them to
        # weight=1 / bias=0 on every call and hide them from the optimizer.)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, inputs):
        """
        inputs: [batch_size, seq_len, d_model]
        returns: [batch_size, seq_len, d_model]
        """
        residual = inputs
        output = self.fc(inputs)
        # Add & Norm
        return self.layer_norm(output + residual)

MultiHeadAttention

Transformer Code Explained (Pytorch Edition)

Transformer Code Explained (Pytorch Edition)

Transformer Code Explained (Pytorch Edition)

class MultiHeadAttention(nn.Module):
    """Multi-head attention with residual connection and LayerNorm.

    This class implements all three attention variants of the model:
        Self-Attention of the Encoder
        Masked Self-Attention of the Decoder
        Encoder-Decoder Attention
    Input:  seq_len x d_model
    Output: seq_len x d_model
    """

    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        # Q and K must share the same per-head dimension, otherwise their dot
        # product is undefined.
        self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
        # Projects the concatenated heads back to d_model so the output of the
        # layer is still seq_len x d_model.
        self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)
        # Parameter-free; created once instead of on every forward pass.
        self.attention = ScaledDotProductAttention()
        # Registered once so that its affine parameters are trained, saved and
        # moved with the model (a LayerNorm created inside forward() would be
        # re-initialized on every call and never optimized).
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, input_Q, input_K, input_V, attn_mask):
        """
        input_Q: [batch_size, len_q, d_model]
        input_K: [batch_size, len_k, d_model]
        input_V: [batch_size, len_v(=len_k), d_model]
        attn_mask: [batch_size, len_q, len_k]

        Returns (output, attn):
            output: [batch_size, len_q, d_model]
            attn:   [batch_size, n_heads, len_q, len_k]
        """
        residual, batch_size = input_Q, input_Q.size(0)
        # Engineering trick: the projections of all heads are computed by one
        # linear layer and then split into heads.
        # B: batch_size, S: seq_len, D: dim
        # (B, S, D) -proj-> (B, S, D_new) -split-> (B, S, Head, W) -trans-> (B, Head, S, W)
        # Q: [batch_size, n_heads, len_q, d_k]
        Q = self.W_Q(input_Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
        # K: [batch_size, n_heads, len_k, d_k]
        # (K and V must have the same length; their dimensions may differ.)
        K = self.W_K(input_K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
        # V: [batch_size, n_heads, len_v(=len_k), d_v]
        V = self.W_V(input_V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)
        # The same mask applies to every head; expand() adds the head axis as
        # a broadcast view instead of copying the mask n_heads times.
        # [batch_size, len_q, len_k] -> [batch_size, n_heads, len_q, len_k]
        attn_mask = attn_mask.unsqueeze(1).expand(-1, n_heads, -1, -1)
        # context: [batch_size, n_heads, len_q, d_v]
        # attn:    [batch_size, n_heads, len_q, len_k]
        context, attn = self.attention(Q, K, V, attn_mask)
        # Concatenate the heads:
        # [batch_size, n_heads, len_q, d_v] -> [batch_size, len_q, n_heads * d_v]
        context = context.transpose(1, 2).reshape(batch_size, -1, n_heads * d_v)
        output = self.fc(context)  # [batch_size, len_q, d_model]
        # Add & Norm
        return self.layer_norm(output + residual), attn

MultiHeadAttention() is called in three places in the complete code. The Encoder Layer calls it once, with input_Q, input_K and input_V all equal to enc_inputs. The Decoder Layer calls it twice: the first time all three inputs are dec_inputs, and the second time they are dec_outputs, enc_outputs and enc_outputs, respectively.

Encoder Layer

class EncoderLayer(nn.Module):
    """One encoder block: multi-head self-attention followed by the position-wise FFN."""

    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.enc_self_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, enc_inputs, enc_self_attn_mask):
        """
        enc_inputs: [batch_size, src_len, d_model]
        enc_self_attn_mask: [batch_size, src_len, src_len] mask matrix
            (pad mask or sequence mask)

        Returns (enc_outputs, attn):
            enc_outputs: [batch_size, src_len, d_model]
            attn:        [batch_size, n_heads, src_len, src_len]
        """
        # Self-attention: the same tensor is used as the source of Q, K and V
        # (each is multiplied by its own projection inside the attention layer).
        attended, attn = self.enc_self_attn(
            input_Q=enc_inputs,
            input_K=enc_inputs,
            input_V=enc_inputs,
            attn_mask=enc_self_attn_mask,
        )
        return self.pos_ffn(attended), attn

Encoder

Transformer Code Explained (Pytorch Edition)

class Encoder(nn.Module):
    """Token embedding + positional encoding followed by n_layers encoder blocks."""

    def __init__(self):
        super(Encoder, self).__init__()
        # Token embedding
        self.src_emb = nn.Embedding(src_vocab_size, d_model)
        # Original author's note: the position encoding of the Transformer is
        # fixed and does not need to be learned.
        self.pos_emb = PositionalEncoding(d_model)
        self.layers = nn.ModuleList(EncoderLayer() for _ in range(n_layers))

    def forward(self, enc_inputs):
        """
        enc_inputs: [batch_size, src_len]

        Returns (enc_outputs, enc_self_attns):
            enc_outputs: [batch_size, src_len, d_model]
            enc_self_attns: list with one attention tensor per layer, each
                [batch_size, n_heads, src_len, src_len]
        """
        embedded = self.src_emb(enc_inputs)  # [batch_size, src_len, d_model]
        # The positional encoding is applied with the first two axes swapped
        # and the result is swapped back: [batch_size, src_len, d_model]
        hidden = self.pos_emb(embedded.transpose(0, 1)).transpose(0, 1)
        # Pad mask of the encoder input sequence: [batch_size, src_len, src_len]
        pad_mask = get_attn_pad_mask(enc_inputs, enc_inputs)
        # Not needed for the computation itself; the per-layer attention maps
        # are collected only so they can be visualized (heat maps etc.).
        enc_self_attns = []
        for block in self.layers:
            # The output of the previous block is the input of the next one.
            hidden, layer_attn = block(hidden, pad_mask)
            enc_self_attns.append(layer_attn)
        return hidden, enc_self_attns

Decoder Layer

class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, encoder-decoder attention, FFN."""

    def __init__(self):
        super(DecoderLayer, self).__init__()
        self.dec_self_attn = MultiHeadAttention()
        self.dec_enc_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
        """
        dec_inputs: [batch_size, tgt_len, d_model]
        enc_outputs: [batch_size, src_len, d_model]
        dec_self_attn_mask: [batch_size, tgt_len, tgt_len]
        dec_enc_attn_mask: [batch_size, tgt_len, src_len]

        Returns (dec_outputs, dec_self_attn, dec_enc_attn):
            dec_outputs:   [batch_size, tgt_len, d_model]
            dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len]
            dec_enc_attn:  [batch_size, n_heads, tgt_len, src_len]
        The two attention maps are returned for visualization purposes.
        """
        # Masked self-attention: Q, K and V all come from the decoder input.
        self_attended, dec_self_attn = self.dec_self_attn(
            dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)
        # Encoder-decoder attention: Q from the decoder, K and V from the encoder.
        cross_attended, dec_enc_attn = self.dec_enc_attn(
            self_attended, enc_outputs, enc_outputs, dec_enc_attn_mask)
        return self.pos_ffn(cross_attended), dec_self_attn, dec_enc_attn

Decoder

class Decoder(nn.Module):
    """Target embedding + positional encoding followed by n_layers decoder blocks."""

    def __init__(self):
        super(Decoder, self).__init__()
        # Embedding table for the decoder input vocabulary
        self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_emb = PositionalEncoding(d_model)
        # Decoder blocks
        self.layers = nn.ModuleList(DecoderLayer() for _ in range(n_layers))

    def forward(self, dec_inputs, enc_inputs, enc_outputs):
        """
        dec_inputs: [batch_size, tgt_len]
        enc_inputs: [batch_size, src_len]
        enc_outputs: [batch_size, src_len, d_model], used by the
            encoder-decoder attention layers

        Returns (dec_outputs, dec_self_attns, dec_enc_attns):
            dec_outputs: [batch_size, tgt_len, d_model]
            dec_self_attns / dec_enc_attns: lists with one attention tensor
                per layer
        """
        embedded = self.tgt_emb(dec_inputs)  # [batch_size, tgt_len, d_model]
        # [batch_size, tgt_len, d_model]
        hidden = self.pos_emb(embedded.transpose(0, 1)).transpose(0, 1).to(device)

        # Pad mask of the decoder input (the decoder input of this example has
        # no padding, real applications usually do): [batch_size, tgt_len, tgt_len]
        self_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs).to(device)
        # Masked self-attention: a position must not see future positions.
        # [batch_size, tgt_len, tgt_len]
        future_mask = get_attn_subsequence_mask(dec_inputs).to(device)
        # Combine both masks: a position is masked if it is PAD or lies in the
        # future, i.e. wherever the sum of the two masks is greater than 0.
        dec_self_attn_mask = (self_pad_mask + future_mask) > 0

        # Mask for the encoder-decoder attention: hides the PAD positions of
        # enc_inputs (which supply K and V) so that attention never focuses on
        # padding; dec_inputs only provides the query length for the expand.
        # [batch_size, tgt_len, src_len]
        dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs)

        dec_self_attns = []
        dec_enc_attns = []
        for block in self.layers:
            # Each block receives the output of the previous block (changing)
            # and the output of the encoder (fixed).
            hidden, self_attn, cross_attn = block(
                hidden, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
            dec_self_attns.append(self_attn)
            dec_enc_attns.append(cross_attn)
        return hidden, dec_self_attns, dec_enc_attns

Transformer

Transformer Code Explained (Pytorch Edition)

Transformer Code Explained (Pytorch Edition)

class Transformer(nn.Module):
    """Encoder-decoder Transformer with a final projection onto the target vocabulary."""

    def __init__(self):
        super(Transformer, self).__init__()
        self.encoder = Encoder().to(device)
        self.decoder = Decoder().to(device)
        self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False).to(device)

    def forward(self, enc_inputs, dec_inputs):
        """Run the full model on a pair of sequences.

        enc_inputs: [batch_size, src_len]
        dec_inputs: [batch_size, tgt_len]

        Returns (logits, enc_self_attns, dec_self_attns, dec_enc_attns):
            logits: [batch_size * tgt_len, tgt_vocab_size]
            the three attention values are per-layer lists collected by the
            encoder and the decoder.
        """
        # encoded: [batch_size, src_len, d_model]
        encoded, enc_self_attns = self.encoder(enc_inputs)
        # decoded: [batch_size, tgt_len, d_model]
        decoded, dec_self_attns, dec_enc_attns = self.decoder(
            dec_inputs, enc_inputs, encoded)
        # [batch_size, tgt_len, d_model] -> [batch_size, tgt_len, tgt_vocab_size]
        dec_logits = self.projection(decoded)
        # Flatten batch and time so the logits line up with flattened targets.
        flat_logits = dec_logits.view(-1, dec_logits.size(-1))
        return flat_logits, enc_self_attns, dec_self_attns, dec_enc_attns

train

model = Transformer().to(device)
# ignore_index=0: the "pad" token has index 0, so with this setting padded
# positions do not contribute to the loss (padding carries no meaning).
criterion = nn.CrossEntropyLoss(ignore_index=0)
# Original author's note: it doesn't work well with adam.
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)

for epoch in range(epochs):
    for enc_inputs, dec_inputs, dec_outputs in loader:
        # enc_inputs:  [batch_size, src_len]
        # dec_inputs:  [batch_size, tgt_len]
        # dec_outputs: [batch_size, tgt_len]
        enc_inputs = enc_inputs.to(device)
        dec_inputs = dec_inputs.to(device)
        dec_outputs = dec_outputs.to(device)
        # outputs: [batch_size * tgt_len, tgt_vocab_size]
        outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(
            enc_inputs, dec_inputs)
        # dec_outputs.view(-1): [batch_size * tgt_len]
        loss = criterion(outputs, dec_outputs.view(-1))
        print(f"Epoch: {epoch + 1:04d} loss = {loss:.6f}")
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

inference

def greedy_decoder(model, enc_input, start_symbol, max_len=50):
    """Greedily decode a target sequence for one source sentence.

    For simplicity, a greedy decoder is beam search with K=1. It is needed at
    inference time because the target sequence is unknown: the target input is
    generated word by word and fed back into the transformer.
    Reference: http://nlp.seas.harvard.edu/2018/04/03/attention.html#greedy-decoding

    :param model: Transformer model
    :param enc_input: the encoder input, [1, src_len]
    :param start_symbol: index of the start symbol ('S' in this example)
    :param max_len: maximum number of decoder steps; guards against an endless
        loop when the model never predicts the end symbol 'E'
    :return: the predicted target indices, [1, n], without the start symbol
        and without the end symbol
    """
    end_symbol = tgt_vocab["E"]
    target_device = enc_input.device
    # No gradients are needed for inference.
    with torch.no_grad():
        enc_outputs, enc_self_attns = model.encoder(enc_input)
        # Start from an empty sequence: tensor([], size=(1, 0))
        dec_input = torch.zeros(1, 0, dtype=enc_input.dtype, device=target_device)
        next_symbol = start_symbol
        for _ in range(max_len):
            # The decoder input grows by one token per step (the token that
            # was predicted in the previous step).
            next_token = torch.tensor([[next_symbol]], dtype=enc_input.dtype,
                                      device=target_device)
            dec_input = torch.cat([dec_input, next_token], -1)
            dec_outputs, _, _ = model.decoder(dec_input, enc_input, enc_outputs)
            projected = model.projection(dec_outputs)
            # Index of the highest-scoring word at every position.
            predicted = projected.squeeze(0).max(dim=-1, keepdim=False)[1]
            # Only the prediction at the last position is new; predictions for
            # earlier positions are ignored. Converted to a plain int so it
            # can be compared and fed back as a scalar.
            next_symbol = predicted[-1].item()
            if next_symbol == end_symbol:
                break
    # Drop the leading start symbol.
    greedy_dec_predict = dec_input[:, 1:]
    return greedy_dec_predict
# ==========================================================================================
# Forecasting phase
# Test set
sentences = [
    # enc_input                dec_input           dec_output
    ['I have zero girlfriends P', '', '']
]
enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
test_loader = Data.DataLoader(
    MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
# Take the first (and only) batch of encoder inputs.
enc_inputs, _, _ = next(iter(test_loader))
print()
print("="*30)
print("Translate the Chinese sentence 'I have zero girlfriends' into an English sentence using the trained Transformer model: ")
for enc_input in enc_inputs:
    greedy_dec_predict = greedy_decoder(
        model, enc_input.view(1, -1).to(device), start_symbol=tgt_vocab["S"])
    # Remove only the batch axis. A bare squeeze() would also collapse a
    # one-token prediction to a 0-dim tensor, which cannot be iterated below.
    predicted = greedy_dec_predict.squeeze(0)
    print(enc_input, '->', predicted)
    print([src_idx2word[t.item()] for t in enc_input], '->',
          [idx2word[n.item()] for n in predicted])

The best Transformers material on the net is here !!!!

If you don’t know much about transformer architecture and still have a lot of doubts, please take a moment to read the blog post provided below and you will definitely get a great deal out of it.

Related Video

Recommended Today

Windows CMD Commands

A command prompt is a working prompt in an operating system that prompts for command entry. Command prompts vary in different operating system environments. In the windows environment, the command line program is cmd.exe, a 32-bit command line program, Microsoft Windows system based on the command interpreter program on Windows, similar to the Microsoft DOS […]