使用pyspark怎么對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫操作-創(chuàng)新互聯(lián)

使用pyspark怎么對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫操作?針對(duì)這個(gè)問題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡(jiǎn)單易行的方法。

成都創(chuàng)新互聯(lián)公司是一家集網(wǎng)站建設(shè),鷹手營(yíng)子企業(yè)網(wǎng)站建設(shè),鷹手營(yíng)子品牌網(wǎng)站建設(shè),網(wǎng)站定制,鷹手營(yíng)子網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷,網(wǎng)絡(luò)優(yōu)化,鷹手營(yíng)子網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力。可充分滿足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。

pyspark是Spark對(duì)Python的api接口,可以在Python環(huán)境中通過調(diào)用pyspark模塊來操作spark,完成大數(shù)據(jù)框架下的數(shù)據(jù)分析與挖掘。其中,數(shù)據(jù)的讀寫是基礎(chǔ)操作,pyspark的子模塊pyspark.sql 可以完成大部分類型的數(shù)據(jù)讀寫。文本介紹在pyspark中讀寫Mysql數(shù)據(jù)庫(kù)。

1 軟件版本

在Python中使用Spark,需要安裝配置Spark,這里跳過配置的過程,給出運(yùn)行環(huán)境和相關(guān)程序版本信息。

  • win10 64bit

  • java 13.0.1

  • spark 3.0

  • python 3.8

  • pyspark 3.0

  • pycharm 2019.3.4

2 環(huán)境配置

pyspark連接Mysql是通過java實(shí)現(xiàn)的,所以需要下載連接Mysql的jar包。

下載地址

使用pyspark怎么對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫操作


選擇下載Connector/J,然后選擇操作系統(tǒng)為Platform Independent,下載壓縮包到本地。


使用pyspark怎么對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫操作


然后解壓文件,將其中的jar包mysql-connector-java-8.0.19.jar放入spark的安裝目錄下,例如D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars。


使用pyspark怎么對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫操作


環(huán)境配置完成!

3 讀取Mysql

腳本如下:

from pyspark.sql import SQLContext, SparkSession

if __name__ == '__main__':
  # spark 初始化
  spark = SparkSession. \
    Builder(). \
    appName('sql'). \
    master('local'). \
    getOrCreate()
  # mysql 配置(需要修改)
  prop = {'user': 'xxx', 
      'password': 'xxx', 
      'driver': 'com.mysql.cj.jdbc.Driver'}
  # database 地址(需要修改)
  url = 'jdbc:mysql://host:port/database'
  # 讀取表
  data = spark.read.jdbc(url=url, table='tb_newCity', properties=prop)
  # 打印data數(shù)據(jù)類型
  print(type(data))
  # 展示數(shù)據(jù)
  data.show()
  # 關(guān)閉spark會(huì)話
  spark.stop()
  • 注意點(diǎn):

  • prop參數(shù)需要根據(jù)實(shí)際情況修改,文中用戶名和密碼用xxx代替了,driver參數(shù)也可以不需要;

  • url參數(shù)需要根據(jù)實(shí)際情況修改,格式為jdbc:mysql://主機(jī):端口/數(shù)據(jù)庫(kù)

  • 通過調(diào)用方法read.jdbc進(jìn)行讀取,返回的數(shù)據(jù)類型為spark DataFrame;

運(yùn)行腳本,輸出如下:


使用pyspark怎么對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫操作

4 寫入Mysql

腳本如下:

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == '__main__':
  # spark 初始化
  sc = SparkContext(master='local', appName='sql')
  spark = SQLContext(sc)
  # mysql 配置(需要修改)
  prop = {'user': 'xxx',
      'password': 'xxx',
      'driver': 'com.mysql.cj.jdbc.Driver'}
  # database 地址(需要修改)
  url = 'jdbc:mysql://host:port/database'

  # 創(chuàng)建spark DataFrame
  # 方式1:list轉(zhuǎn)spark DataFrame
  l = [(1, 12), (2, 22)]
  # 創(chuàng)建并指定列名
  list_df = spark.createDataFrame(l, schema=['id', 'value']) 
  
  # 方式2:rdd轉(zhuǎn)spark DataFrame
  rdd = sc.parallelize(l) # rdd
  col_names = Row('id', 'value') # 列名
  tmp = rdd.map(lambda x: col_names(*x)) # 設(shè)置列名
  rdd_df = spark.createDataFrame(tmp) 
  
  # 方式3:pandas dataFrame 轉(zhuǎn)spark DataFrame
  df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
  pd_df = spark.createDataFrame(df)

  # 寫入數(shù)據(jù)庫(kù)
  pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
  # 關(guān)閉spark會(huì)話
  sc.stop()

注意點(diǎn):

propurl參數(shù)同樣需要根據(jù)實(shí)際情況修改;

寫入數(shù)據(jù)庫(kù)要求的對(duì)象類型是spark DataFrame,提供了三種常見數(shù)據(jù)類型轉(zhuǎn)spark DataFrame的方法;

通過調(diào)用write.jdbc方法進(jìn)行寫入,其中的model參數(shù)控制寫入數(shù)據(jù)的行為。


model參數(shù)解釋
error默認(rèn)值,原表存在則報(bào)錯(cuò)
ignore原表存在,不報(bào)錯(cuò)且不寫入數(shù)據(jù)
append新數(shù)據(jù)在原表行末追加
overwrite覆蓋原表

5 常見報(bào)錯(cuò)

Access denied for user …


使用pyspark怎么對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫操作


原因:mysql配置參數(shù)出錯(cuò)
解決辦法:檢查user,password拼寫,檢查賬號(hào)密碼是否正確,用其他工具測(cè)試mysql是否能正常連接,做對(duì)比檢查。

No suitable driver


使用pyspark怎么對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫操作


原因:沒有配置運(yùn)行環(huán)境
解決辦法:下載jar包進(jìn)行配置,具體過程參考本文的2 環(huán)境配置

關(guān)于使用pyspark怎么對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫操作問題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。

本文題目:使用pyspark怎么對(duì)Mysql數(shù)據(jù)庫(kù)進(jìn)行讀寫操作-創(chuàng)新互聯(lián)
網(wǎng)站路徑:http://muchs.cn/article22/dcpjcc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營(yíng)銷、品牌網(wǎng)站建設(shè)、虛擬主機(jī)用戶體驗(yàn)、外貿(mào)網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁(yè)設(shè)計(jì)