Python - PySparkPracticeQuestions - PySpark Practice Questions Reference Notes

References

My GitHub: Eterna-E

Project repository: Eterna-E/PySparkPracticeQuestions

Sources of the Spark practice questions:
【Spark】Spark基础练习题(一)
【Spark】Spark基础练习题(二)

Background

I wrote these notes while preparing to help develop the IMAC lab's industry-academia project with ITRI. To get up to speed I started practicing PySpark, working through the two exercise sets a senior recommended: 【Spark】Spark基础练习题(一) and 【Spark】Spark基础练习题(二). Both articles provide their reference solutions in Scala only, and I could not find a PySpark version, so I rewrote every solution in Python myself, checking my results against the originals as I went. Since I had originally solved each question in its own file, which later meant opening them one by one to review, I also consolidated everything into this single, consistently formatted set of notes.

I'm keeping this article as a record so that I can quickly look things up again whenever I need to. I have not had many chances to use it since, but it was still a worthwhile exercise.

PySpark Basic Practice Questions (Part 1)

The Spark practice questions are listed below:

1. Read the data from the file test.txt (a sketch of the assumed file layout follows this list)
2. How many people under 20 years old took the exam?
3. How many people exactly 20 years old took the exam?
4. How many people over 20 years old took the exam?
5. How many boys took the exam?
6. How many girls took the exam?
7. How many students in class 12 took the exam?
8. How many students in class 13 took the exam?
9. What is the average Chinese score?
10. What is the average math score?
11. What is the average English score?
12. What is each student's average score?
13. What is the average score of class 12?
14. What is the average total score of the boys in class 12?
15. What is the average total score of the girls in class 12?
16. What is the average score of class 13?
17. What is the average total score of the boys in class 13?
18. What is the average total score of the girls in class 13?
19. What is the highest Chinese score in the whole school?
20. What is the lowest Chinese score in class 12?
21. What is the highest math score in class 13?
22. How many girls in class 12 have a total score above 150?
23. What is the average score of the students whose total score is above 150, whose math score is at least 70, and who are at least 19 years old?
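The test.txt layout is not reproduced here, but the answers below index each split line as six space-separated fields: class, name, age, gender, subject, score. A purely illustrative record under that assumption (not the actual dataset):

12 宋江 20 男 chinese 50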

The reference answers are as follows:

# Questions:
# 1. Read the data from the file test.txt
# 2. How many people under 20 years old took the exam?
# ans: 2
from pyspark import SparkContext
from operator import add

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])<20).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])<20).map(lambda x: (x[1],1)).reduceByKey(add).count()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])<20).groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])<20).groupBy(lambda x:x[1]).count()
print(numBs)
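Since question 2 only asks for a head count, the same result can also be read off more directly by counting distinct student names, skipping the groupBy step; a short sketch reusing the logData RDD defined above:

# count the distinct students under 20 (each student appears once per subject)
numBs = logData.map(lambda s: s.split(' ')) \
    .filter(lambda x: int(x[2]) < 20) \
    .map(lambda x: x[1]) \
    .distinct() \
    .count()
print(numBs)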
# Question:
# 3. How many people exactly 20 years old took the exam?
# ans: 2
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])==20).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])==20).groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])==20).groupBy(lambda x:x[1]).count()
print(numBs)
# Question:
# 4. How many people over 20 years old took the exam?
# ans: 2
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])>20).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])>20).groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])>20).groupBy(lambda x:x[1]).count()
print(numBs)
# Question:
# 5. How many boys took the exam?
# ans: 4
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='男').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='男').groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='男').groupBy(lambda x:x[1]).count()
print(numBs)
# Question:
# 6. How many girls took the exam?
# ans: 2
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='女').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='女').groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='女').groupBy(lambda x:x[1]).count()
print(numBs)
# Question:
# 7. How many students in class 12 took the exam?
# ans: 3
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').groupBy(lambda x:x[1]).count()
print(numBs)
# Question:
# 8. How many students in class 13 took the exam?
# ans: 3
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').groupBy(lambda x:x[1]).count()
print(numBs)
# Question:
# 9. What is the average Chinese score?

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').map(lambda x: int(x[5])).mean()
print(numBs)
# Question:
# 10. What is the average math score?

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math').map(lambda x: int(x[5])).mean()
print(numBs)
# Question:
# 11. What is the average English score? 63.3333

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='english').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='english').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='english').map(lambda x: int(x[5])).mean()
print(numBs)
# Question:
# 12. What is each student's average score?
# (王英,73)
# (杨春,70)
# (宋江,60)
# (李逵,63)
# (吴用,50)
# (林冲,53)
# val every_socre: RDD[(String, Any)] = data.map(x=>x.split(" ")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum /t._2.size))


from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).map(lambda x: (x[1],int(x[5]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], int(sum(x[1])/len(x[1])) )).collect()
print(numBs)
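groupByKey pulls every score of a student together before averaging. As an alternative sketch that accumulates a (sum, count) pair with reduceByKey instead, reusing the same logData RDD:

# per-student average via (sum, count) pairs instead of groupByKey
numBs = logData.map(lambda s: s.split(' ')) \
    .map(lambda x: (x[1], (int(x[5]), 1))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda t: int(t[0] / t[1])) \
    .collect()
print(numBs)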
# Question:
# 13. What is the average score of class 12? 60.0

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').map(lambda x: int(x[5])).mean()
print(numBs)
# Question:
# // 14. What is the average total score of the boys in class 12? 165.0
# // (宋江,180)
# // (吴用,150)
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).mean()
print(numBs)
# Question:
# // 15. What is the average total score of the girls in class 12? 210.0
# // (杨春,210)
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).mean()
print(numBs)
# Question:
# 16. What is the average score of class 13? 63.333333333333336

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').map(lambda x: int(x[5])).mean()
print(numBs)
# Question:
# // 17. What is the average total score of the boys in class 13? 175.0
# //(李逵,190)
# //(林冲,160)
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).mean()
print(numBs)
# Question:
# // 18. What is the average total score of the girls in class 13? 220.0
# //(王英,220)
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).mean()
print(numBs)
# Question:
# // 19. What is the highest Chinese score in the whole school? 70
# var max1 = data.map(x => x.split(" ")).filter(x => x(4).equals("chinese")).map(x => x(5).toInt).max()

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').map(lambda x: int(x[5])).max()
print(numBs)
# Question:
# // 20. What is the lowest Chinese score in class 12? 50
# var max2 = data.map(x => x.split(" ")).filter(x => x(4).equals("chinese") && x(0).equals("12")).map(x => x(5).toInt).min()

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese' and x[0]=='12').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese' and x[0]=='12').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese' and x[0]=='12').map(lambda x: int(x[5])).min()
print(numBs)
# Question:
# // 21. What is the highest math score in class 13? 80
# var max3 = data.map(x => x.split(" ")).filter(x => x(4).equals("math") && x(0).equals("13")).map(x => x(5).toInt).max()

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math' and x[0]=='13').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math' and x[0]=='13').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math' and x[0]=='13').map(lambda x: int(x[5])).max()
print(numBs)
# Question:
# // 22. How many girls in class 12 have a total score above 150? 1
# //(杨春,210)
# val count12_gt150girl: Long = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12") && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).filter(x=>x._2>150).count()

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).filter(lambda x: x[1]>150).collect()
print(numBs)
# Question:
# // 23. What is the average score of the students whose total score is above 150, whose math score is at least 70, and who are at least 19 years old?
# // Filter out the students whose total score is above 150 and compute their average:  (13,李逵,男,(60,1))  ->  (13,李逵,男,(190,3))  ->  total above 150  ->  (13,李逵,男,63)
# val com1: RDD[(String, Int)] = complex1.map(x=>(x._1,(x._2,1))).reduceByKey((a, b)=>(a._1+b._1,a._2+b._2)).filter(a=>(a._2._1>150)).map(t=>(t._1,t._2._1/t._2._2))


from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

# data1 = logData.map(lambda s: s.split(' '))\
#         .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
#         .collect()
# print(data1)

# data1 = logData.map(lambda s: s.split(' '))\
#         .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
#         .map(lambda x: (x[0],(x[1],1)))\
#         .collect()
# print(data1)

# data1 = logData.map(lambda s: s.split(' '))\
#         .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
#         .map(lambda x: (x[0],(x[1],1)))\
#         .reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1]))\
#         .collect()
# print(data1)

# data1 = logData.map(lambda s: s.split(' '))\
#         .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
#         .map(lambda x: (x[0],(x[1],1)))\
#         .reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1]))\
#         .filter(lambda x: x[1][0]>150)\
#         .collect()
# print(data1)

data1 = logData.map(lambda s: s.split(' '))\
        .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
        .map(lambda x: (x[0],(x[1],1)))\
        .reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1]))\
        .filter(lambda x: x[1][0]>150)\
        .map(lambda x: (x[0], int( x[1][0]/x[1][1] )))\
        .collect()
print(data1) # students whose total score is above 150, with their average score

# // Filter out the students whose math score is at least 70 and who are at least 19 years old. The filter returns a boolean (math score >= 70 and age >= 19); to join this data set with com1 later, the result is reshaped into the same (class,name,gender) key format as com1.
# val com2: RDD[(String, Int)] = 
# complex2.filter(a=>{val line = a._1.split(",");
# line(4).equals("math") &&
#  a._2>=70 && line(2).toInt>=19})
# .map(a=>{val line2 = a._1.split(",");(line2(0)+","+line2(1)+","+line2(3),a._2.toInt)})
# //(12,杨春,女 , 70)
# //(13,王英,女 , 80)

data2 = logData.map(lambda s: s.split(' '))\
        .filter(lambda x: int(x[2])>=19 and x[4]=='math' and int(x[5])>=70)\
        .map(lambda x: (x[0]+','+x[1]+','+x[3] , int(x[5])))\
        .collect()
print(data2)

data1 = logData.map(lambda s: s.split(' '))\
        .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
        .map(lambda x: (x[0],(x[1],1)))\
        .reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1]))\
        .filter(lambda x: x[1][0]>150)\
        .map(lambda x: (x[0], int( x[1][0]/x[1][1] )))

data2 = logData.map(lambda s: s.split(' '))\
        .filter(lambda x: int(x[2])>=19 and x[4]=='math' and int(x[5])>=70)\
        .map(lambda x: (x[0]+','+x[1]+','+x[3] , int(x[5])))

print(data1.join(data2).collect())

# // Use join to combine the values that share the same key,
# // then use map to format each element.
# val result = com1.join(com2).map(a =>(a._1,a._2._1))
# //(12,杨春,女,70)
# //(13,王英,女,73)

print(data1.join(data2).map(lambda x: (x[0],x[1][0])).collect())

PySpark Basic Practice Questions (Part 2)

The Spark practice questions are listed below:

1. Create an RDD from an array of 1-10 and multiply every element by 2 to form a new RDD

2. Create an RDD from an array of 10-20 and use mapPartitions to multiply every element by 2 to form a new RDD

3. Create an RDD with elements 1-5 and use flatMap to build a new RDD made up of the square and the cube of each original element: 1,1,4,8,9,27..

4. Create an RDD with 4 partitions from Array(10,20,30,40,50,60) and use glom to put each partition's data into an array

5. Create an RDD from Array(1, 3, 4, 20, 4, 5, 8) and group its elements by parity (odd/even)

6. Create an RDD of strings Array("xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong") and filter out a new RDD of the strings containing the substring "xiao"

7. Create an RDD with data 1 to 10 and sample it without replacement using sample

8. Create an RDD with data 1 to 10 and sample it with replacement using sample

9. Create an RDD from Array(10,10,2,5,3,5,3,6,9,1) and remove the duplicate elements

10. Create an RDD with 5 partitions and data 0 to 100, then use coalesce to reduce the number of partitions to 2

11. Create an RDD with 5 partitions and data 0 to 100, then use repartition to change the number of partitions to 3

12. Create an RDD with data 1,3,4,10,4,6,9,20,30,16 and sort it in both ascending and descending order

13. Create two RDDs, rdd1 with data 1 to 6 and rdd2 with data 4 to 10, and compute their union

14. Create two RDDs, rdd1 with data 1 to 6 and rdd2 with data 4 to 10, and compute both set differences

15. Create two RDDs, rdd1 with data 1 to 6 and rdd2 with data 4 to 10, and compute their intersection

16. Create two RDDs, rdd1 with data 1 to 6 and rdd2 with data 4 to 10, and compute the Cartesian product of the two RDDs

17. Create two RDDs, rdd1 with data 1 to 5 and rdd2 with data 11 to 15, and zip them together

18. Create an RDD from List(("female",1),("male",5),("female",5),("male",2)) and compute the totals for female and male

19. Create an RDD with two partitions from List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)), take the maximum value for each key within each partition, then add those maximums together

20. Create a pair RDD with two partitions from Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)) and compute the average value for each key

21. Compute the TOP 3 most-clicked advertisements for each province; the data is in the file access.log
The layout is: timestamp province city user ad, with fields separated by spaces.
Sample:
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12

22. Read the local file words.txt, count the occurrences of each word, and save the result to HDFS

23. Read the file people.json, in which every line is a JSON object, parse it and print the result

24. Save a SequenceFile: use Spark to create an RDD from Array(("a", 1),("b", 2),("c", 3)) and save it in SequenceFile format to HDFS

25. Read the SequenceFile from question 24 and print it

26. Read and write an objectFile: save an RDD with data Array(("a", 1),("b", 2),("c", 3)) as an objectFile and read it back

27. Use the built-in accumulator to count the number of empty lines in the file Accumulator.txt

28. Use a Spark broadcast variable
User table:
id name age gender(0|1)
001,刘向前,18,0
002,冯  剑,28,1
003,李志杰,38,0
004,郭  鹏,48,1
Requirement: output the user records with gender shown as 男 or 女 instead of 0 or 1.
Broadcast Map("0" -> "女", "1" -> "男") as a broadcast variable; the final output format is:
001,刘向前,18,女
003,李志杰,38,女
002,冯  剑,28,男
004,郭  鹏,48,男

29. Create a MySQL database bigdata0407 and create the following table in it:
CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(32) NOT NULL COMMENT '用户名称',
  `birthday` date DEFAULT NULL COMMENT '生日',
  `sex` char(1) DEFAULT NULL COMMENT '性别',
  `address` varchar(256) DEFAULT NULL COMMENT '地址',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
The data columns are, in order: name birthday gender province.
Use Spark to write this data into MySQL and then read it back.

30. Create a table student in HBase with one column family, message:
create 'student', 'message'
scan 'student', {COLUMNS => 'message'}
Given the data below, use Spark to write it into the HBase student table and then query it back.
Data:
Columns, in order: name class gender province, corresponding to the table fields name, class, sex, province.

The reference answers are as follows:

# 1. Create an RDD from an array of 1-10 and multiply every element by 2 to form a new RDD

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
logData = sc.parallelize([x for x in range(1, 11)]).cache()  # 1-10, matching the question

data1 = logData.map(lambda x: x*2).collect()
print(data1)
# 2. Create an RDD from an array of 10-20 and use mapPartitions to multiply every element by 2 to form a new RDD

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(10,20)]
rdd = sc.parallelize(numList,len(numList))
def f(part):
    print("====")  # marker printed once per partition
    for row in part:
        yield row * 2
data2 = rdd.mapPartitions(f).collect()
print(data2)
# 3. Create an RDD with elements 1-5 and use flatMap
# to build a new RDD made up of the square and the cube
# of each original element: 1,1,4,8,9,27..

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(1,6)]
rdd = sc.parallelize(numList)

data3 = rdd.flatMap(lambda x: (x**2,x**3)).collect()
print(data3)
# 4. Create an RDD with 4 partitions from
# Array(10,20,30,40,50,60)
# and use glom to put each partition's data into an array

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(10,70,10)]
rdd = sc.parallelize(numList, 4)  # 4 partitions, as the question specifies
print(rdd.collect())
print(rdd.getNumPartitions()) # number of partitions
data4 = rdd.glom().collect()
print(data4)
# 5. Create an RDD from Array(1, 3, 4, 20, 4, 5, 8)
# and group its elements by parity (odd/even)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [1, 3, 4, 20, 4, 5, 8]
rdd = sc.parallelize(numList)

data5 = rdd.groupBy(lambda x: x%2).collect()
print(data5)

data5 = rdd.groupBy(lambda x: x%2).map(lambda x: (x[0],list(x[1]))).collect()
print(data5)
# 6. Create an RDD of strings
# Array("xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong")
# and filter out a new RDD of the strings containing the substring "xiao"

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
strList = ["xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"]
rdd = sc.parallelize(strList)

data6 = rdd.filter(lambda x: "xiao" in x).collect()
print(data6)
# 7. Create an RDD with data 1 to 10 and sample it without replacement using sample

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(1,11)]
rdd = sc.parallelize(numList)

data7 = rdd.sample(
    withReplacement=False, # sampling without replacement
    fraction=0.5
).collect()
print(data7)

# Sample five times below; reference: https://www.iteblog.com/archives/1395.html#sample

print('origin:',rdd.collect())
sampleList = [rdd.sample(withReplacement=False,fraction=0.5) for i in range(5)]
# print(sampleList)
for cnt,y in zip(range(len(sampleList)), sampleList):
    print('sample ' + str(cnt) +' :'+ str(y.collect()))

# withReplacement = True or False controls whether sampling is done with replacement; fraction = 0.5 means roughly 50% of the elements are sampled

# Reference: https://zhuanlan.zhihu.com/p/34901846
# or https://www.cnblogs.com/tianqizhi/p/12115707.html
# (search for "sample")
# 8. Create an RDD with data 1 to 10 and sample it with replacement using sample
# val data8 = sc.makeRDD(1 to 10)
# val data8Result = data8.sample(true, 0.5, 1)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(1,11)]
rdd = sc.parallelize(numList)

data8 = rdd.sample(
    withReplacement=True, # sampling with replacement
    fraction=0.5
).collect()
print(data8)

# Sample five times below; reference: https://www.iteblog.com/archives/1395.html#sample

print('origin:',rdd.collect())
sampleList = [rdd.sample(withReplacement=True,fraction=0.5) for i in range(5)]
# print(sampleList)
for cnt,y in zip(range(len(sampleList)), sampleList):
    print('sample ' + str(cnt) +' :'+ str(y.collect()))
# 9. Create an RDD from Array(10,10,2,5,3,5,3,6,9,1)
# and remove the duplicate elements
# val data9 = sc.makeRDD(Array(10, 10, 2, 5, 3, 5, 3, 6, 9, 1))
# val data9Result = data9.distinct()

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [10,10,2,5,3,5,3,6,9,1]
rdd = sc.parallelize(numList)

data9 = rdd.distinct().collect()
print(data9) # unsorted
data9 = sorted(data9) # sorted copy
print(data9)
# 10. Create an RDD with 5 partitions and data 0 to 100,
# then use coalesce to reduce the number of partitions to 2
#  val data10 = sc.makeRDD(0 to 100, 5)
#  val data10Result = data10.coalesce(2)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(101)]  # 0 to 100 inclusive, matching Scala's 0 to 100
rdd = sc.parallelize(numList,5)

data10 = rdd.glom().collect()
print(data10)

coalesceData10 = rdd.coalesce(2).glom().collect()
print(coalesceData10)
# 11. Create an RDD with 5 partitions and data 0 to 100,
# then use repartition to change the number of partitions to 3
#  val data11 = sc.makeRDD(0 to 100, 5)
#  val data11Result = data11.repartition(3)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(101)]  # 0 to 100 inclusive, matching Scala's 0 to 100
rdd = sc.parallelize(numList,5)

data11 = rdd.glom().collect()
print(data11)

data11Result = rdd.repartition(3).glom().collect()
print(data11Result)
# 12. Create an RDD with data 1,3,4,10,4,6,9,20,30,16
# and sort it in both ascending and descending order
#  val data12 = sc.makeRDD(Array(1, 3, 4, 10, 4, 6, 9, 20, 30, 16))
#  val data12Result1 = data12.sortBy(x => x)
#  val data12Result2 = data12.sortBy(x => x, false)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [1,3,4,10,4,6,9,20,30,16]
rdd = sc.parallelize(numList)

data12Result1 = rdd.sortBy(lambda x: x).collect()
print(data12Result1) # ascending order

data12Result2 = rdd.sortBy(lambda x: x,False).collect()
print(data12Result2) # descending order
# 13. Create two RDDs,
# rdd1 with data 1 to 6 and rdd2 with data 4 to 10,
# and compute their union
#  val data13_1 = sc.makeRDD(1 to 6)
#  val data13_2 = sc.makeRDD(4 to 10)
#  val data13Result = data13_1.union(data13_2)
# Union: if A and B are sets, their union is the set
# that contains every element of A and every element of B,
# and nothing else.

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([x for x in range(1,7)])
rdd2 = sc.parallelize([x for x in range(4,11)])

data13Result = rdd1.union(rdd2).collect()
print(data13Result) # union of rdd1 and rdd2
# 14. Create two RDDs,
# rdd1 with data 1 to 6 and rdd2 with data 4 to 10,
# and compute both set differences
#  val data14_1 = sc.makeRDD(1 to 6)
#  val data14_2 = sc.makeRDD(4 to 10)
#  val data14Result_1 = data14_1.subtract(data14_2)
#  val data14Result_2 = data14_2.subtract(data14_1)

# Difference: if A and B are sets, the relative difference
# of A in B is the set of all elements
# that belong to B but not to A.

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([x for x in range(1,7)])
rdd2 = sc.parallelize([x for x in range(4,11)])

data14Result_1 = rdd1.subtract(rdd2).collect()
data14Result_2 = rdd2.subtract(rdd1).collect()

print("rdd1:",rdd1.collect())
print("rdd2:",rdd2.collect())
print("差集rdd1\\rdd2:", sorted(data14Result_1))
print("差集rdd1\\rdd2:", sorted(data14Result_2))
# 15. Create two RDDs,
# rdd1 with data 1 to 6 and rdd2 with data 4 to 10,
# and compute their intersection
#  val data15_1 = sc.makeRDD(1 to 6)
#  val data15_2 = sc.makeRDD(4 to 10)
#  val data15Result_1 = data15_1.intersection(data15_2)

# The intersection of two sets A and B contains every element
# that belongs to both A and B, and nothing else.

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([x for x in range(1,7)])
rdd2 = sc.parallelize([x for x in range(4,11)])

data15Result = rdd1.intersection(rdd2).collect()

print("rdd1:",rdd1.collect())
print("rdd2:",rdd2.collect())
print("rdd1和rdd2的交集:", sorted(data15Result))
# 16. Create two RDDs,
# rdd1 with data 1 to 6 and rdd2 with data 4 to 10,
# and compute the Cartesian product of the two RDDs
#  val data16_1 = sc.makeRDD(1 to 6)
#  val data16_2 = sc.makeRDD(4 to 10)
#  val data16Result = data16_1.cartesian(data16_2)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([x for x in range(1,7)])
rdd2 = sc.parallelize([x for x in range(4,11)])

data16Result = rdd1.cartesian(rdd2).collect()

print("rdd1:",rdd1.collect())
print("rdd2:",rdd2.collect())
print("rdd1和rdd2的笛卡儿积:", sorted(data16Result))
# 17. Create two RDDs,
# rdd1 with data 1 to 5 and rdd2 with data 11 to 15,
# and zip them together
#  val data17_1 = sc.makeRDD(1 to 5)
#  val data17_2 = sc.makeRDD(11 to 15)
#  val data17Result = data17_1.zip(data17_2)
# zip: see https://www.iteblog.com/archives/1400.html#zip
# or http://lxw1234.com/archives/2015/07/350.htm

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([x for x in range(1,6)])
rdd2 = sc.parallelize([x for x in range(11,16)])

data17Result = rdd1.zip(rdd2).collect()

print("rdd1:",rdd1.collect())
print("rdd2:",rdd2.collect())
print("rdd1和rdd2的拉链操作:", sorted(data17Result))
# // 18. Create an RDD from
# List(("female",1),("male",5),("female",5),("male",2))
# and compute the totals for female and male
#  val data18 = sc.makeRDD(List(("female", 1), ("male", 5), ("female", 5), ("male", 2)))
#  val data18Result = data18.reduceByKey(_ + _)

from pyspark import SparkContext
from operator import add

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([("female", 1), ("male", 5), ("female", 5), ("male", 2)])

data18Result = rdd1.reduceByKey(add).collect()

print("rdd1:",rdd1.collect())
print("female和male的总数:", sorted(data18Result))
# // 19. Create an RDD with two partitions from
# List(("a",3),("a",2),("c",4),
# ("b",3),("c",6),("c",8)),
# take the maximum value for each key within each partition, then add those maximums together
#  /**
#   * (a,3),(a,2),(c,4)
#   * (b,3),(c,6),(c,8)
#   */
#  val data19 = 
# sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4),
#  ("b", 3), ("c", 6), ("c", 8)), 2)
#  data19.glom().collect().foreach(x => 
# println(x.mkString(",")))
#  val data19Result = data19.aggregateByKey(0)
# (math.max(_, _), _ + _)

# Reference: https://blog.csdn.net/zhuzuwei/article/details/104446388

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
dataList = [("a", 3), ("a", 2), ("c", 4),("b", 3), ("c", 6), ("c", 8)]
rdd1 = sc.parallelize(dataList,2)
data19 = rdd1.glom().collect()
print(data19)
maxVal = (lambda x,y: max(x,y))
sumComb = (lambda x,y: x+y)
data19Result = rdd1.aggregateByKey(0,maxVal,sumComb).collect()

print("每个分区相同key对应值的最大值:", sorted(data19Result))
# 20. Create a pair RDD with two partitions from
# Array(("a", 88), ("b", 95), ("a", 91),
# ("b", 93), ("a", 95), ("b", 98))
# and compute the average value for each key
#  val data20 = sc.makeRDD(Array(("a", 88), ("b", 95), ("a", 91),
#  ("b", 93), ("a", 95), ("b", 98)))
#  val data20Result = data20.groupByKey()
# .map(x => x._1 -> x._2.sum / x._2.size)
#  //或val data20Result = data20
# .map(x => (x._1, (x._2, 1)))
# .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
# .map(x => (x._1, x._2._1 / x._2._2))

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
dataList = [("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)]
rdd1 = sc.parallelize(dataList)

data20Result = rdd1.groupByKey().mapValues(list)\
    .map(lambda x: (x[0],format( sum(x[1])/len(x[1]), '.2f'))).collect()
print("rdd:",rdd1.collect())
print("计算每种 key 的value的平均值:", sorted(data20Result))
# 24. Save a SequenceFile:
# use Spark to create an RDD from
# Array(("a", 1),("b", 2),("c", 3))
# and save it in SequenceFile format to HDFS
#  val data24 = 
# sc.makeRDD(Array(("a", 1), ("b", 2), ("c", 3)))
#  data24.saveAsSequenceFile
# ("hdfs://mycluster:8020/20200407_SequenceFile")

# Reference: https://stackoverflow.com/questions/34491579/saving-rdd-as-sequence-file-in-pyspark

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
dataList = [("a", 1),("b", 2),("c", 3)]
rdd1 = sc.parallelize(dataList)

rdd1.saveAsSequenceFile("testSeq")  # writes the "testSeq" directory; saveAsSequenceFile returns None
# 25. Read the SequenceFile from question 24 and print it
#  val data25: RDD[(String,Int)] = 
# sc.sequenceFile[String,Int]
# ("hdfs://mycluster:8020/20200407_SequenceFile/part-00000")

# Reference: https://blog.csdn.net/appleyuchi/article/details/81133270

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")

data25Result = sc.sequenceFile("testSeq")

print(data25Result.collect())  # the full (key, value) pairs read back from the SequenceFile
# 26. Read and write an objectFile:
# save an RDD with data
# Array(("a", 1),("b", 2),("c", 3))
# as an objectFile and read it back
#  val data26_1 = 
# sc.makeRDD(Array(("a", 1), ("b", 2), ("c", 3)))
#  data26_1.
# saveAsObjectFile
# ("output20200407/20200407_objectFile")
#  val data26_2 = 
# sc.objectFile
# ("output20200407/20200407_objectFile")

# Reference: https://spark.apache.org/docs/2.3.1/api/python/_modules/pyspark/rdd.html

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
dataList = [("a", 1), ("b", 2), ("c", 3)]
rdd1 = sc.parallelize(dataList)
rdd1.saveAsPickleFile("ObjectFile")  # save as a pickle file (delete the "ObjectFile" directory first if it already exists)

data26 = sc.pickleFile("ObjectFile").collect()

print(data26)
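Question 27 (counting empty lines with the built-in accumulator) is also not answered above; a minimal sketch, assuming an Accumulator.txt file in the working directory:

# 27. count empty lines in Accumulator.txt with a built-in accumulator
from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
blank_lines = sc.accumulator(0)

def count_blank(line):
    if line.strip() == "":
        blank_lines.add(1)

sc.textFile("Accumulator.txt").foreach(count_blank)
print("empty lines:", blank_lines.value)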
# /**
#   * 28. Use a Spark broadcast variable
#   * User table:
#   * id name age gender(0|1)
#   * 001,刘向前,18,0
#   * 002,冯  剑,28,1
#   * 003,李志杰,38,0
#   * 004,郭  鹏,48,1
#   * Requirement: output the user records with gender shown as 男 or 女, not as 0/1
#   * Broadcast Map("0" -> "女", "1" -> "男") as a broadcast variable;
#      the final output format is:
#   * 001,刘向前,18,女
#   * 003,李志杰,38,女
#   * 002,冯  剑,28,男
#   * 004,郭  鹏,48,男
#   */
#  val data28 = sc.textFile("input20200407/user.txt")
#  val sex = sc.broadcast(Map("0" -> "女", "1" -> "男"))
#  data28.foreach { x => var datas = x.split(",");
#  println(datas(0) + "," + datas(1) + "," + datas(2) + "," + sex.value(datas(3))) }

# Reference: https://sparkbyexamples.com/pyspark/pyspark-broadcast-variables/

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
data28 = sc.textFile("user.txt")
# print(data28.collect())
sex = sc.broadcast({"0": "女", "1": "男"})

data28result = data28.map(lambda x: x.split(','))\
    .map(lambda x: x[0] + "," + x[1] + "," + x[2] + "," + sex.value[x[3]])
# print(data28result.collect())

for data in data28result.collect():
    print(data)

#  /**
#   * 29. Create a MySQL database bigdata0407 and create the following table in it:
#   * CREATE TABLE `user` (
#   * `id` int(11) NOT NULL AUTO_INCREMENT,
#   * `username` varchar(32) NOT NULL COMMENT '用户名称',
#   * `birthday` date DEFAULT NULL COMMENT '生日',
#   * `sex` char(1) DEFAULT NULL COMMENT '性别',
#   * `address` varchar(256) DEFAULT NULL COMMENT '地址',
#   * PRIMARY KEY (`id`)
#   * ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
#   * The data is as follows,
#   * columns in order: name birthday gender province
#   * 安荷 1998/2/7 女 江苏省
#   * 白秋 2000/3/7 女 天津市
#   * 雪莲 1998/6/7 女 湖北省
#   * 宾白 1999/7/3 男 河北省
#   * 宾实 2000/8/7 男 河北省
#   * 斌斌 1998/3/7 男 江苏省
#   * Use Spark to write the data above into MySQL and read it back.
#   */
#  val data29 = sc.textFile("input20200407/users.txt")
#  val driver = "com.mysql.jdbc.Driver"
#  val url = "jdbc:mysql://localhost:3306/bigdata0407"
#  val username = "root"
#  val password = "root"
# /**
#  * insert the data into MySQL
#  */
#  data29.foreachPartition {
#    data =>
#      Class.forName(driver)
#      val connection = java.sql.DriverManager.getConnection(url, username, password)
#      val sql = "INSERT INTO `user` values (NULL,?,?,?,?)"
#      data.foreach {
#        tuples => {
#          val datas = tuples.split(" ")
#          val statement = connection.prepareStatement(sql)
#          statement.setString(1, datas(0))
#          statement.setString(2, datas(1))
#          statement.setString(3, datas(2))
#          statement.setString(4, datas(3))
#          statement.executeUpdate()
#          statement.close()
#        }
#      }
#      connection.close()
#  }
# /**
#  * query the data from MySQL
#   */
#  var sql = "select * from `user` where id between ? and ?"
#  val jdbcRDD = new JdbcRDD(sc,
#    () => {
#      Class.forName(driver)
#      java.sql.DriverManager.getConnection(url, username, password)
#    },
#    sql,
#    0,
#    44,
#    3,
#    result => {
#      println(s"id=${result.getInt(1)},username=${result.getString(2)}" +
#        s",birthday=${result.getDate(3)},sex=${result.getString(4)},address=${result.getString(5)}")
#    }
#  )
#  jdbcRDD.collect()

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == '__main__':
    # initialize Spark
    sc = SparkContext(master='local', appName='sql')
    spark = SQLContext(sc)
    # MySQL connection settings (adjust as needed)
    prop = {'user': 'root',
            'password': 'rootroot',
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database URL (adjust as needed)
    url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
    data29 = sc.textFile("mysql_insert.txt")
    data29result = data29.map(lambda x: x.split(' '))
    # print(data29result.collect())
    username = data29.map(lambda x: x.split(' ')).map(lambda x: x[0]).collect()
    print(username)
    data_id = [x+1 for x in range(len(username))]
    print(data_id)
    birthday = data29.map(lambda x: x.split(' ')).map(lambda x: x[1]).collect()
    print(birthday)
    sex = data29.map(lambda x: x.split(' ')).map(lambda x: x[2]).collect()
    print(sex)
    address = data29.map(lambda x: x.split(' ')).map(lambda x: x[3]).collect()
    print(address)

    # build a Spark DataFrame
    # approach: convert a pandas DataFrame into a Spark DataFrame
    # sample row: 安荷 1998/2/7 女 江苏省
    df = pd.DataFrame({'id': data_id, 'username': username,
                       'birthday': birthday, 'sex': sex, 'address': address})
    pd_df = spark.createDataFrame(df)

    # write to the database
    pd_df.write.jdbc(url=url, table='user', mode='overwrite', properties=prop)

    # read the table back
    data = spark.read.jdbc(url=url, table='user', properties=prop)
    # print the type of data
    print(type(data))
    # show the data
    data.show()

# ok

# Reference: https://zhuanlan.zhihu.com/p/136777424
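Question 30 (writing to and scanning the HBase table student) is the remaining question without an answer here. Spark itself talks to HBase through a connector or the Hadoop input/output formats; the sketch below sidesteps that and simply uses the third-party happybase client inside foreachPartition, so treat it only as an illustration of the idea. The Thrift host, the row keys and the two sample rows are assumptions, not the original data.

# 30. write rows into the HBase table 'student' (column family 'message') and scan them back
# sketch using the third-party happybase client; host, row keys and rows are illustrative
import happybase
from pyspark import SparkContext

sc = SparkContext("local", "Simple App")

# columns: name class sex province (rows made up for illustration)
rows = [
    ("1", {"message:name": "张三", "message:class": "12", "message:sex": "男", "message:province": "河北省"}),
    ("2", {"message:name": "李四", "message:class": "13", "message:sex": "女", "message:province": "江苏省"}),
]

def write_partition(part):
    conn = happybase.Connection("localhost")  # HBase Thrift server, adjust as needed
    table = conn.table("student")
    for row_key, data in part:
        # happybase expects bytes, so encode the keys and values
        table.put(row_key.encode("utf-8"),
                  {k.encode("utf-8"): v.encode("utf-8") for k, v in data.items()})
    conn.close()

sc.parallelize(rows, 2).foreachPartition(write_partition)

# scan the table back (runs on the driver)
conn = happybase.Connection("localhost")
for key, data in conn.table("student").scan(columns=[b"message"]):
    print(key, data)
conn.close()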
