延续昨天的实作,首先我们先来修改一下昨天建置的 Learned Index 类别,还有一些参数需要储存(昨天忘记嘞QQ),当我们的 Learned Index 被实体化後进行 build,建置模型以便後续训练资料:
import numpy as np
from scipy.stats import norm
import keras
from keras.models import Sequential
from keras.layers import Dense
from sklearn.linear_model import LinearRegression
from sklearn import preprocessing
import matplotlib.pyplot as plt
import math
class Learned_Index():
def __init__(self, model_num):
self.RMI = [1, model_num] # Learned Index RMI架构
self.index = [] # 储存模型的索引
self.N = None # key值总数
self.data = None # 所有key值
self.error_bound = [] # 储存最後一层每个Model的 min_err and max_err
self.mean = None # 储存均值,资料标准化用
self.std = None # 储存标准差,资料标准化用
self.build()
def build(self):
for m in self.RMI:
if m==1 :
# 第一层 => 建置 NN Model 8x8
model = Sequential()
model.add(Dense(8, input_dim=1, activation="relu"))
model.add(Dense(8, activation="relu"))
model.add(Dense(1))
sgd=keras.optimizers.SGD(lr=0.000001) # lr:学习率,可调参数
model.compile(loss="mse", optimizer=sgd, metrics=["mse"])
self.index.append(model)
else:
# 第二层 => 建置多个简单线性回归
self.index.append([])
for j in range(m):
model = LinearRegression()
self.index[1].append(model)
建置CDF资料(pos=CDF(Key)),当作Model学习的Label:
def crtCDF(self,x):
if(type(x) == np.ndarray):
loc = x.mean()
scale = x.std()
N = x.size
pos = norm.cdf(x, loc, scale)*N
return pos
else:
print("Wrong Type! x must be np.ndarray ~")
return
从第一层叠带训练至第二层模型,主要可以分4个步骤: 第一层的模型训练、分配资料至第二层的模型、第二层的模型训练、计算最後一层的模型误差
def train(self, data):
self.data = data
self.N = data.size
y = self.crtCDF(data)
norm_data = preprocessing.scale(data) # 标准化: 零均值化
self.mean = data.mean()
self.std = data.std()
for m in self.RMI:
if m==1 :
# 训练第一层 NN 8x8 Model
sgd=keras.optimizers.SGD(lr=0.000001) # lr:学习率,可调参数
self.index[0].compile(loss="mse", optimizer=sgd, metrics=["mse"])
self.index[0].fit(norm_data, y, epochs=100, batch_size=32, verbose=0)
else:
# 依据第一层 Model 训练结果将资料分配至第二层
sub_data = [ [] for i in range(m)] # 储存第二层模型的各个keys
sub_y = [ [] for i in range(m)] # 储存第二层模型的各个labels
for i in range(self.N):
print(data[i])
mm = int(self.index[0].predict([[norm_data[i]]])*m/self.N)
if mm < 0:
mm=0
elif mm > m-1:
mm = m-1
sub_data[mm].append(data[i])
sub_y[mm].append(y[i])
# 训练第二层所有的 SLR Model
for j in range(m):
xx = np.array(sub_data[j])
yy = np.array(sub_y[j])
min_err = max_err = 0
if xx.size > 0:
xx = np.reshape(xx,(-1,1))
self.index[1][j].fit(xx, yy)
# 计算最後一层 Model 的 min_err/max_err
for i in range(data.size):
pred_pos, _ = self.predict(data[i])
err = pred_pos - i
if err < min_err:
min_err = math.floor(err)
elif err > max_err:
max_err = math.ceil(err)
self.error_bound.append([min_err, max_err])
查询资料是否存在,首先执行predict,预测资料所在位置,如果预测不准,在 [pos+min_err, pos+max_err] 中进行Binary Search,也就是所谓的Model Biased Search。如果查询到此key值回传True,查询不到则回传False。
def predict(self, key):
mm = int(self.index[0].predict([[(key-self.mean)/self.std]])*self.RMI[1]/self.N)
if mm < 0:
mm=0
elif mm > self.RMI[1]-1:
mm = self.RMI[1]-1
pred_pos = int(self.index[1][mm].predict([[key]]))
return pred_pos, mm
def search(self, key): # Model Biased Search
pos, model = self.predict(key)
l = pos + self.error_bound[model][0]
r = pos + self.error_bound[model][1]
# 检查预测出的位置是否超出资料范围
if pos < 0:
l = pos = 0
if pos > self.N-1:
r = pos = self.N-1
if l < 0:
l=0
if r > self.N-1:
r = self.N-1
print(l,pos,r)
# binary search
while l<=r:
if self.data[pos] == key:
return True
elif self.data[pos] > key:
r = pos - 1
elif self.data[pos] < key:
l = pos + 1
pos = int((l+r)/2)
return False
Learned Index的模拟实作大致告一个段落,写的很烂请多包涵XD
应该是没有bugㄅ..有的话再改XD
提醒一下,训练的资料必须是排序好的一维array且型态必须是numpy array!
明天我们会来比较Learned Index与单一个Model(NN、SLR)的预测分布 ~ 掰噗
开始前,先提个小小的观念: ✏️ 在 Git 的世界里,不管是新增、删除或重新命名,都可以看为是一个...
前言 想透过 phpMyAdmin 把正式机资料拉下来,汇入本机 docker 上的资料库做开发;但...
function 函数 为什麽要用函数:函数可以把需要重复执行的行为打包,在需要使用的时候直接使用函...
前情提要 就是今天 (゚∀゚) 可以很嚣张自称铁人了 (゚∀゚) ((超级膨胀 最後的终点 一路写来...
命令行参数 一般来说编译好的执行档都是透过命令行来制执有些时候需要读取一些命令行参数或是环境参数 程...