Python - 在 Windows 10 上使用 PySpark 连接 Mysql 资料库参考笔记

Python - 在 Windows 10 上使用 PySpark 连接 Mysql 资料库参考笔记

参考资料

说明

如题,当初会写这篇参考笔记,主要是因为,当时正在解【Spark】Spark基础练习题(二)这篇文章上的题目时,碰到了(如上图)这题,需要使用 spark 把资料写入 Mysql 中,再读取出来,在顺便把搜寻到相关的教学资料,统整成一篇笔记,并补上一些图片方便阅读XD。

特此撰写本篇文章作为纪录文件,用以方便後续有需要的时候,可以快速的重复查阅,虽然後面比较没有什麽机会再用到,但也算是一个还不错的经验。

Window 10 PySpark 安装

  1. 先确定已安装 Python (建议到 Python 官网下载安装 64 bits https://www.python.org/downloads/windows/ )
  2. 安装 Java https://www.java.com/ ,建议安装到 C:\Java\ 下。若 Java 安装路径有空白,pyspark 执行会出现错误。
  3. 新增 JAVA_HOME 到环境变数,例如:
JAVA_HOME = C:\Java\jre1.8.0_241

  1. 在命令提示字元输入
pip3 install pyspark
  1. 在命令提示字元输入 pyspark 即可启动
    但是在 Windows 上还需要手动下载 winutils.exe 让 Hadoop 在 Windows 上正常运作
  2. https://github.com/steveloughran/winutils/ 下载对应版本的 winutils.exe (在 bin 资料夹内,例如: https://github.com/steveloughran/winutils/blob/master/hadoop-3.0.0/bin/winutils.exe)
  3. 将 winutils.exe 放到 C:\winutils\bin\ 下
  4. 新增 Windows 环境变数:
HADOOP_HOME = C:\winutils

接着还会遇到暂存资料夹 tmp\hive 权限的问题
10. 在你要开启专案的硬碟的根目录 (如 E:) 建立资料夹 \tmp\hive
(假如 ipynb 在 E:\abc\def\code.ipynb 内,就建立两个资料夹在 E:\tmp\hive,一般此资料夹会自动在执行 pyspark 时建立,但权限会有问题,需手动修改)
11. 用 winutils.exe 改变该暂存资料夹的权限

%HADOOP_HOME%\bin\winutils.exe chmod 777 E:\tmp\hive
  1. 检查该资料夹权限
%HADOOP_HOME%\bin\winutils.exe ls E:\tmp\hive

应该要为 drwxrwxrwx

这样应该就可以正常使用 pyspark 了

PySpark Mysql环境配置

pyspark连接Mysql是通过java实现的,所以需要下载连接Mysql的jar包。
下载地址


选择下载Connector/J,然後选择操作系统为Platform Independent,下载压缩包到本地。

然後因为是直接通过 pip3 install pyspark 的方式安装 PySpark,可以先透过以下程序码查询 PySpark路径:(参考:PySpark 连线 MySQL 示例)

from pyspark import find_spark_home
print(find_spark_home._find_spark_home())

然後解压文件,将其中的jar包mysql-connector-java-8.0.22.jar放入spark的安装目录下的jars资料夹

读取 MySql

下方程序码参考:PySpark 连接 MySQL 示例的Spark 代码示例

from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == '__main__':
    # spark 初始化
    sc = SparkContext(master='local', appName='sql')
    spark = SQLContext(sc)
    # mysql 配置(需要修改)
    prop = {'user': 'root',
            'password': 'rootroot',
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(需要修改)
    url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
    
    # 读取表
    data = spark.read.jdbc(url=url, table='user', properties=prop)
    # 打印data数据类型
    print(type(data))
    # 展示数据
    data.show()
    # 关闭spark会话
    sc.stop()

注意点:

  1. prop参数需要根据实际情况修改,文中用户名和密码用xxx代替了,driver参数也可以不需要;
  2. url参数需要根据实际情况修改,格式为jdbc:mysql://主机:端口/数据库;
  3. 通过调用方法read.jdbc进行读取,返回的数据类型为spark DataFrame;

输出画面如下:

解决使用JDBC连接 MySql 时,时区跟编码的错误

参考:使用JDBC连接MySql时出现...
在连接字符串後面加上?serverTimezone=UTC

其中UTC是统一标准世界时间。

完整的连接字符串示例:
jdbc:mysql://localhost:3306/test?serverTimezone=UTC

或者还有另一种选择:
jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8,这个是解决中文乱码输入问题,当然也可以和上面的一起结合:jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC

写入 MySql

下方程序码参考:pyspark对Mysql数据库进行读写
4 写入Mysql

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == '__main__':
    # spark 初始化
    sc = SparkContext(master='local', appName='sql')
    spark = SQLContext(sc)
    # mysql 配置(需要修改)
    prop = {'user': 'root',
            'password': 'rootroot',
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(需要修改)
    url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'

    # 创建spark DataFrame
    # 方式1:list转spark DataFrame
    l = [(1, 12), (2, 22)]
    # 创建并指定列名
    list_df = spark.createDataFrame(l, schema=['id', 'value']) 
    
    # 方式2:rdd转spark DataFrame
    rdd = sc.parallelize(l)  # rdd
    col_names = Row('id', 'value')  # 列名
    tmp = rdd.map(lambda x: col_names(*x))  # 设置列名
    rdd_df = spark.createDataFrame(tmp)  
    
    # 方式3:pandas dataFrame 转spark DataFrame
    df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
    pd_df = spark.createDataFrame(df)

    # 写入数据库
    pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
    # 关闭spark会话
    sc.stop()

效果如下:

注意点:

  1. prop和url参数同样需要根据实际情况修改;
  2. 写入数据库要求的对像类型是spark DataFrame,提供了三种常见数据类型转spark DataFrame的方法;
  3. 通过调用write.jdbc方法进行写入,其中的model参数控制写入数据的行为。


当数据库无写入的表时,这四种模式都会根据设定的表名称自动创建表,无需在Mysql里先建表。


<<:  Day15 - 汇出(下载) PDF

>>:  Ruby on Rails Model 验证及回呼

[Tableau Public] day 5:尝试制作不同种类的报表-2

第五天,星期日放假日,好像已经习惯了每日发一文章的习惯了~ 参考的资料来源一样是 day 4 的「O...

Vue.js指令(v-if & v-show & v-for)(DAY30)

v-if v-if/v-else/v-else-if 的用法就像是JavaScript的判断式,使...

安全工程101

系统工程是一门应用知识来创建或获取一个系统的学科,该系统由相互关联的元素组成,这些元素在整个系统开...

工程师养成日记:从自学到进入职场,非本科系也能写扣领薪水!

作者:徐育伟 现为零一零科技工程师 台湾科技大学 工业管理系 一切都要从大四开始说起... 那年选修...

课堂笔记 - 物联网概论(4)

应用层 1.节能 智慧电网系统 自动将感测到家庭家电上的用电资料并上传到网路 电力公司才可以藉由智...