pyspark是Spark对Python的api接口,可以在Python环境中通过调用pyspark模块来操作spark,完成大数据框架下的数据分析与挖掘。其中,数据的读写是基础操作,pyspark的子模块pyspark.sql 可以完成大部分类型的数据读写。文本介绍在pyspark中读写Mysql数据库。
1 软件版本
在Python中使用Spark,需要安装配置Spark,这里跳过配置的过程,给出运行环境和相关程序版本信息。
- win10 64bit
- java 13.0.1
- spark 3.0
- python 3.8
- pyspark 3.0
- pycharm 2019.3.4
2 环境配置
pyspark连接Mysql是通过java实现的,所以需要下载连接Mysql的jar包。
下载地址
选择下载Connector/J
,然后选择操作系统为Platform Independent
,下载压缩包到本地。
然后解压文件,将其中的jar包mysql-connector-java-8.0.19.jar
放入spark的安装目录下,例如D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars
。
环境配置完成!
3 读取Mysql
脚本如下:
from pyspark.sql import SQLContext, SparkSession if __name__ == '__main__': # spark 初始化 spark = SparkSession. Builder(). appName('sql'). master('local'). getOrCreate() # mysql 配置(需要修改) prop = {'user': 'xxx', 'password': 'xxx', 'driver': 'com.mysql.cj.jdbc.Driver'} # database 地址(需要修改) url = 'jdbc:mysql://host:port/database' # 读取表 data = spark.read.jdbc(url=url, table='tb_newCity', properties=prop) # 打印data数据类型 print(type(data)) # 展示数据 data.show() # 关闭spark会话 spark.stop()
- 注意点:
prop
参数需要根据实际情况修改,文中用户名和密码用xxx代替了,driver
参数也可以不需要;url
参数需要根据实际情况修改,格式为jdbc:mysql://主机:端口/数据库
;- 通过调用方法
read.jdbc
进行读取,返回的数据类型为spark DataFrame;
运行脚本,输出如下:
4 写入Mysql
脚本如下:
import pandas as pd from pyspark import SparkContext from pyspark.sql import SQLContext, Row if __name__ == '__main__': # spark 初始化 sc = SparkContext(master='local', appName='sql') spark = SQLContext(sc) # mysql 配置(需要修改) prop = {'user': 'xxx', 'password': 'xxx', 'driver': 'com.mysql.cj.jdbc.Driver'} # database 地址(需要修改) url = 'jdbc:mysql://host:port/database' # 创建spark DataFrame # 方式1:list转spark DataFrame l = [(1, 12), (2, 22)] # 创建并指定列名 list_df = spark.createDataFrame(l, schema=['id', 'value']) # 方式2:rdd转spark DataFrame rdd = sc.parallelize(l) # rdd col_names = Row('id', 'value') # 列名 tmp = rdd.map(lambda x: col_names(*x)) # 设置列名 rdd_df = spark.createDataFrame(tmp) # 方式3:pandas dataFrame 转spark DataFrame df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]}) pd_df = spark.createDataFrame(df) # 写入数据库 pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop) # 关闭spark会话 sc.stop()
注意点:
prop
和url
参数同样需要根据实际情况修改;
写入数据库要求的对象类型是spark DataFrame,提供了三种常见数据类型转spark DataFrame的方法;
通过调用write.jdbc
方法进行写入,其中的model
参数控制写入数据的行为。
model
参数解释
error
默认值,原表存在则报错
ignore
原表存在,不报错且不写入数据
append
新数据在原表行末追加
overwrite
覆盖原表
5 常见报错
Access denied for user …
原因:mysql配置参数出错
解决办法:检查user,password拼写,检查账号密码是否正确,用其他工具测试mysql是否能正常连接,做对比检查。
No suitable driver
原因:没有配置运行环境
解决办法:下载jar包进行配置,具体过程参考本文的2 环境配置。
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新日志
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]