一. 资料准备
这边的code是参考coursera上课程的code,根据自己的需求改成中文的范例
此资料为conllu档,所以先透过处理这个档的package来整理
整理为以下格式,以 '天气真好'为例:
汇入资料
## 资料准备
!pip install conllu
from io import open
from conllu import parse_incr
import re
def arrange_dataset(file_name: str, start_word_token: str, start_pos_token: str)-> dict:
data_file = open(file_name, "r", encoding="utf-8")
# 处理 dataset
sentences_list = []
pos_list = []
for tokenlist in parse_incr(data_file):
temp_str = [start_word_token]
temp_pos = [start_pos_token]
for s in tokenlist:
temp_str.append(s['form'])
temp_pos.append(s['upos'])
sentences_list.append(temp_str)
pos_list.append(temp_pos)
return {
'sentences_list': sentences_list,
'pos_list': pos_list,
}
# 处理 train 与 test data
train_file_path = 'UD_Chinese-GSD-master/zh_gsd-ud-train.conllu'
test_file_path = 'UD_Chinese-GSD-master/zh_gsd-ud-test.conllu'
start_w_token = '--s--'
start_pos_token = '--n--'
train_data_dict = arrange_dataset(train_file_path, start_w_token, start_pos_token)
test_data_dict = arrange_dataset(test_file_path, start_w_token, start_pos_token)
train_sentence, train_pos = train_data_dict['sentences_list'], train_data_dict['pos_list']
test_sentence, test_pos = test_data_dict['sentences_list'], test_data_dict['pos_list']
# vocab: 将词存至字典
vocab = {}
cnt_word = 0
# 计算每个词出现的次数
freq = defaultdict(int)
for sentence in train_sentence:
for word in sentence:
if word not in vocab:
vocab[word] = cnt_word
cnt_word += 1
freq[word] += 1
print("字典:")
cnt = 0
for k, v in vocab.items():
print(f"{k}:{v}")
cnt += 1
if cnt > 20:
break
# output:
# 字典:
# --s--:0
# 看似:1
# 简单:2
# ,:3
def assign_unk(word):
punct = set(string.punctuation)
if any(char in punct for char in word):
return "--unk_punct--"
return "--unk--"
def get_word_tag(word, pos_tag, vocab):
if word not in vocab:
word = assign_unk(word)
return word, pos_tag
get_word_tag('tardigrade', 'NN', vocab)
# output: ('--unk--', 'NN')
二. 实作HMM,处理HMM需要的三个矩阵
在计算这三个矩阵之前需要先记算数量,分别有下列三个数量:
* 状态转移 (Transition): 前一个词性与後一个词性的次数,transition_counts
* 发射的数量 (Emission): 每个字在每个词性的数量,emission_counts
* 各个tag的数量: tag_count
import pandas as pd
from collections import defaultdict
import math
import numpy as np
def create_three_HMM_matrix(sentences_list: list, pos_list: list) -> dict:
emission_counts = defaultdict(int)
transition_counts = defaultdict(int)
tag_counts = defaultdict(int)
sentence_len = len(sentences_list)
i = 0
for sentence_idx in range(sentence_len):
# 由该句第一个词
prev_tag = pos_list[sentence_idx][0]
tag_counts[prev_tag] += 1
for word_tag_idx in range(1, len(sentences_list[sentence_idx])):
i += 1
if i % 5000 == 0:
print(f"word count = {i}")
word, tag = get_word_tag(sentences_list[sentence_idx][word_tag_idx],
pos_list[sentence_idx][word_tag_idx],
vocab)
transition_counts[(prev_tag, tag)] += 1
emission_counts[(tag, word)] += 1
tag_counts[tag] += 1
prev_tag = tag
return emission_counts, transition_counts, tag_counts
emission_counts, transition_counts, tag_counts = create_three_HMM_matrix(train_sentence, train_pos)
# get all the POS states
states = sorted(tag_counts.keys())
print(f"POS tags 数量: {len(states)}")
print(states)
# output: POS tags 数量: 16
# ['--n--', 'ADJ', 'ADP', 'ADV', 'AUX', 'CCONJ', 'DET', 'NOUN', 'NUM', 'PART', 'PRON', 'PROPN', 'PUNCT', 'SYM', 'VERB', 'X']
print("transition 矩阵: ")
for ex in list(transition_counts.items())[:3]:
print(ex)
print()
print("emission 矩阵: ")
for ex in list(emission_counts.items())[200:203]:
print (ex)
# transition 矩阵:
# (('--n--', 'AUX'), 8)
# (('AUX', 'ADJ'), 217)
# (('ADJ', 'PUNCT'), 464)
# emission 矩阵:
# (('PUNCT', ':'), 86)
# (('PUNCT', '「'), 332)
# (('VERB', '吃'), 9)
def get_transition_matrix(alpha, tag_counts, transition_counts):
# 先将tag先拿出来
all_tags = sorted(tag_counts.keys())
# 初始一个 transition_matrix的矩阵 因为是得到 词性与词性的机率 故矩阵大小一定为词性的总数x词性的总数
transition_matrix = np.zeros((len(all_tags), len(all_tags)))
trans_keys = set(transition_counts.keys())
for i in range(len(all_tags)):
for j in range(len(all_tags)):
count = 0
key = (all_tags[i], all_tags[j])
# 将有对应得tag取出来
if key in transition_counts:
count = transition_counts.get(key)
# 取出前一个tag的数量
count_prev_tag = tag_counts[all_tags[i]]
# 用alpha来避免分母为0
transition_matrix[i,j] = (count + alpha)/(count_prev_tag + alpha*len(all_tags))
return transition_matrix
alpha = 0.001
transition_matrix = get_transition_matrix(alpha, tag_counts, transition_counts)
print(f"transition_matrix [0][0]: {transition_matrix[0,0]:.9f}")
print(f"transition_matrix [3][1]: {transition_matrix[3,1]:.4f}")
print("transition matrix:")
transition_sub = pd.DataFrame(transition_matrix[0:4,0:4], index=states[0:4], columns = states[0:4])
print(pd.DataFrame(transition_matrix[0:4,0:4], index=states[0:4], columns = states[0:4]))
# transition_matrix [0][0]: 0.000000250
# transition_matrix [3][1]: 0.0709
# transition matrix:
# --n-- ADJ ADP ADV
# --n-- 2.501866e-07 0.017513 0.130598 0.060545
# ADJ 4.086610e-07 0.024929 0.007356 0.016756
# ADP 2.205550e-07 0.024702 0.017424 0.034407
# ADV 2.181969e-07 0.070914 0.088152 0.089243
def get_emission_matrix(alpha, tag_counts, emission_counts, vocab):
num_tags = len(tag_counts)
all_tags = sorted(tag_counts.keys())
num_words = len(vocab)
# 初始 emission_matrix,因为是每个词在每个词性出现的机率,故矩阵大小为 词性数x语料库的字数
emission_matrix = np.zeros((num_tags, num_words))
for i in range(num_tags):
for j in range(num_words):
count = 0
key = (all_tags[i], vocab[j])
if key in emission_counts:
count = emission_counts.get(key)
count_tag = tag_counts[all_tags[i]]
emission_matrix[i,j] = (count+alpha)/(num_words*alpha + count_tag)
return emission_matrix
# creating your emission probability matrix. this takes a few minutes to run.
emission_matrix = get_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))
print(f"emission_matrix[0][0]: {emission_matrix[0,0]:.9f}")
print(f"emission_matrix[3][1]: {emission_matrix[3,1]:.9f}")
# Try viewing emissions for a few words in a sample dataframe
cidx = ['其实','决择','出身', '10']
# Get the integer ID for each word
cols = [vocab[a] for a in cidx]
# Choose POS tags to show in a sample dataframe
rvals =['--n--', 'ADP', 'ADV', 'AUX']
# For each POS tag, get the row number from the 'states' list
rows = [states.index(a) for a in rvals]
# Get the emissions for the sample of words, and the sample of POS tags
emission_matrix_sub = pd.DataFrame(emission_matrix[np.ix_(rows,cols)], index=rvals, columns = cidx )
print(emission_matrix_sub)
# emission_matrix[0][0]: 0.000000249
# emission_matrix[3][1]: 0.000000217
# 其实 决择 出身 10
# --n-- 2.490900e-07 2.490900e-07 2.490900e-07 2.490900e-07
# ADP 2.197023e-07 2.197023e-07 2.197023e-07 2.197023e-07
# ADV 1.956478e-03 2.173623e-07 2.173623e-07 2.173623e-07
# AUX 3.447547e-07 3.447547e-07 3.447547e-07 3.447547e-07
三. 实作维特比
在得到emission_matrix 与 transition_matrix 後,我们要用维特比演算法来找寻一个句子词性的最大路径,维特比演算法可分为三个部分:
初始:
def initialize(states, tag_counts, transition_matrix, emission_matrix, corpus, vocab):
num_tags = len(tag_counts)
best_probs = np.zeros((num_tags, len(corpus)))
best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
s_idx = states.index("--n--")
for i in range(num_tags):
if transition_matrix[s_idx, i] == 0:
best_probs[i,0] = float('-inf')
else:
best_probs[i,0] = math.log(transition_matrix[s_idx][i]) + math.log(emission_matrix[i][vocab["--s--"]])
return best_probs, best_paths
只有第0行有数值(先计算由'--s--' 算出每个接在'--s--'词性的机率)
best_probs, best_paths = initialize(states, tag_counts, transition_matrix, emission_matrix, test_sentence[0], vocab)
# Test the function
print(f"best_probs[0,0]: {best_probs[0, 0]}")
print(f"best_paths[2,3]: {best_paths[2, 3]}")
# output:
# best_probs[0,0]: -30.40651015286206
# best_paths[2,3]: 0
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
num_tags = best_probs.shape[0]
for i in range(1, len(test_corpus)):
for j in range(num_tags):
best_prob_i = float('-inf')
best_path_i = None
for k in range(num_tags):
prob = best_probs[k][i-1] + math.log(A[k][j]) + math.log(B[j][vocab[test_corpus[i]]])
if best_prob_i < prob:
best_prob_i = prob
best_path_i = k
best_probs[j,i] = best_prob_i
best_paths[j,i] = best_path_i
return best_probs, best_paths
# 更新 best_probs, best_paths
best_probs, best_paths = viterbi_forward(transition_matrix, emission_matrix, test_sentence[0], best_probs, best_paths, vocab)
def viterbi_backward(best_probs, best_paths, corpus, states):
m = best_paths.shape[1]
z = [None] * m
num_tags = best_probs.shape[0]
best_prob_for_last_word = float('-inf')
pred = [None] * m
for k in range(num_tags):
if best_probs[k][m-1] > best_prob_for_last_word:
best_prob_for_last_word = best_probs[k][m-1]
z[m - 1] = k
pred[m - 1] = states[z[m - 1]]
for i in range(m - 1, -1, -1):
pos_tag_for_word_i = z[i]
z[i - 1] = best_paths[pos_tag_for_word_i][i]
pred[i - 1] = states[z[i-1]]
return pred
pred = viterbi_backward(best_probs, best_paths, test_sentence[0], states)
m=len(pred)
print('The prediction for pred[0:8] is: \n', pred[0:7], "\n", test_sentence[0][0:7])
The prediction for pred[0:8] is:
['PRON', 'ADV', 'PUNCT', 'PRON', 'PART', 'NOUN', 'ADV']
['--s--', '然而', ',', '这样', '的', '处理', '也']
今天这篇比较长,如果对code不熟可以一行一行慢慢trace,尤其是维特比真的花好久才知道怎麽做,连结[1]附上该堂课的程序码code,欢迎大家看,明天会开始说明词/句子等相关词向量的表示方法~~
参考资讯
[1] 范例程序
昨天新增完XIB档後,今天要来让TableViewCell显示餐厅资讯 因为我是嘉义人~所以来推荐大...
昨天我们拉完@IBOutlet了,现在要来说明如何让ImageView根据Slider的左右滑动而改...
今日练习内容为建构CNN模型来分类鸟类图片,最後讲解一些架构的演进 # Load Data &...
如何编写有效的用户故事? 用户故事是一种捕捉早期需求的强大技术。用户故事捕捉了需求的 WHO、WHA...
在MSSQL2016以下版本没有Json函数可用@@.. 那就自己写一个来汇出成API在用的Json...