IT铁人第30天 Elasticsearch 使用python查询资料 Aggregations:Scripted Metric

今天要介绍的是我另外一个也经常用的聚合方式,是Metrics底下的Scripted Metric

今天的测试资料
https://ithelp.ithome.com.tw/upload/images/20201013/20129976VuMbXNt2Wo.png

Scripted Metric

这种聚合方式是当内建的聚合功能没办法满足使用需求时,可以自订聚合,但如果需求是内建聚合就能够达到,还是推荐使用内建的方式,因为有效能保证,这种聚合方式虽然可以很迎合使用者,但效能是根据使用的写出来的东西而定的

在开始之前必须先让大家了解这种聚合方式是如何运作的。这种聚合方式分成4个阶段init_script、map_script、combine_script、reduce_script
流程是:
init_script→map_script→combine_script→reduce_script

接下来就来讲解每步流程实际上在干嘛

init_script

聚合第一步,在收集文档之前执行,初始化设置,一般都会设置一个transactions.state变数,作用我会把它想像成电脑的记忆体,就是一个暂存的地方

map_script

聚合第二步,对所有符合条件的文档执行map_script的内容,但要记住是在分片中执行

combine_script

聚合第三步,对所有分片执行combine_script的内容

reduce_script

聚合第四步,所有分片均返回结果後,在协调节点上执行reduce_script的内容,该脚本可以访问变数state(就是一开始初始化的变数)

params

可以从请求传入参数让脚本使用,但只有combine_script、map_script、init_script可以读取

接下来是实际应用,先定个目标,假设我想要得到资工一1跟资工一2的成绩平均,但如果单科成绩低於60分则加10分最高加到60分,定好了就开始吧

1.撰写init_script和params
先从init_script开始

"init_script": "state.transactions = new HashMap();"

这边会用HashMap是因为我希望经过map_script後state.transactions内部会长这样

#这边用json大概示意一下
{
  "资工一1": [77, 80], #第1,2位同学的平均
  "资工一2": [86, 60]
}

再来是params,我把不及格要加几分放在params里,一是之後要调整好调整,二是可以示范要如何使用

"params": {"add_point": 10},#上面提到加10分

2.撰写map_script
这里要做的事情就比较多了,要算出每位同学的平均,还要判断是否低於60要加分,之後再存state.transactions内

"map_script": """
    def class = doc['class'].value;        #拿值
    def math = doc['grades.math'].value;
    def mand = doc['grades.mand'].value;
    def eng = doc['grades.eng'].value;
    def soc = doc['grades.soc'].value;
    ArrayList tmp = null;
    #判断state.transactions内有无班级,有则直接存取,没有则新建
    if(!state.transactions.containsKey(class)){  
        tmp = new ArrayList();
        state.transactions.put(class, tmp);
    } else {
        tmp = state.transactions.get(class);
    }
    
    #判断小於60加分
    if (math < 60){
        if (math > 50){
            math = 60;
        }else{
            math += params.add_point;
        }
    }
    if (mand < 60){
        if (mand > 50){
            mand = 60;
        }else{
            mand += params.add_point;
        }
    }
    if (eng < 60){
        if (eng > 50){
            eng = 60;
        }else{
            eng += params.add_point;
        }
    }
    if (soc < 60){
        if (soc > 50){
            soc = 60;
        }else{
            soc += params.add_point;
        }
    }
    #计算平均且加入state.transactions
    long avg = 0;
    avg = (math + mand + eng + soc) / 4;
    tmp.add(avg);                            
"""

判断是否小於60分的部分,因为我想不到其他比较优雅的方法,所以就土法炼钢了,如果大家有比较好的写法麻烦一定要告诉我!

3.撰写combine_script
这边要做的事情就比较少了,因为我们的目的只剩下把所有同班级的同学平均加起来再除以班级人数,但combine_script是对分片执行的,所以在这边merge班级的话等等reduce_script也是要merge一次,所以这边就直接return了

"combine_script": "return state.transactions"

4.撰写reduce_script
所有分片的结果都回传了,最後要做的事把所有班级成绩merge。

Debug.explain
这边告诉大家一个方法,大家可以善用这个方法检视变数的情况,我们先用这个方法看一下state的内部情况(写在reduce_script内)

"reduce_script": """
    Debug.explain(states);
"""
"to_string" : "[{资工一1=[77, 77]}, {资工一2=[64, 94], 资工一1=[94]}, {资工一2=[56]}]"

可以看到成绩是分散的,因为是从不同分片回传的,接下来就把它合并

"reduce_script": """
    Map class_grades = new HashMap();
    #先合并成绩,整理成新的资料结构
    while (states.size() > 0){
        def data_buffer = states.remove(0);
        for(class in data_buffer.keySet()){
            Map class_tmp = null;
            if(!class_grades.containsKey(class)){
                class_tmp = new HashMap();
                class_grades.put(class, class_tmp);
                class_tmp.put("s", 0);
                class_tmp.put("n", 0);
                for(s in data_buffer.get(class)){
                    class_tmp.s += s;
                    class_tmp.n +=1;
                }
            }else{
                class_tmp = class_grades.get(class);
                for(s in data_buffer.get(class)){
                    class_tmp.s += s;
                    class_tmp.n +=1;
                }
            }
        }
    }
    #根据新的资料结构算出平均
    Map result = new HashMap();
    for(class in class_grades.keySet()){
        Map data = class_grades.get(class);
        long avg = 0;
        avg = data.s / data.n;
        result.put(class, avg);
    }
    return result;
"""

整体aggs query:

{
  "aggs": {
    "class_avg": {
      "scripted_metric": {
        "params": {
          "add_point": 10
        },
        "init_script": "state.transactions = new HashMap();",
        "map_script": """
            def class = doc['class'].value;
            def math = doc['grades.math'].value;
            def mand = doc['grades.mand'].value;
            def eng = doc['grades.eng'].value;
            def soc = doc['grades.soc'].value;
            ArrayList tmp = null;

            if(!state.transactions.containsKey(class)){
                tmp = new ArrayList();
                state.transactions.put(class, tmp);
            } else {
                tmp = state.transactions.get(class);
            }
            if (math < 60){
                if (math > 50){
                    math = 60;
                }else{
                    math += params.add_point;
                }
            }
            if (mand < 60){
                if (mand > 50){
                    mand = 60;
                }else{
                    mand += 10;
                }
            }
            if (eng < 60){
                if (eng > 50){
                    eng = 60;
                }else{
                    eng += 10;
                }
            }
            if (soc < 60){
                if (soc > 50){
                    soc = 60;
                }else{
                    soc += 10;
                }
            }
            long avg = 0;
            avg = (math + mand + eng + soc) / 4;
            tmp.add(avg);                            
        """,
        "combine_script": "return state.transactions",
        "reduce_script": """
          Map class_grades = new HashMap();
          while (states.size() > 0){
              def data_buffer = states.remove(0);
              for(class in data_buffer.keySet()){
                  Map class_tmp = null;
                  if(!class_grades.containsKey(class)){
                      class_tmp = new HashMap();
                      class_grades.put(class, class_tmp);
                      class_tmp.put("s", 0);
                      class_tmp.put("n", 0);
                      for(s in data_buffer.get(class)){
                          class_tmp.s += s;
                          class_tmp.n +=1;
                      }
                  }else{
                      class_tmp = class_grades.get(class);
                      for(s in data_buffer.get(class)){
                          class_tmp.s += s;
                          class_tmp.n +=1;
                      }
                  }
              }
          }
          Map result = new HashMap();
          for(class in class_grades.keySet()){
              Map data = class_grades.get(class);
              long avg = 0;
              avg = data.s / data.n;
              result.put(class, avg);
          }
          return result;
      """
      }
    }
  }
}

结果:

"aggregations" : {
  "class_avg" : {
    "value" : {
      "资工一1" : 82,
      "资工一2" : 72
    }
  }
}

验算一下
资工一1
王小明:60,87,90,74 平均77.75
小新:73,78,75,83 平均77.25
风间:91,92,95,98 平均94
平均83

资工一2
正男:76,70,69,58(60) 平均68.75
阿呆:91,99,100,87 平均94.25
许小美:34(44),65,43(53),56(60) 平均55.5
平均72.8

会有点误差应该是因为小数点後没有算到

今天的教学就到这边结束,漫长的30天终於结束了,也谢谢大家观看我的文章!


<<:  Day29|30天以来的努力

>>:  [Day29] 第二十九课 Azure灾害复原(DRaaS)-2[进阶]

系统分析师的养成之路—案例分享(1)

上周我跟大家分享了系统分析师必须具备的「观察」、「商业思维」、「聆听」共3个软实力,但在讲述下一个主...

Youtube API - 将 Google Cloud Platform 专案串接 Data API

「鲑鱼均,因为一场鲑鱼之乱被主管称为鲑鱼世代,广义来说以年龄和脸蛋分类的话这应该算是一种 KNN 的...

[Day 08] 从 tensorflow.keras 开始的 VGG Net 生活 (第一季)

-1. 序 OK,资料分析做完了, 现在要进入演算法的部分, 我们未来几天将从经典卷积神经网路架构中...

[iT铁人赛Day24]练习题(3)

今天来讲到第三题练习题 题目的大意就是: 有一个小弟弟正在玩积木,然後说他盖了一个城墙。 但他姐姐说...

[进阶指南] 不使用 ES6 开发 React( Day27 )

如果不使用 ES6 的 Class,则可以考虑用 create-react-class 。 var ...