[Day9] 词性标注(四)-利用python实作POS任务

一. 资料准备

这边的code是参考coursera上课程的code,根据自己的需求改成中文的范例
此资料为conllu档,所以先透过处理这个档的package来整理

## 资料准备
!pip install conllu
from io import open
from conllu import parse_incr
import re
  • 整理成想要的格式,一个专门纪录句字的list,一个专门记录POS的tag
def arrange_dataset(file_name: str, start_word_token: str, start_pos_token: str)-> dict:
    data_file = open(file_name, "r", encoding="utf-8")
    # 处理 dataset
    sentences_list = []
    pos_list = []

    for tokenlist in parse_incr(data_file):
        temp_str = [start_word_token]
        temp_pos = [start_pos_token]

        for s in tokenlist:
            temp_str.append(s['form'])
            temp_pos.append(s['upos'])
            
        sentences_list.append(temp_str)
        pos_list.append(temp_pos)
    
    return {
        'sentences_list': sentences_list,
        'pos_list': pos_list,
    }
# 处理 train 与 test data
train_file_path = 'UD_Chinese-GSD-master/zh_gsd-ud-train.conllu'
test_file_path = 'UD_Chinese-GSD-master/zh_gsd-ud-test.conllu'
start_w_token = '--s--'
start_pos_token = '--n--'

train_data_dict = arrange_dataset(train_file_path, start_w_token, start_pos_token)
test_data_dict = arrange_dataset(test_file_path, start_w_token, start_pos_token)

train_sentence, train_pos = train_data_dict['sentences_list'], train_data_dict['pos_list']
test_sentence, test_pos = test_data_dict['sentences_list'], test_data_dict['pos_list']
  • 将每个字纪录,存成一个字典
# vocab: 将词存至字典
vocab = {}
cnt_word = 0

# 计算每个词出现的次数
freq = defaultdict(int)

for sentence in train_sentence: 
    for word in sentence:
        if word not in vocab:
            vocab[word] = cnt_word
            cnt_word += 1
            
        freq[word] += 1
    
print("字典:")
cnt = 0
for k, v in vocab.items():
    print(f"{k}:{v}")
    cnt += 1
    if cnt > 20:
        break

# output: 
# 字典:
# --s--:0
# 看似:1
# 简单:2
# ,:3
  • 处理unk的字的function
def assign_unk(word):
    punct = set(string.punctuation)
    if any(char in punct for char in word):
        return "--unk_punct--"
    return "--unk--"
  • 标记word与tag
def get_word_tag(word, pos_tag, vocab):
    if word not in vocab:
        word = assign_unk(word)
    return word, pos_tag
get_word_tag('tardigrade', 'NN', vocab)
# output: ('--unk--', 'NN')

二. 实作HMM,处理HMM需要的三个矩阵

在计算这三个矩阵之前需要先记算数量,分别有下列三个数量:
* 状态转移 (Transition): 前一个词性与後一个词性的次数,transition_counts
* 发射的数量 (Emission): 每个字在每个词性的数量,emission_counts
* 各个tag的数量: tag_count

import pandas as pd
from collections import defaultdict
import math
import numpy as np

def create_three_HMM_matrix(sentences_list: list, pos_list: list) -> dict:
    
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)

    sentence_len = len(sentences_list)
    
    i = 0 
    
    for sentence_idx in range(sentence_len):
        # 由该句第一个词
        prev_tag = pos_list[sentence_idx][0]
        tag_counts[prev_tag] += 1
        
        for word_tag_idx in range(1, len(sentences_list[sentence_idx])):
            i += 1

            if i % 5000 == 0:
                print(f"word count = {i}")
            
            word, tag = get_word_tag(sentences_list[sentence_idx][word_tag_idx], 
                                     pos_list[sentence_idx][word_tag_idx], 
                                     vocab)

            transition_counts[(prev_tag, tag)] += 1
            emission_counts[(tag, word)] += 1
            tag_counts[tag] += 1
            prev_tag = tag
        
    return emission_counts, transition_counts, tag_counts
emission_counts, transition_counts, tag_counts = create_three_HMM_matrix(train_sentence, train_pos)
  • 每个词性的数量:
# get all the POS states
states = sorted(tag_counts.keys())
print(f"POS tags 数量: {len(states)}")
print(states)
# output: POS tags 数量: 16
# ['--n--', 'ADJ', 'ADP', 'ADV', 'AUX', 'CCONJ', 'DET', 'NOUN', 'NUM', 'PART', 'PRON', 'PROPN', 'PUNCT', 'SYM', 'VERB', 'X']
  • 印出每个 transition 与 emission 的矩阵
print("transition 矩阵: ")
for ex in list(transition_counts.items())[:3]:
    print(ex)
print()

print("emission 矩阵: ")
for ex in list(emission_counts.items())[200:203]:
    print (ex)
    
# transition 矩阵: 
# (('--n--', 'AUX'), 8)
# (('AUX', 'ADJ'), 217)
# (('ADJ', 'PUNCT'), 464)

# emission 矩阵: 
# (('PUNCT', ':'), 86)
# (('PUNCT', '「'), 332)
# (('VERB', '吃'), 9)
  • 接下来是将count矩阵换算成机率矩阵:
    • transition_counts里面存的是前一个词性与後一个词性的次数,ex: (('--n--', 'AUX'), 8)表示 '--n--'後面为'AUX'的次数在语料库中有8次
    • 将 (前一个词与後一个词的次数)/(前一个tag次数) 就会是前一个词与後一个词的机率了,也就是transition_matrix,其中tag次数就存在tag_counts
def get_transition_matrix(alpha, tag_counts, transition_counts):
    
    # 先将tag先拿出来    
    all_tags = sorted(tag_counts.keys())
    
    # 初始一个 transition_matrix的矩阵 因为是得到 词性与词性的机率 故矩阵大小一定为词性的总数x词性的总数
    transition_matrix = np.zeros((len(all_tags), len(all_tags)))
    
    trans_keys = set(transition_counts.keys())
    
    for i in range(len(all_tags)):
        for j in range(len(all_tags)):
            count = 0
            key = (all_tags[i], all_tags[j])
            # 将有对应得tag取出来             
            if key in transition_counts:
                count = transition_counts.get(key)
            # 取出前一个tag的数量
            count_prev_tag = tag_counts[all_tags[i]]
            
            # 用alpha来避免分母为0            
            transition_matrix[i,j] = (count + alpha)/(count_prev_tag + alpha*len(all_tags))
    
    return transition_matrix
alpha = 0.001
transition_matrix = get_transition_matrix(alpha, tag_counts, transition_counts)

print(f"transition_matrix [0][0]: {transition_matrix[0,0]:.9f}")
print(f"transition_matrix [3][1]: {transition_matrix[3,1]:.4f}")

print("transition matrix:")
transition_sub = pd.DataFrame(transition_matrix[0:4,0:4], index=states[0:4], columns = states[0:4])
print(pd.DataFrame(transition_matrix[0:4,0:4], index=states[0:4], columns = states[0:4]))

# transition_matrix [0][0]: 0.000000250
# transition_matrix [3][1]: 0.0709
# transition matrix:
#               --n--       ADJ       ADP       ADV
# --n--  2.501866e-07  0.017513  0.130598  0.060545
# ADJ    4.086610e-07  0.024929  0.007356  0.016756
# ADP    2.205550e-07  0.024702  0.017424  0.034407
# ADV    2.181969e-07  0.070914  0.088152  0.089243
  • 换算emission_matrix:
    • emission_counts里面存的是每个词性对应每个词出现的次数,ex: (('PUNCT', ':'), 86)表示 ':'为'PUNCT'的次数在语料库中有86次
    • 将 (每个词性对应每个词出现的次数)/(词性的次数) 就会是每个词性对应每个词的机率了,也就是emission_matrix,其中tag次数就存在tag_counts
def get_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    
    num_tags = len(tag_counts)
    
    all_tags = sorted(tag_counts.keys())
    
    num_words = len(vocab)
    
    # 初始 emission_matrix,因为是每个词在每个词性出现的机率,故矩阵大小为 词性数x语料库的字数
    emission_matrix = np.zeros((num_tags, num_words))


    for i in range(num_tags):
        for j in range(num_words):

            count = 0
            key = (all_tags[i], vocab[j])

            if key in emission_counts: 
                count = emission_counts.get(key)
                
            count_tag = tag_counts[all_tags[i]]
                
            emission_matrix[i,j] = (count+alpha)/(num_words*alpha + count_tag)

    return emission_matrix
# creating your emission probability matrix. this takes a few minutes to run. 
emission_matrix = get_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))

print(f"emission_matrix[0][0]: {emission_matrix[0,0]:.9f}")
print(f"emission_matrix[3][1]: {emission_matrix[3,1]:.9f}")

# Try viewing emissions for a few words in a sample dataframe
cidx  = ['其实','决择','出身', '10']

# Get the integer ID for each word
cols = [vocab[a] for a in cidx]

# Choose POS tags to show in a sample dataframe
rvals =['--n--', 'ADP', 'ADV', 'AUX']

# For each POS tag, get the row number from the 'states' list
rows = [states.index(a) for a in rvals]

# Get the emissions for the sample of words, and the sample of POS tags
emission_matrix_sub = pd.DataFrame(emission_matrix[np.ix_(rows,cols)], index=rvals, columns = cidx )
print(emission_matrix_sub)

# emission_matrix[0][0]: 0.000000249
# emission_matrix[3][1]: 0.000000217
#                  其实            决择            出身            10
# --n--  2.490900e-07  2.490900e-07  2.490900e-07  2.490900e-07
# ADP    2.197023e-07  2.197023e-07  2.197023e-07  2.197023e-07
# ADV    1.956478e-03  2.173623e-07  2.173623e-07  2.173623e-07
# AUX    3.447547e-07  3.447547e-07  3.447547e-07  3.447547e-07

三. 实作维特比

  • 在得到emission_matrix 与 transition_matrix 後,我们要用维特比演算法来找寻一个句子词性的最大路径,维特比演算法可分为三个部分:

    • 初始: 先计算由'--s--' 算出哪个词性接在後面的机率最大
    • 正向传播: 就是前面一天的算出第二天晴天或雨天的机率为多少,这边就是计算每个词对应的机率是多少并记录前一个词性的index
    • 反向传播: 找到最後一个词的哪个词性最大,找出前一个词词性的index,一直回到'--s--',就可以列出来了
  • 初始:

def initialize(states, tag_counts, transition_matrix, emission_matrix, corpus, vocab):

    num_tags = len(tag_counts)
    
    best_probs = np.zeros((num_tags, len(corpus)))
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    
    s_idx = states.index("--n--")
    
    for i in range(num_tags):
        
        if transition_matrix[s_idx, i] == 0:
            
            best_probs[i,0] = float('-inf')
        else:
            best_probs[i,0] = math.log(transition_matrix[s_idx][i]) + math.log(emission_matrix[i][vocab["--s--"]])

    return best_probs, best_paths

只有第0行有数值(先计算由'--s--' 算出每个接在'--s--'词性的机率)

best_probs, best_paths = initialize(states, tag_counts, transition_matrix, emission_matrix, test_sentence[0], vocab)
# Test the function
print(f"best_probs[0,0]: {best_probs[0, 0]}")
print(f"best_paths[2,3]: {best_paths[2, 3]}")

# output:
# best_probs[0,0]: -30.40651015286206
# best_paths[2,3]: 0
  • 正向传播:
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
   
    num_tags = best_probs.shape[0]

    for i in range(1, len(test_corpus)): 
        for j in range(num_tags):
            
            best_prob_i = float('-inf')
            
            best_path_i = None

            for k in range(num_tags):
            
                prob = best_probs[k][i-1] + math.log(A[k][j]) + math.log(B[j][vocab[test_corpus[i]]]) 

                if best_prob_i < prob:

                    best_prob_i = prob
                    best_path_i = k

            best_probs[j,i] = best_prob_i
            best_paths[j,i] = best_path_i

    return best_probs, best_paths
# 更新 best_probs, best_paths
best_probs, best_paths = viterbi_forward(transition_matrix, emission_matrix, test_sentence[0], best_probs, best_paths, vocab)
  • 反向传播:
def viterbi_backward(best_probs, best_paths, corpus, states):

    m = best_paths.shape[1] 

    z = [None] * m

    num_tags = best_probs.shape[0]

    best_prob_for_last_word = float('-inf')

    pred = [None] * m
    
    for k in range(num_tags):

        if best_probs[k][m-1] > best_prob_for_last_word:

            best_prob_for_last_word = best_probs[k][m-1]

            z[m - 1] = k
            
    pred[m - 1] = states[z[m - 1]]
    
    for i in range(m - 1, -1, -1):
        
        pos_tag_for_word_i = z[i]
        
        z[i - 1] = best_paths[pos_tag_for_word_i][i]
        
        pred[i - 1] = states[z[i-1]]
        
    return pred
  • 来预测词的词性~:
pred = viterbi_backward(best_probs, best_paths, test_sentence[0], states)
m=len(pred)
print('The prediction for pred[0:8] is: \n', pred[0:7], "\n", test_sentence[0][0:7])
The prediction for pred[0:8] is: 
 ['PRON', 'ADV', 'PUNCT', 'PRON', 'PART', 'NOUN', 'ADV'] 
 ['--s--', '然而', ',', '这样', '的', '处理', '也']

今天这篇比较长,如果对code不熟可以一行一行慢慢trace,尤其是维特比真的花好久才知道怎麽做,连结[1]附上该堂课的程序码code,欢迎大家看,明天会开始说明词/句子等相关词向量的表示方法~~

参考资讯
[1] 范例程序


<<:  [Day9]Beat the Spread!

>>:  Day 9 - 基本语法4 (布林值)

【从零开始的Swift开发心路历程-Day13】打造自己的私房美食名单Part2

昨天新增完XIB档後,今天要来让TableViewCell显示餐厅资讯 因为我是嘉义人~所以来推荐大...

【从零开始的Swift开发心路历程-Day6】简易调色盘Part2

昨天我们拉完@IBOutlet了,现在要来说明如何让ImageView根据Slider的左右滑动而改...

Day32 参加职训(机器学习与资料分析工程师培训班),tf.keras

今日练习内容为建构CNN模型来分类鸟类图片,最後讲解一些架构的演进 # Load Data &...

编写有效的用户故事 (user stories)

如何编写有效的用户故事? 用户故事是一种捕捉早期需求的强大技术。用户故事捕捉了需求的 WHO、WHA...

将资料表的资料转成Json抛出成API用

在MSSQL2016以下版本没有Json函数可用@@.. 那就自己写一个来汇出成API在用的Json...