【Day 11】 实作 - 透过 AWS 服务 - Lambda 将 JSON 格式转换成 Parquet 格式

大家好,又到了美好的周末/images/emoticon/emoticon07.gif /images/emoticon/emoticon07.gif

在Day 8、Day 9 时,我们已经透过 Data Collection services – AWS Appflow 撷取 Google Analytics 资料并将原始资料存放至 AWS 的 S3 Bucket,并发现 Google Analytics 档案为多行的 JSON 格式,无法进行後续资料分析以及视觉化,而今天我们会来教学如何透过 AWS Lambda 服务来将此 JSON 格式文件转换为 Apache Parquet 格式并将此处理过後的档案存放在 AWS S3 Bucket,不仅大大加速查询的结果以及储存的费用。
https://ithelp.ithome.com.tw/upload/images/20210925/20131073PDb9UdQkQT.png

大致流程如下所示:

  1. 於 AWS Console 搜寻 Lambda
  2. 将程序所需要安装的模组进行打包成 zip
  3. 回到 AWS Console Lambda 主页中,点选左侧选单的 Layers 并 Create layer,将 zip 档上传至 AWS 後按 Create
  4. 点选左侧选单的 Functions,并 Create function
  5. 点选此 function 并点选 Configuration 页签
  6. 点选 General configuration 页签,调整 Memory 以及 Timeout
  7. 点选 Triggers 页签,点选 Add Trigger,选择 S3,并选择先前 Appflow 存放资料的 S3 位置
  8. 点选 Permissions 页签,并点选此 Role 确认拥有存取 S3 的权限
  9. 点选 Layers,并点选 Add a layer,选择先前建立的 layers 後按 add,汇入所需套件
  10. 将以下程序码贴至 Code source 後按 Deploy

步骤一、於 AWS Console 搜寻 Lambda

https://ithelp.ithome.com.tw/upload/images/20210925/20131073tmfvMl9TSg.png


步骤二、将程序所需要安装的模组进行打包成 zip

此实作我们是用 python 3.7 撰写语法,程序码中有包含部分模组是 Lambda 原生没有的,这时我们就必须将这些模组档案打包成 zip 档,然後再上传到 AWS
那这次实作,因为我们需要安装 pandas 以及 pyarrow 套件,我在 AWS EC2 中安装模组并进行打包,以下相关指令供大家参考:

#先更新所有套件
sudo yum update -y
#安装Docker
sudo yum install docker -y
#启用Docker
sudo service docker start
#安装pip
sudo yum -y install python-pip
#创建资料夹,後续会将套件存放在此
mkdir python
#从docker hub载入python3.7.12的映像档(映像档可以详[1])
sudo docker run -it --rm -v $(pwd)/python:/python python:3.7.12
#安装pandas以及pyarrow套件
sudo pip install -t $(pwd)/python pandas==0.23.4 pyarrow==0.11.1
#将python资料夹压成zip档
sudo zip -r pandas-pyarrow.zip python

结果如下图所示:
https://ithelp.ithome.com.tw/upload/images/20210925/20131073hKl8A1i8aH.jpg


步骤三、回到 AWS Console Lambda 主页中,点选左侧选单的 Layers 并 Create layer,将 zip 档上传至 AWS 後按 Create

https://ithelp.ithome.com.tw/upload/images/20210925/201310734fQ6Nmr1dI.png


步骤四、点选左侧选单的 Functions,并 Create function

输入 Function name 并设定 runtime 为 python 3.7
https://ithelp.ithome.com.tw/upload/images/20210925/20131073oHFjp62yWs.jpg


步骤五、点选此 function 并点选 Configuration 页签

https://ithelp.ithome.com.tw/upload/images/20210925/20131073USkZNCO6fk.jpg


步骤六、点选 General configuration 页签,调整 Memory 以及 Timeout

若 Lambda 运行过程中超过 Memory或 Time,Lambda就会发生 Fail /images/emoticon/emoticon17.gif
https://ithelp.ithome.com.tw/upload/images/20210925/20131073EP8ThAIJ8T.png


步骤七、点选 Triggers 页签,点选 Add Trigger,选择 S3,并选择先前 Appflow 存放资料的 S3 位置,那麽当此 Bucket 的特定资料夹下产生新的资料时,就会自动触发此 Lambda

强烈建议 Prefix 要填写,若没有填写仅填写 Bucket 名称,那代表说只要此 Bucket 有新增资料,就会触发 Lambda,可能会造成异常的费用产生跟错误~
https://ithelp.ithome.com.tw/upload/images/20210925/20131073oPHC0zOk5Q.png


步骤八、点选 Permissions 页签,并点选此 Role 确认拥有存取 S3 的权限,这样 Lambda 才能存取 S3 Bucket 进行资料处理

https://ithelp.ithome.com.tw/upload/images/20210925/201310730hj1QVvtFv.png


步骤九、点选 Layers,并点选 Add a layer,选择先前建立的 layers 後按 add,汇入所需套件

https://ithelp.ithome.com.tw/upload/images/20210925/20131073eKPH5UDxrB.jpg


步骤十、将以下程序码贴至 Code source 後按 Deploy

#汇入所需套件
import json
import urllib.parse
import pandas as pd
import boto3
from datetime import datetime as dt

def lambda_handler(event, context):

#读取S3 Bucket名称以及档案名称
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    s3_client = boto3.client('s3')

#读取原始资料
    raw_object = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    raw_data = json.loads(raw_object['Body'].read().decode('utf-8'))

#撷取必要的原始资料并将其转换名称以及Parquet 格式
    record_dates = [dt.strptime(r['dimensions'][0], '%Y%m%d') for r in raw_data['reports'][0]['data']['rows']]
    deviceCategory= [r['dimensions'][1] for r in raw_data['reports'][0]['data']['rows']]
    pageviews = [int(r['metrics'][0]['values'][0]) for r in raw_data['reports'][0]['data']['rows']]
    df = pd.DataFrame({
        'year': [r.year for r in record_dates],
        'month': [r.month for r in record_dates],
        'day': [r.day for r in record_dates],
        'deviceCategory': deviceCategory,
        ' pageviews': pageviews
    })

#将处理好的资料储存至同一个S3 Bucket的另外一个ga-data资料夹中
    output_file = dt.now().strftime('%Y%m%d%H%M%S')
    output_path = '/tmp/{}.parquet'.format(output_file)
    df.to_parquet(output_path)
    s3_resource = boto3.resource('s3')
    bucket = s3_resource.Bucket(bucket_name)
    bucket.upload_file(output_path, 'ga-data/{}.parquet'.format(output_file))

设定完以上内容後~ 请重新执行 Appflow,那麽 AppFlow 会重新拉取 Google Analytics 资料并存放在 AWS S3 Bucket,这时因特定路径有新的资料产生,就会触发此 Lambda 自动将 Google Analytics JSON 档案中提取必要的数据并将其转换为 Parquet 格式,再将此 Parquet 档案再上传至 AWS S3 Bucket 中
https://ithelp.ithome.com.tw/upload/images/20210925/20131073is8k803Wqr.png

今天我们顺利的把档案转成 Parquet 格式,明天我们就会开始进行资料分析以及视觉化的阶段,那就明天见啦:)

如果有任何指点与建议,也欢迎留言交流,一起漫步在 Data on AWS 中。

参考&相关来源:
[1] dockerhub python
https://hub.docker.com/_/python


<<:  [Day 10] tinyML整合开发平台介绍

>>:  Flutter基础介绍与实作-Day11 Nice to Meet you Widgets(2)

【在 iOS 开发路上的大小事-Day09】将常用的 Function 写成一个 class,让各个档案都能使用

在开发上,常常会有一些 Function 是会在各个档案中使用的,如果每次都要在需要用到这个 Fun...

DAY27 Aidea专案实作-AOI瑕疵检测(2/4)

那我们要开始着手处理我们的资料集了,今天会先做资料前处理的部分,其实不管是机器学习或是深度学习,只要...

Day8-React Hook 篇-认识 useMemo

今天介绍的是避免重新渲染的 hook useMemo,透过它可以使我们提升 React 网站的效能。...

我遇到的RWD网页难题

上次文章终於迎来续集 我的第一个RWD网页 这次的网页练习跟上次的网页是同系列的,不过这次可是有记起...

Day 05:我不是资深工具人

前言 路人甲:资工在做什麽的? 路人乙:写程序、组装电脑之类的吧! 上述的对话大家多少应该听过,我们...