Spark系列(十)——SparkSQL外部數(shù)據(jù)源

一、簡介

1.1 多數(shù)據(jù)源支持

Spark 支持以下六個核心數(shù)據(jù)源,同時 Spark 社區(qū)還提供了多達上百種數(shù)據(jù)源的讀取方式,能夠滿足絕大部分使用場景。

成都網(wǎng)站制作、網(wǎng)站設計、外貿(mào)網(wǎng)站建設的開發(fā),更需要了解用戶,從用戶角度來建設網(wǎng)站,獲得較好的用戶體驗。創(chuàng)新互聯(lián)多年互聯(lián)網(wǎng)經(jīng)驗,見的多,溝通容易、能幫助客戶提出的運營建議。作為成都一家網(wǎng)絡公司,打造的就是網(wǎng)站建設產(chǎn)品直銷的概念。選擇創(chuàng)新互聯(lián),不只是建站,我們把建站作為產(chǎn)品,不斷的更新、完善,讓每位來訪用戶感受到浩方產(chǎn)品的價值服務。

  • CSV
  • JSON
  • Parquet
  • ORC
  • JDBC/ODBC connections
  • Plain-text files

注:以下所有測試文件均可從本倉庫的resources 目錄進行下載

1.2 讀數(shù)據(jù)格式

所有讀取 API 遵循以下調用格式:

// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()

// 示例
spark.read.format("csv")
.option("mode", "FAILFAST")          // 讀取模式
.option("inferSchema", "true")       // 是否自動推斷 schema
.option("path", "path/to/file(s)")   // 文件路徑
.schema(someSchema)                  // 使用預定義的 schema      
.load()

讀取模式有以下三種可選項:

讀模式描述
permissive 當遇到損壞的記錄時,將其所有字段設置為 null,并將所有損壞的記錄放在名為 _corruption t_record 的字符串列中
dropMalformed 刪除格式不正確的行
failFast 遇到格式不正確的數(shù)據(jù)時立即失敗

1.3 寫數(shù)據(jù)格式

// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE")         //寫模式
.option("dateFormat", "yyyy-MM-dd")  //日期格式
.option("path", "path/to/file(s)")
.save()

寫數(shù)據(jù)模式有以下四種可選項:

Scala/Java描述
SaveMode.ErrorIfExists 如果給定的路徑已經(jīng)存在文件,則拋出異常,這是寫數(shù)據(jù)默認的模式
SaveMode.Append 數(shù)據(jù)以追加的方式寫入
SaveMode.Overwrite 數(shù)據(jù)以覆蓋的方式寫入
SaveMode.Ignore 如果給定的路徑已經(jīng)存在文件,則不做任何操作

<br/>

二、CSV

CSV 是一種常見的文本文件格式,其中每一行表示一條記錄,記錄中的每個字段用逗號分隔。

2.1 讀取CSV文件

自動推斷類型讀取讀取示例:

spark.read.format("csv")
.option("header", "false")        // 文件中的第一行是否為列的名稱
.option("mode", "FAILFAST")      // 是否快速失敗
.option("inferSchema", "true")   // 是否自動推斷 schema
.load("/usr/file/csv/dept.csv")
.show()

使用預定義類型:

import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//預定義數(shù)據(jù)格式
val myManualSchema = new StructType(Array(
    StructField("deptno", LongType, nullable = false),
    StructField("dname", StringType,nullable = true),
    StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()

2.2 寫入CSV文件

df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")

也可以指定具體的分隔符:

df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")

2.3 可選配置

為節(jié)省主文篇幅,所有讀寫配置項見文末 9.1 小節(jié)。

<br/>

三、JSON

3.1 讀取JSON文件

spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)

需要注意的是:默認不支持一條數(shù)據(jù)記錄跨越多行 (如下),可以通過配置 multiLinetrue 來進行更改,其默認值為 false。

// 默認支持單行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}

//默認不支持多行
{
  "DEPTNO": 10,
  "DNAME": "ACCOUNTING",
  "LOC": "NEW YORK"
}

3.2 寫入JSON文件

df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")

3.3 可選配置

為節(jié)省主文篇幅,所有讀寫配置項見文末 9.2 小節(jié)。

<br/>

四、Parquet

Parquet 是一個開源的面向列的數(shù)據(jù)存儲,它提供了多種存儲優(yōu)化,允許讀取單獨的列非整個文件,這不僅節(jié)省了存儲空間而且提升了讀取效率,它是 Spark 是默認的文件格式。

4.1 讀取Parquet文件

spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)

2.2 寫入Parquet文件

df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")

2.3 可選配置

Parquet 文件有著自己的存儲規(guī)則,因此其可選配置項比較少,常用的有如下兩個:

讀寫操作配置項可選值默認值描述
Write compression or codec None,<br/>uncompressed,<br/>bzip2,<br/>deflate, gzip,<br/>lz4, or snappy None 壓縮文件格式
Read mergeSchema true, false 取決于配置項 spark.sql.parquet.mergeSchema 當為真時,Parquet 數(shù)據(jù)源將所有數(shù)據(jù)文件收集的 Schema 合并在一起,否則將從摘要文件中選擇 Schema,如果沒有可用的摘要文件,則從隨機數(shù)據(jù)文件中選擇 Schema。

更多可選配置可以參閱官方文檔:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

<br/>

五、ORC

ORC 是一種自描述的、類型感知的列文件格式,它針對大型數(shù)據(jù)的讀寫進行了優(yōu)化,也是大數(shù)據(jù)中常用的文件格式。

5.1 讀取ORC文件

spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)

4.2 寫入ORC文件

csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")

<br/>

六、SQL Databases

Spark 同樣支持與傳統(tǒng)的關系型數(shù)據(jù)庫進行數(shù)據(jù)讀寫。但是 Spark 程序默認是沒有提供數(shù)據(jù)庫驅動的,所以在使用前需要將對應的數(shù)據(jù)庫驅動上傳到安裝目錄下的 jars 目錄中。下面示例使用的是 MySQL 數(shù)據(jù)庫,使用前需要將對應的 mysql-connector-java-x.x.x.jar 上傳到 jars 目錄下。

6.1 讀取數(shù)據(jù)

讀取全表數(shù)據(jù)示例如下,這里的 help_keyword 是 mysql 內置的字典表,只有 help_keyword_idname 兩個字段。

spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")            //驅動
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")   //數(shù)據(jù)庫地址
.option("dbtable", "help_keyword")                    //表名
.option("user", "root").option("password","root").load().show(10)

從查詢結果讀取數(shù)據(jù):

val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()

//輸出
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|             10|      ALTER|
|             11|    ANALYSE|
|             12|    ANALYZE|
|             13|        AND|
|             14|    ARCHIVE|
|             15|       AREA|
|             16|         AS|
|             17|   ASBINARY|
|             18|        ASC|
|             19|     ASTEXT|
+---------------+-----------+

也可以使用如下的寫法進行數(shù)據(jù)的過濾:

val props = new java.util.Properties
props.setProperty("driver", "com.mysql.jdbc.Driver")
props.setProperty("user", "root")
props.setProperty("password", "root")
val predicates = Array("help_keyword_id < 10  OR name = 'WHEN'")   //指定數(shù)據(jù)過濾條件
spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show() 

//輸出:
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|            604|       WHEN|
+---------------+-----------+

可以使用 numPartitions 指定讀取數(shù)據(jù)的并行度:

option("numPartitions", 10)

在這里,除了可以指定分區(qū)外,還可以設置上界和下界,任何小于下界的值都會被分配在第一個分區(qū)中,任何大于上界的值都會被分配在最后一個分區(qū)中。

val colName = "help_keyword_id"   //用于判斷上下界的列
val lowerBound = 300L    //下界
val upperBound = 500L    //上界
val numPartitions = 10   //分區(qū)綜述
val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",
                             colName,lowerBound,upperBound,numPartitions,props)

想要驗證分區(qū)內容,可以使用 mapPartitionsWithIndex 這個算子,代碼如下:

jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
    val buffer = new ListBuffer[String]
    while (iterator.hasNext) {
        buffer.append(index + "分區(qū):" + iterator.next())
    }
    buffer.toIterator
}).foreach(println)

執(zhí)行結果如下:help_keyword 這張表只有 600 條左右的數(shù)據(jù),本來數(shù)據(jù)應該均勻分布在 10 個分區(qū),但是 0 分區(qū)里面卻有 319 條數(shù)據(jù),這是因為設置了下限,所有小于 300 的數(shù)據(jù)都會被限制在第一個分區(qū),即 0 分區(qū)。同理所有大于 500 的數(shù)據(jù)被分配在 9 分區(qū),即最后一個分區(qū)。

Spark 系列(十)—— Spark SQL 外部數(shù)據(jù)源

6.2 寫入數(shù)據(jù)

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("user", "root").option("password", "root")
.option("dbtable", "emp")
.save()

<br/>

七、Text

Text 文件在讀寫性能方面并沒有任何優(yōu)勢,且不能表達明確的數(shù)據(jù)結構,所以其使用的比較少,讀寫操作如下:

7.1 讀取Text數(shù)據(jù)

spark.read.textFile("/usr/file/txt/dept.txt").show()

7.2 寫入Text數(shù)據(jù)

df.write.text("/tmp/spark/txt/dept")

<br/>

八、數(shù)據(jù)讀寫高級特性

8.1 并行讀

多個 Executors 不能同時讀取同一個文件,但它們可以同時讀取不同的文件。這意味著當您從一個包含多個文件的文件夾中讀取數(shù)據(jù)時,這些文件中的每一個都將成為 DataFrame 中的一個分區(qū),并由可用的 Executors 并行讀取。

8.2 并行寫

寫入的文件或數(shù)據(jù)的數(shù)量取決于寫入數(shù)據(jù)時 DataFrame 擁有的分區(qū)數(shù)量。默認情況下,每個數(shù)據(jù)分區(qū)寫一個文件。

8.3 分區(qū)寫入

分區(qū)和分桶這兩個概念和 Hive 中分區(qū)表和分桶表是一致的。都是將數(shù)據(jù)按照一定規(guī)則進行拆分存儲。需要注意的是 partitionBy 指定的分區(qū)和 RDD 中分區(qū)不是一個概念:這里的分區(qū)表現(xiàn)為輸出目錄的子目錄,數(shù)據(jù)分別存儲在對應的子目錄中。

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")

輸出結果如下:可以看到輸出被按照部門編號分為三個子目錄,子目錄中才是對應的輸出文件。

Spark 系列(十)—— Spark SQL 外部數(shù)據(jù)源

8.3 分桶寫入

分桶寫入就是將數(shù)據(jù)按照指定的列和桶數(shù)進行散列,目前分桶寫入只支持保存為表,實際上這就是 Hive 的分桶表。

val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

8.5 文件大小管理

如果寫入產(chǎn)生小文件數(shù)量過多,這時會產(chǎn)生大量的元數(shù)據(jù)開銷。Spark 和 HDFS 一樣,都不能很好的處理這個問題,這被稱為“small file problem”。同時數(shù)據(jù)文件也不能過大,否則在查詢時會有不必要的性能開銷,因此要把文件大小控制在一個合理的范圍內。

在上文我們已經(jīng)介紹過可以通過分區(qū)數(shù)量來控制生成文件的數(shù)量,從而間接控制文件大小。Spark 2.2 引入了一種新的方法,以更自動化的方式控制文件大小,這就是 maxRecordsPerFile 參數(shù),它允許你通過控制寫入文件的記錄數(shù)來控制文件大小。

 // Spark 將確保文件最多包含 5000 條記錄
df.write.option(“maxRecordsPerFile”, 5000)

<br>

九、可選配置附錄

9.1 CSV讀寫可選配置

讀\寫操作配置項可選值默認值描述
Both seq 任意字符 ,(逗號) 分隔符
Both header true, false false 文件中的第一行是否為列的名稱。
Read escape 任意字符 \ 轉義字符
Read inferSchema true, false false 是否自動推斷列類型
Read ignoreLeadingWhiteSpace true, false false 是否跳過值前面的空格
Both ignoreTrailingWhiteSpace true, false false 是否跳過值后面的空格
Both nullValue 任意字符 “” 聲明文件中哪個字符表示空值
Both nanValue 任意字符 NaN 聲明哪個值表示 NaN 或者缺省值
Both positiveInf 任意字符 Inf 正無窮
Both negativeInf 任意字符 -Inf 負無窮
Both compression or codec None,<br/>uncompressed,<br/>bzip2, deflate,<br/>gzip, lz4, or<br/>snappy none 文件壓縮格式
Both dateFormat 任何能轉換為 Java 的 <br/>SimpleDataFormat 的字符串 yyyy-MM-dd 日期格式
Both timestampFormat 任何能轉換為 Java 的 <br/>SimpleDataFormat 的字符串 yyyy-MMdd’T’HH:mm:ss.SSSZZ 時間戳格式
Read maxColumns 任意整數(shù) 20480 聲明文件中的最大列數(shù)
Read maxCharsPerColumn 任意整數(shù) 1000000 聲明一個列中的最大字符數(shù)。
Read escapeQuotes true, false true 是否應該轉義行中的引號。
Read maxMalformedLogPerPartition 任意整數(shù) 10 聲明每個分區(qū)中最多允許多少條格式錯誤的數(shù)據(jù),超過這個值后格式錯誤的數(shù)據(jù)將不會被讀取
Write quoteAll true, false false 指定是否應該將所有值都括在引號中,而不只是轉義具有引號字符的值。
Read multiLine true, false false 是否允許每條完整記錄跨域多行

9.2 JSON讀寫可選配置

讀\寫操作配置項可選值默認值
Both compression or codec None,<br/>uncompressed,<br/>bzip2, deflate,<br/>gzip, lz4, or<br/>snappy none
Both dateFormat 任何能轉換為 Java 的 SimpleDataFormat 的字符串 yyyy-MM-dd
Both timestampFormat 任何能轉換為 Java 的 SimpleDataFormat 的字符串 yyyy-MMdd’T’HH:mm:ss.SSSZZ
Read primitiveAsString true, false false
Read allowComments true, false false
Read allowUnquotedFieldNames true, false false
Read allowSingleQuotes true, false true
Read allowNumericLeadingZeros true, false false
Read allowBackslashEscapingAnyCharacter true, false false
Read columnNameOfCorruptRecord true, false Value of spark.sql.column&NameOf
Read multiLine true, false false

9.3 數(shù)據(jù)庫讀寫可選配置

屬性名稱含義
url 數(shù)據(jù)庫地址
dbtable 表名稱
driver 數(shù)據(jù)庫驅動
partitionColumn,<br/>lowerBound, upperBoun 分區(qū)總數(shù),上界,下界
numPartitions 可用于表讀寫并行性的最大分區(qū)數(shù)。如果要寫的分區(qū)數(shù)量超過這個限制,那么可以調用 coalesce(numpartition) 重置分區(qū)數(shù)。
fetchsize 每次往返要獲取多少行數(shù)據(jù)。此選項僅適用于讀取數(shù)據(jù)。
batchsize 每次往返插入多少行數(shù)據(jù),這個選項只適用于寫入數(shù)據(jù)。默認值是 1000。
isolationLevel 事務隔離級別:可以是 NONE,READ_COMMITTED, READ_UNCOMMITTED,REPEATABLE_READ 或 SERIALIZABLE,即標準事務隔離級別。<br/>默認值是 READ_UNCOMMITTED。這個選項只適用于數(shù)據(jù)讀取。
createTableOptions 寫入數(shù)據(jù)時自定義創(chuàng)建表的相關配置
createTableColumnTypes 寫入數(shù)據(jù)時自定義創(chuàng)建列的列類型

數(shù)據(jù)庫讀寫更多配置可以參閱官方文檔:https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

參考資料

  1. Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02
  2. https://spark.apache.org/docs/latest/sql-data-sources.html

更多大數(shù)據(jù)系列文章可以參見 GitHub 開源項目大數(shù)據(jù)入門指南

文章題目:Spark系列(十)——SparkSQL外部數(shù)據(jù)源
網(wǎng)頁網(wǎng)址:http://www.muchs.cn/article36/jpjgsg.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供域名注冊、響應式網(wǎng)站、品牌網(wǎng)站設計、商城網(wǎng)站、App設計標簽優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作