如题,当初会写这篇参考笔记,主要是因为,当时正在解【Spark】Spark基础练习题(二)这篇文章上的题目时,碰到了(如上图)这题,需要使用 spark 把资料写入 Mysql 中,再读取出来,在顺便把搜寻到相关的教学资料,统整成一篇笔记,并补上一些图片方便阅读XD。
特此撰写本篇文章作为纪录文件,用以方便後续有需要的时候,可以快速的重复查阅,虽然後面比较没有什麽机会再用到,但也算是一个还不错的经验。
JAVA_HOME = C:\Java\jre1.8.0_241
pip3 install pyspark
HADOOP_HOME = C:\winutils
接着还会遇到暂存资料夹 tmp\hive 权限的问题
10. 在你要开启专案的硬碟的根目录 (如 E:) 建立资料夹 \tmp\hive
(假如 ipynb 在 E:\abc\def\code.ipynb 内,就建立两个资料夹在 E:\tmp\hive,一般此资料夹会自动在执行 pyspark 时建立,但权限会有问题,需手动修改)
11. 用 winutils.exe 改变该暂存资料夹的权限
%HADOOP_HOME%\bin\winutils.exe chmod 777 E:\tmp\hive
%HADOOP_HOME%\bin\winutils.exe ls E:\tmp\hive
应该要为 drwxrwxrwx
这样应该就可以正常使用 pyspark 了
pyspark连接Mysql是通过java实现的,所以需要下载连接Mysql的jar包。
下载地址
选择下载Connector/J,然後选择操作系统为Platform Independent,下载压缩包到本地。
然後因为是直接通过 pip3 install pyspark 的方式安装 PySpark,可以先透过以下程序码查询 PySpark路径:(参考:PySpark 连线 MySQL 示例)
from pyspark import find_spark_home
print(find_spark_home._find_spark_home())
然後解压文件,将其中的jar包mysql-connector-java-8.0.22.jar放入spark的安装目录下的jars资料夹
下方程序码参考:PySpark 连接 MySQL 示例的Spark 代码示例
from pyspark import SparkContext
from pyspark.sql import SQLContext
if __name__ == '__main__':
# spark 初始化
sc = SparkContext(master='local', appName='sql')
spark = SQLContext(sc)
# mysql 配置(需要修改)
prop = {'user': 'root',
'password': 'rootroot',
'driver': 'com.mysql.cj.jdbc.Driver'}
# database 地址(需要修改)
url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
# 读取表
data = spark.read.jdbc(url=url, table='user', properties=prop)
# 打印data数据类型
print(type(data))
# 展示数据
data.show()
# 关闭spark会话
sc.stop()
注意点:
输出画面如下:
参考:使用JDBC连接MySql时出现...
在连接字符串後面加上?serverTimezone=UTC
其中UTC是统一标准世界时间。
完整的连接字符串示例:
jdbc:mysql://localhost:3306/test?serverTimezone=UTC
或者还有另一种选择:
jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8
,这个是解决中文乱码输入问题,当然也可以和上面的一起结合:jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
下方程序码参考:pyspark对Mysql数据库进行读写
的4 写入Mysql
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
if __name__ == '__main__':
# spark 初始化
sc = SparkContext(master='local', appName='sql')
spark = SQLContext(sc)
# mysql 配置(需要修改)
prop = {'user': 'root',
'password': 'rootroot',
'driver': 'com.mysql.cj.jdbc.Driver'}
# database 地址(需要修改)
url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
# 创建spark DataFrame
# 方式1:list转spark DataFrame
l = [(1, 12), (2, 22)]
# 创建并指定列名
list_df = spark.createDataFrame(l, schema=['id', 'value'])
# 方式2:rdd转spark DataFrame
rdd = sc.parallelize(l) # rdd
col_names = Row('id', 'value') # 列名
tmp = rdd.map(lambda x: col_names(*x)) # 设置列名
rdd_df = spark.createDataFrame(tmp)
# 方式3:pandas dataFrame 转spark DataFrame
df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
pd_df = spark.createDataFrame(df)
# 写入数据库
pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
# 关闭spark会话
sc.stop()
效果如下:
注意点:
当数据库无写入的表时,这四种模式都会根据设定的表名称自动创建表,无需在Mysql里先建表。
第五天,星期日放假日,好像已经习惯了每日发一文章的习惯了~ 参考的资料来源一样是 day 4 的「O...
v-if v-if/v-else/v-else-if 的用法就像是JavaScript的判断式,使...
系统工程是一门应用知识来创建或获取一个系统的学科,该系统由相互关联的元素组成,这些元素在整个系统开...
作者:徐育伟 现为零一零科技工程师 台湾科技大学 工业管理系 一切都要从大四开始说起... 那年选修...
应用层 1.节能 智慧电网系统 自动将感测到家庭家电上的用电资料并上传到网路 电力公司才可以藉由智...