
- Pre-training approach

- Text -> vector
  word2vec can also map text to vectors, but word2vec embeddings are static: a token always gets the same vector.
  BERT's representations are dynamic: the vector for a token depends on the context it appears in (see the sketch below).
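A quick way to see the difference is to encode the same character in two different sentences and compare the output vectors; a word2vec table would return identical vectors, while BERT does not. A minimal sketch, assuming the local bert-base-chinese checkpoint used below also contains the tokenizer files (vocab.txt):

from transformers import BertTokenizer, BertModel
import torch

tokenizer = BertTokenizer.from_pretrained(r"..\..\bert-base-chinese")
bert = BertModel.from_pretrained(r"..\..\bert-base-chinese", return_dict=False)
bert.eval()

# the same character "苹" in two different contexts
s1 = tokenizer("我爱吃苹果", return_tensors="pt")
s2 = tokenizer("苹果发布了新手机", return_tensors="pt")
with torch.no_grad():
    out1, _ = bert(**s1)   # (1, seq_len, 768)
    out2, _ = bert(**s2)
v1 = out1[0, 4]   # "苹" sits at position 4 after [CLS] in the first sentence
v2 = out2[0, 1]   # "苹" sits at position 1 after [CLS] in the second sentence
print(torch.cosine_similarity(v1, v2, dim=0))  # < 1: same token, different vectors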
BERT structure - Embedding
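The embedding layer is the sum of three look-up tables followed by layer normalization, which is exactly what embedding_forward does in the DiyBert code further down:

$$\mathrm{Embedding}(x) = \mathrm{LayerNorm}\big(E_{\mathrm{token}}[x] + E_{\mathrm{pos}}[0{:}L] + E_{\mathrm{seg}}[s]\big)$$

where L is the sequence length and s is the segment (token type) id; for a single sentence s = 0.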

BERT structure - Transformer - self_attention
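The per-head computation is the standard scaled dot-product attention, the same computation that self_attention in DiyBert performs with numpy:

$$Q = xW_Q^{T} + b_Q,\quad K = xW_K^{T} + b_K,\quad V = xW_V^{T} + b_V$$

$$\mathrm{Attention}(Q,K,V) = \mathrm{softmax}\!\left(\frac{QK^{T}}{\sqrt{d_k}}\right)V$$

where d_k is the per-head dimension (768 / 12 = 64 for bert-base).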

BERT structure - Transformer - multi-head mechanism (multi_head)
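Multi-head attention splits the 768-dimensional hidden vector into 12 heads of 64 dimensions each, runs attention independently in every subspace, and concatenates the results before a final linear projection:

$$\mathrm{MultiHead}(x) = \mathrm{Concat}(\mathrm{head}_1,\dots,\mathrm{head}_{12})\,W_O^{T} + b_O,\qquad \mathrm{head}_i = \mathrm{Attention}(Q_i, K_i, V_i)$$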

BERT structure - Transformer
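Each encoder layer applies the two sublayers with residual connections and post-layer-norm, the structure that single_transformer_layer_forward reproduces below; bert-base stacks 12 such layers:

$$
\begin{aligned}
h &= \mathrm{LayerNorm}\big(x + \mathrm{MultiHead}(x)\big)\\
\mathrm{out} &= \mathrm{LayerNorm}\big(h + \mathrm{FFN}(h)\big),\qquad \mathrm{FFN}(h) = \mathrm{gelu}(hW_1^{T} + b_1)\,W_2^{T} + b_2
\end{aligned}
$$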

- BERT advantages
1. Pre-training exploits massive amounts of unlabeled text
2. Compared with static word vectors, BERT's text representations incorporate context
3. The Transformer architecture has strong fitting capacity, and the distance between two words does not degrade the modeling of their relationship
4. Large improvements in downstream task results

- BERT disadvantages
1. Pre-training requires data, time, and machines (open-source pre-trained models alleviate this)
2. Hard to apply to generative tasks
3. Large parameter count and heavy computation, which cannot meet the performance requirements of some real-world scenarios
4. Without downstream data for fine-tuning, results are still unsatisfactory
DiyBert
import torch
import math
import numpy as np
from transformers import BertModel

'''
Implement the BERT forward pass with hand-written matrix operations
Model files can be downloaded from https://huggingface.co/models
'''
bert = BertModel.from_pretrained(r"..\..\bert-base-chinese", return_dict=False)
state_dict = bert.state_dict()
bert.eval()
x = np.array([2450, 15486, 102, 2110])  # token ids of a 4-token example input
torch_x = torch.LongTensor([x])
sequence_output, pooler_output = bert(torch_x)
print(sequence_output.shape, pooler_output.shape)
print(bert.state_dict().keys())
def softmax(x):
    # normalize over the last axis
    return np.exp(x) / np.sum(np.exp(x), axis=-1, keepdims=True)


def gelu(x):
    # tanh approximation of gelu
    return 0.5 * x * (1 + np.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * np.power(x, 3))))


class DiyBert:
    # load weights from the torch state_dict and run the forward pass with numpy only
    def __init__(self, state_dict):
        self.num_attention_heads = 12
        self.hidden_size = 768
        self.num_layers = 12
        self.load_weights(state_dict)

    def load_weights(self, state_dict):
        # embedding layer weights
        self.word_embeddings = state_dict["embeddings.word_embeddings.weight"].numpy()
        self.position_embeddings = state_dict["embeddings.position_embeddings.weight"].numpy()
        self.token_type_embeddings = state_dict["embeddings.token_type_embeddings.weight"].numpy()
        self.embeddings_layer_norm_weight = state_dict["embeddings.LayerNorm.weight"].numpy()
        self.embeddings_layer_norm_bias = state_dict["embeddings.LayerNorm.bias"].numpy()
        self.transformer_weights = []
        # per-layer transformer weights
        for i in range(self.num_layers):
            q_w = state_dict["encoder.layer.%d.attention.self.query.weight" % i].numpy()
            q_b = state_dict["encoder.layer.%d.attention.self.query.bias" % i].numpy()
            k_w = state_dict["encoder.layer.%d.attention.self.key.weight" % i].numpy()
            k_b = state_dict["encoder.layer.%d.attention.self.key.bias" % i].numpy()
            v_w = state_dict["encoder.layer.%d.attention.self.value.weight" % i].numpy()
            v_b = state_dict["encoder.layer.%d.attention.self.value.bias" % i].numpy()
            attention_output_weight = state_dict["encoder.layer.%d.attention.output.dense.weight" % i].numpy()
            attention_output_bias = state_dict["encoder.layer.%d.attention.output.dense.bias" % i].numpy()
            attention_layer_norm_w = state_dict["encoder.layer.%d.attention.output.LayerNorm.weight" % i].numpy()
            attention_layer_norm_b = state_dict["encoder.layer.%d.attention.output.LayerNorm.bias" % i].numpy()
            intermediate_weight = state_dict["encoder.layer.%d.intermediate.dense.weight" % i].numpy()
            intermediate_bias = state_dict["encoder.layer.%d.intermediate.dense.bias" % i].numpy()
            output_weight = state_dict["encoder.layer.%d.output.dense.weight" % i].numpy()
            output_bias = state_dict["encoder.layer.%d.output.dense.bias" % i].numpy()
            ff_layer_norm_w = state_dict["encoder.layer.%d.output.LayerNorm.weight" % i].numpy()
            ff_layer_norm_b = state_dict["encoder.layer.%d.output.LayerNorm.bias" % i].numpy()
            self.transformer_weights.append(
                [q_w, q_b, k_w, k_b, v_w, v_b, attention_output_weight, attention_output_bias,
                 attention_layer_norm_w, attention_layer_norm_b, intermediate_weight, intermediate_bias,
                 output_weight, output_bias, ff_layer_norm_w, ff_layer_norm_b])
        # pooler layer weights
        self.pooler_dense_weight = state_dict["pooler.dense.weight"].numpy()
        self.pooler_dense_bias = state_dict["pooler.dense.bias"].numpy()

    def embedding_forward(self, x):
        # x: (max_len,) -> embedding: (max_len, hidden_size)
        we = self.get_embedding(self.word_embeddings, x)                                   # token embedding
        pe = self.get_embedding(self.position_embeddings, np.array(list(range(len(x)))))  # position embedding
        te = self.get_embedding(self.token_type_embeddings, np.array([0] * len(x)))       # segment embedding (single sentence -> all zeros)
        embedding = we + pe + te
        embedding = self.layer_norm(embedding, self.embeddings_layer_norm_weight, self.embeddings_layer_norm_bias)
        return embedding

    def get_embedding(self, embedding_matrix, x):
        # look up rows of the embedding matrix by index
        return np.array([embedding_matrix[index] for index in x])

    def all_transformer_layer_forward(self, x):
        for i in range(self.num_layers):
            x = self.single_transformer_layer_forward(x, i)
        return x

    def single_transformer_layer_forward(self, x, layer_index):
        weights = self.transformer_weights[layer_index]
        q_w, q_b, \
        k_w, k_b, \
        v_w, v_b, \
        attention_output_weight, attention_output_bias, \
        attention_layer_norm_w, attention_layer_norm_b, \
        intermediate_weight, intermediate_bias, \
        output_weight, output_bias, \
        ff_layer_norm_w, ff_layer_norm_b = weights
        # self-attention sublayer
        attention_output = self.self_attention(x,
                                               q_w, q_b,
                                               k_w, k_b,
                                               v_w, v_b,
                                               attention_output_weight, attention_output_bias,
                                               self.num_attention_heads,
                                               self.hidden_size)
        # residual connection + layer norm
        x = self.layer_norm(x + attention_output, attention_layer_norm_w, attention_layer_norm_b)
        # feed-forward sublayer
        feed_forward_x = self.feed_forward(x,
                                           intermediate_weight, intermediate_bias,
                                           output_weight, output_bias)
        # residual connection + layer norm
        x = self.layer_norm(x + feed_forward_x, ff_layer_norm_w, ff_layer_norm_b)
        return x

    def self_attention(self,
                       x,
                       q_w, q_b,
                       k_w, k_b,
                       v_w, v_b,
                       attention_output_weight,
                       attention_output_bias,
                       num_attention_heads,
                       hidden_size):
        # x: (max_len, hidden_size)
        q = np.dot(x, q_w.T) + q_b
        k = np.dot(x, k_w.T) + k_b
        v = np.dot(x, v_w.T) + v_b
        attention_head_size = int(hidden_size / num_attention_heads)
        # reshape to (num_heads, max_len, head_size)
        q = self.transpose_for_scores(q, attention_head_size, num_attention_heads)
        k = self.transpose_for_scores(k, attention_head_size, num_attention_heads)
        v = self.transpose_for_scores(v, attention_head_size, num_attention_heads)
        # scaled dot-product attention per head
        qk = np.matmul(q, k.swapaxes(1, 2))
        qk /= np.sqrt(attention_head_size)
        qk = softmax(qk)
        qkv = np.matmul(qk, v)
        # concatenate the heads back to (max_len, hidden_size)
        qkv = qkv.swapaxes(0, 1).reshape(-1, hidden_size)
        # output projection
        attention = np.dot(qkv, attention_output_weight.T) + attention_output_bias
        return attention

    def transpose_for_scores(self, x, attention_head_size, num_attention_heads):
        # (max_len, hidden_size) -> (num_heads, max_len, head_size)
        max_len, hidden_size = x.shape
        x = x.reshape(max_len, num_attention_heads, attention_head_size)
        x = x.swapaxes(1, 0)
        return x

    def feed_forward(self,
                     x,
                     intermediate_weight, intermediate_bias,
                     output_weight, output_bias):
        # hidden_size -> intermediate_size -> hidden_size, with gelu in between
        x = np.dot(x, intermediate_weight.T) + intermediate_bias
        x = gelu(x)
        x = np.dot(x, output_weight.T) + output_bias
        return x

    def layer_norm(self, x, w, b):
        # normalize over the hidden dimension, then rescale and shift
        x = (x - np.mean(x, axis=1, keepdims=True)) / np.std(x, axis=1, keepdims=True)
        x = x * w + b
        return x

    def pooler_output_layer(self, x):
        # linear + tanh on the [CLS] vector
        x = np.dot(x, self.pooler_dense_weight.T) + self.pooler_dense_bias
        x = np.tanh(x)
        return x

    def forward(self, x):
        # x: array of token ids -> (sequence_output, pooler_output)
        x = self.embedding_forward(x)
        sequence_output = self.all_transformer_layer_forward(x)
        pooler_output = self.pooler_output_layer(sequence_output[0])
        return sequence_output, pooler_output

db = DiyBert(state_dict)
diy_sequence_output, diy_pooler_output = db.forward(x)
torch_sequence_output, torch_pooler_output = bert(torch_x)
print(diy_sequence_output)
print(torch_sequence_output)
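To check that the hand-written forward pass reproduces the library output, a quick numerical comparison can be appended (a sketch, not part of the original script; small differences are expected because the gelu here is the tanh approximation and layer_norm omits the epsilon term):

# compare the two sequence outputs element-wise
torch_out = torch_sequence_output[0].detach().numpy()   # drop the batch dimension
print("max abs diff:", np.abs(diy_sequence_output - torch_out).max())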
BertDemo
import torch
import torch.nn as nn
import numpy as np
import random
import json
from transformers import BertModel

"""
Network built with pytorch.
Implements a model for a simple NLP task:
decide whether certain specific characters appear in a text.
This is the week2 example, modified to use BERT as the encoder.
"""
class TorchModel(nn.Module):
    def __init__(self, input_dim, sentence_length, vocab):
        super(TorchModel, self).__init__()
        # use a pre-trained BERT as the encoder
        self.bert = BertModel.from_pretrained(r"..\..\bert-base-chinese", return_dict=False)
        self.classify = nn.Linear(input_dim, 3)   # 3-way classification head
        self.activation = torch.sigmoid
        self.dropout = nn.Dropout(0.5)
        self.loss = nn.functional.cross_entropy

    # return the loss when the true label y is given; otherwise return the predictions
    def forward(self, x, y=None):
        sequence_output, pooler_output = self.bert(x)
        x = self.classify(pooler_output)
        y_pred = self.activation(x)
        if y is not None:
            return self.loss(y_pred, y)
        else:
            return y_pred
def build_vocab():
    chars = "abcdefghijklmnopqrstuvwxyz"
    vocab = {}
    for index, char in enumerate(chars):
        vocab[char] = index + 1
    vocab['unk'] = len(vocab) + 1
    return vocab
# randomly generate one sample
# class 0: contains a/b/c but none of x/y/z
# class 1: contains x/y/z but none of a/b/c
# class 2: everything else
def build_sample(vocab, sentence_length):
    x = [random.choice(list(vocab.keys())) for _ in range(sentence_length)]
    if set("abc") & set(x) and not set("xyz") & set(x):
        y = 0
    elif not set("abc") & set(x) and set("xyz") & set(x):
        y = 1
    else:
        y = 2
    x = [vocab.get(word, vocab['unk']) for word in x]
    return x, y
def build_dataset(batch_size, vocab, sentence_length):
    dataset_x = []
    dataset_y = []
    for i in range(batch_size):
        x, y = build_sample(vocab, sentence_length)
        dataset_x.append(x)
        dataset_y.append(y)
    return torch.LongTensor(dataset_x), torch.LongTensor(dataset_y)
def build_model(vocab, char_dim, sentence_length):
    model = TorchModel(char_dim, sentence_length, vocab)
    return model
# evaluate accuracy on freshly generated samples
def evaluate(model, vocab, sample_length):
    model.eval()
    total = 200
    x, y = build_dataset(total, vocab, sample_length)
    y = y.squeeze()
    print("Class A samples: %d, class B samples: %d, class C samples: %d"
          % (y.tolist().count(0), y.tolist().count(1), y.tolist().count(2)))
    correct, wrong = 0, 0
    with torch.no_grad():
        y_pred = model(x)
        for y_p, y_t in zip(y_pred, y):
            if int(torch.argmax(y_p)) == int(y_t):
                correct += 1
            else:
                wrong += 1
    print("Correct predictions: %d / %d, accuracy: %f" % (correct, total, correct / (correct + wrong)))
    return correct / (correct + wrong)


def main():
    epoch_num = 15            # number of training epochs
    batch_size = 20           # batch size
    train_sample = 1000       # samples per epoch
    char_dim = 768            # embedding dimension, must match BERT's hidden size
    sentence_length = 6       # length of each sample text
    vocab = build_vocab()
    model = build_model(vocab, char_dim, sentence_length)
    optim = torch.optim.Adam(model.parameters(), lr=1e-5)
    log = []
    for epoch in range(epoch_num):
        model.train()
        watch_loss = []
        for batch in range(int(train_sample / batch_size)):
            x, y = build_dataset(batch_size, vocab, sentence_length)
            optim.zero_grad()
            loss = model(x, y)
            loss.backward()
            optim.step()
            watch_loss.append(loss.item())
        print("=========\nEpoch %d average loss: %f" % (epoch + 1, np.mean(watch_loss)))
        acc = evaluate(model, vocab, sentence_length)
        log.append([acc, np.mean(watch_loss)])
    torch.save(model.state_dict(), "model.pth")
    writer = open("vocab.json", "w", encoding="utf8")
    writer.write(json.dumps(vocab, ensure_ascii=False, indent=2))
    writer.close()
    return
# run predictions with a trained model
def predict(model_path, vocab_path, input_strings):
    char_dim = 768            # must match the dimension used in training (BERT hidden size)
    sentence_length = 6
    vocab = json.load(open(vocab_path, "r", encoding="utf8"))
    model = build_model(vocab, char_dim, sentence_length)
    model.load_state_dict(torch.load(model_path))
    x = []
    for input_string in input_strings:
        x.append([vocab[char] for char in input_string])
    model.eval()
    with torch.no_grad():
        result = model.forward(torch.LongTensor(x))
    for i, input_string in enumerate(input_strings):
        print(int(torch.argmax(result[i])), input_string, result[i])


if __name__ == "__main__":
    main()
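main() only trains and writes model.pth and vocab.json; to try the trained classifier afterwards, predict can be called with a few length-6 strings. A hypothetical usage example (the strings below are illustrative, not from the original script):

# run after main() has produced model.pth and vocab.json
test_strings = ["abcdef", "uvwxyz", "ghijkl", "adxyzk"]
predict("model.pth", "vocab.json", test_strings)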
Counting BERT parameters
from transformers import BertModel
'''
Count the number of parameters of BERT
'''
model = BertModel.from_pretrained(r"..\bert-base-chinese", return_dict=False)
n = 2                        # number of segment (token type) ids
vocab = 21128                # vocabulary size of bert-base-chinese
max_sequence_length = 512    # maximum sequence length (position embeddings)
embedding_size = 768         # hidden size
hide_size = 3072             # intermediate (feed-forward) size
num_layers = 12              # bert-base has 12 transformer layers
# embedding layer: token + position + segment embeddings, plus LayerNorm weight and bias
embedding_parameters = vocab * embedding_size + max_sequence_length * embedding_size + n * embedding_size + embedding_size + embedding_size
# Q/K/V projections: weight and bias for each
self_attention_parameters = (embedding_size * embedding_size + embedding_size) * 3
# attention output: dense weight and bias, plus LayerNorm weight and bias
self_attention_out_parameters = embedding_size * embedding_size + embedding_size + embedding_size + embedding_size
# feed-forward: two dense layers, plus LayerNorm weight and bias
feed_forward_parameters = embedding_size * hide_size + hide_size + embedding_size * hide_size + embedding_size + embedding_size + embedding_size
# pooler: one dense layer
pool_fc_parameters = embedding_size * embedding_size + embedding_size
all_parameters = embedding_parameters + (self_attention_parameters + self_attention_out_parameters + feed_forward_parameters) * num_layers + pool_fc_parameters
print("Actual number of model parameters: %d" % sum(p.numel() for p in model.parameters()))
print("Number of parameters computed by hand: %d" % all_parameters)