Spark 支持以下六個核心數(shù)據(jù)源,同時 Spark 社區(qū)還提供了多達上百種數(shù)據(jù)源的讀取方式,能夠滿足絕大部分使用場景。
成都網(wǎng)站制作、網(wǎng)站設計、外貿(mào)網(wǎng)站建設的開發(fā),更需要了解用戶,從用戶角度來建設網(wǎng)站,獲得較好的用戶體驗。創(chuàng)新互聯(lián)多年互聯(lián)網(wǎng)經(jīng)驗,見的多,溝通容易、能幫助客戶提出的運營建議。作為成都一家網(wǎng)絡公司,打造的就是網(wǎng)站建設產(chǎn)品直銷的概念。選擇創(chuàng)新互聯(lián),不只是建站,我們把建站作為產(chǎn)品,不斷的更新、完善,讓每位來訪用戶感受到浩方產(chǎn)品的價值服務。
注:以下所有測試文件均可從本倉庫的resources 目錄進行下載
所有讀取 API 遵循以下調用格式:
// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()
// 示例
spark.read.format("csv")
.option("mode", "FAILFAST") // 讀取模式
.option("inferSchema", "true") // 是否自動推斷 schema
.option("path", "path/to/file(s)") // 文件路徑
.schema(someSchema) // 使用預定義的 schema
.load()
讀取模式有以下三種可選項:
讀模式 | 描述 |
---|---|
permissive |
當遇到損壞的記錄時,將其所有字段設置為 null,并將所有損壞的記錄放在名為 _corruption t_record 的字符串列中 |
dropMalformed |
刪除格式不正確的行 |
failFast |
遇到格式不正確的數(shù)據(jù)時立即失敗 |
// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE") //寫模式
.option("dateFormat", "yyyy-MM-dd") //日期格式
.option("path", "path/to/file(s)")
.save()
寫數(shù)據(jù)模式有以下四種可選項:
Scala/Java | 描述 |
---|---|
SaveMode.ErrorIfExists |
如果給定的路徑已經(jīng)存在文件,則拋出異常,這是寫數(shù)據(jù)默認的模式 |
SaveMode.Append |
數(shù)據(jù)以追加的方式寫入 |
SaveMode.Overwrite |
數(shù)據(jù)以覆蓋的方式寫入 |
SaveMode.Ignore |
如果給定的路徑已經(jīng)存在文件,則不做任何操作 |
<br/>
CSV 是一種常見的文本文件格式,其中每一行表示一條記錄,記錄中的每個字段用逗號分隔。
自動推斷類型讀取讀取示例:
spark.read.format("csv")
.option("header", "false") // 文件中的第一行是否為列的名稱
.option("mode", "FAILFAST") // 是否快速失敗
.option("inferSchema", "true") // 是否自動推斷 schema
.load("/usr/file/csv/dept.csv")
.show()
使用預定義類型:
import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//預定義數(shù)據(jù)格式
val myManualSchema = new StructType(Array(
StructField("deptno", LongType, nullable = false),
StructField("dname", StringType,nullable = true),
StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()
df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
也可以指定具體的分隔符:
df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")
為節(jié)省主文篇幅,所有讀寫配置項見文末 9.1 小節(jié)。
<br/>
spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)
需要注意的是:默認不支持一條數(shù)據(jù)記錄跨越多行 (如下),可以通過配置 multiLine
為 true
來進行更改,其默認值為 false
。
// 默認支持單行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}
//默認不支持多行
{
"DEPTNO": 10,
"DNAME": "ACCOUNTING",
"LOC": "NEW YORK"
}
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
為節(jié)省主文篇幅,所有讀寫配置項見文末 9.2 小節(jié)。
<br/>
Parquet 是一個開源的面向列的數(shù)據(jù)存儲,它提供了多種存儲優(yōu)化,允許讀取單獨的列非整個文件,這不僅節(jié)省了存儲空間而且提升了讀取效率,它是 Spark 是默認的文件格式。
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
Parquet 文件有著自己的存儲規(guī)則,因此其可選配置項比較少,常用的有如下兩個:
讀寫操作 | 配置項 | 可選值 | 默認值 | 描述 |
---|---|---|---|---|
Write | compression or codec | None,<br/>uncompressed,<br/>bzip2,<br/>deflate, gzip,<br/>lz4, or snappy | None | 壓縮文件格式 |
Read | mergeSchema | true, false | 取決于配置項 spark.sql.parquet.mergeSchema |
當為真時,Parquet 數(shù)據(jù)源將所有數(shù)據(jù)文件收集的 Schema 合并在一起,否則將從摘要文件中選擇 Schema,如果沒有可用的摘要文件,則從隨機數(shù)據(jù)文件中選擇 Schema。 |
更多可選配置可以參閱官方文檔:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
<br/>
ORC 是一種自描述的、類型感知的列文件格式,它針對大型數(shù)據(jù)的讀寫進行了優(yōu)化,也是大數(shù)據(jù)中常用的文件格式。
spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
<br/>
Spark 同樣支持與傳統(tǒng)的關系型數(shù)據(jù)庫進行數(shù)據(jù)讀寫。但是 Spark 程序默認是沒有提供數(shù)據(jù)庫驅動的,所以在使用前需要將對應的數(shù)據(jù)庫驅動上傳到安裝目錄下的 jars
目錄中。下面示例使用的是 MySQL 數(shù)據(jù)庫,使用前需要將對應的 mysql-connector-java-x.x.x.jar
上傳到 jars
目錄下。
讀取全表數(shù)據(jù)示例如下,這里的 help_keyword
是 mysql 內置的字典表,只有 help_keyword_id
和 name
兩個字段。
spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver") //驅動
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //數(shù)據(jù)庫地址
.option("dbtable", "help_keyword") //表名
.option("user", "root").option("password","root").load().show(10)
從查詢結果讀取數(shù)據(jù):
val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()
//輸出
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 10| ALTER|
| 11| ANALYSE|
| 12| ANALYZE|
| 13| AND|
| 14| ARCHIVE|
| 15| AREA|
| 16| AS|
| 17| ASBINARY|
| 18| ASC|
| 19| ASTEXT|
+---------------+-----------+
也可以使用如下的寫法進行數(shù)據(jù)的過濾:
val props = new java.util.Properties
props.setProperty("driver", "com.mysql.jdbc.Driver")
props.setProperty("user", "root")
props.setProperty("password", "root")
val predicates = Array("help_keyword_id < 10 OR name = 'WHEN'") //指定數(shù)據(jù)過濾條件
spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show()
//輸出:
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 604| WHEN|
+---------------+-----------+
可以使用 numPartitions
指定讀取數(shù)據(jù)的并行度:
option("numPartitions", 10)
在這里,除了可以指定分區(qū)外,還可以設置上界和下界,任何小于下界的值都會被分配在第一個分區(qū)中,任何大于上界的值都會被分配在最后一個分區(qū)中。
val colName = "help_keyword_id" //用于判斷上下界的列
val lowerBound = 300L //下界
val upperBound = 500L //上界
val numPartitions = 10 //分區(qū)綜述
val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",
colName,lowerBound,upperBound,numPartitions,props)
想要驗證分區(qū)內容,可以使用 mapPartitionsWithIndex
這個算子,代碼如下:
jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
val buffer = new ListBuffer[String]
while (iterator.hasNext) {
buffer.append(index + "分區(qū):" + iterator.next())
}
buffer.toIterator
}).foreach(println)
執(zhí)行結果如下:help_keyword
這張表只有 600 條左右的數(shù)據(jù),本來數(shù)據(jù)應該均勻分布在 10 個分區(qū),但是 0 分區(qū)里面卻有 319 條數(shù)據(jù),這是因為設置了下限,所有小于 300 的數(shù)據(jù)都會被限制在第一個分區(qū),即 0 分區(qū)。同理所有大于 500 的數(shù)據(jù)被分配在 9 分區(qū),即最后一個分區(qū)。
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("user", "root").option("password", "root")
.option("dbtable", "emp")
.save()
<br/>
Text 文件在讀寫性能方面并沒有任何優(yōu)勢,且不能表達明確的數(shù)據(jù)結構,所以其使用的比較少,讀寫操作如下:
spark.read.textFile("/usr/file/txt/dept.txt").show()
df.write.text("/tmp/spark/txt/dept")
<br/>
多個 Executors 不能同時讀取同一個文件,但它們可以同時讀取不同的文件。這意味著當您從一個包含多個文件的文件夾中讀取數(shù)據(jù)時,這些文件中的每一個都將成為 DataFrame 中的一個分區(qū),并由可用的 Executors 并行讀取。
寫入的文件或數(shù)據(jù)的數(shù)量取決于寫入數(shù)據(jù)時 DataFrame 擁有的分區(qū)數(shù)量。默認情況下,每個數(shù)據(jù)分區(qū)寫一個文件。
分區(qū)和分桶這兩個概念和 Hive 中分區(qū)表和分桶表是一致的。都是將數(shù)據(jù)按照一定規(guī)則進行拆分存儲。需要注意的是 partitionBy
指定的分區(qū)和 RDD 中分區(qū)不是一個概念:這里的分區(qū)表現(xiàn)為輸出目錄的子目錄,數(shù)據(jù)分別存儲在對應的子目錄中。
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
輸出結果如下:可以看到輸出被按照部門編號分為三個子目錄,子目錄中才是對應的輸出文件。
分桶寫入就是將數(shù)據(jù)按照指定的列和桶數(shù)進行散列,目前分桶寫入只支持保存為表,實際上這就是 Hive 的分桶表。
val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
如果寫入產(chǎn)生小文件數(shù)量過多,這時會產(chǎn)生大量的元數(shù)據(jù)開銷。Spark 和 HDFS 一樣,都不能很好的處理這個問題,這被稱為“small file problem”。同時數(shù)據(jù)文件也不能過大,否則在查詢時會有不必要的性能開銷,因此要把文件大小控制在一個合理的范圍內。
在上文我們已經(jīng)介紹過可以通過分區(qū)數(shù)量來控制生成文件的數(shù)量,從而間接控制文件大小。Spark 2.2 引入了一種新的方法,以更自動化的方式控制文件大小,這就是 maxRecordsPerFile
參數(shù),它允許你通過控制寫入文件的記錄數(shù)來控制文件大小。
// Spark 將確保文件最多包含 5000 條記錄
df.write.option(“maxRecordsPerFile”, 5000)
<br>
讀\寫操作 | 配置項 | 可選值 | 默認值 | 描述 |
---|---|---|---|---|
Both | seq | 任意字符 | , (逗號) |
分隔符 |
Both | header | true, false | false | 文件中的第一行是否為列的名稱。 |
Read | escape | 任意字符 | \ | 轉義字符 |
Read | inferSchema | true, false | false | 是否自動推斷列類型 |
Read | ignoreLeadingWhiteSpace | true, false | false | 是否跳過值前面的空格 |
Both | ignoreTrailingWhiteSpace | true, false | false | 是否跳過值后面的空格 |
Both | nullValue | 任意字符 | “” | 聲明文件中哪個字符表示空值 |
Both | nanValue | 任意字符 | NaN | 聲明哪個值表示 NaN 或者缺省值 |
Both | positiveInf | 任意字符 | Inf | 正無窮 |
Both | negativeInf | 任意字符 | -Inf | 負無窮 |
Both | compression or codec | None,<br/>uncompressed,<br/>bzip2, deflate,<br/>gzip, lz4, or<br/>snappy | none | 文件壓縮格式 |
Both | dateFormat | 任何能轉換為 Java 的 <br/>SimpleDataFormat 的字符串 | yyyy-MM-dd | 日期格式 |
Both | timestampFormat | 任何能轉換為 Java 的 <br/>SimpleDataFormat 的字符串 | yyyy-MMdd’T’HH:mm:ss.SSSZZ | 時間戳格式 |
Read | maxColumns | 任意整數(shù) | 20480 | 聲明文件中的最大列數(shù) |
Read | maxCharsPerColumn | 任意整數(shù) | 1000000 | 聲明一個列中的最大字符數(shù)。 |
Read | escapeQuotes | true, false | true | 是否應該轉義行中的引號。 |
Read | maxMalformedLogPerPartition | 任意整數(shù) | 10 | 聲明每個分區(qū)中最多允許多少條格式錯誤的數(shù)據(jù),超過這個值后格式錯誤的數(shù)據(jù)將不會被讀取 |
Write | quoteAll | true, false | false | 指定是否應該將所有值都括在引號中,而不只是轉義具有引號字符的值。 |
Read | multiLine | true, false | false | 是否允許每條完整記錄跨域多行 |
讀\寫操作 | 配置項 | 可選值 | 默認值 |
---|---|---|---|
Both | compression or codec | None,<br/>uncompressed,<br/>bzip2, deflate,<br/>gzip, lz4, or<br/>snappy | none |
Both | dateFormat | 任何能轉換為 Java 的 SimpleDataFormat 的字符串 | yyyy-MM-dd |
Both | timestampFormat | 任何能轉換為 Java 的 SimpleDataFormat 的字符串 | yyyy-MMdd’T’HH:mm:ss.SSSZZ |
Read | primitiveAsString | true, false | false |
Read | allowComments | true, false | false |
Read | allowUnquotedFieldNames | true, false | false |
Read | allowSingleQuotes | true, false | true |
Read | allowNumericLeadingZeros | true, false | false |
Read | allowBackslashEscapingAnyCharacter | true, false | false |
Read | columnNameOfCorruptRecord | true, false | Value of spark.sql.column&NameOf |
Read | multiLine | true, false | false |
屬性名稱 | 含義 |
---|---|
url | 數(shù)據(jù)庫地址 |
dbtable | 表名稱 |
driver | 數(shù)據(jù)庫驅動 |
partitionColumn,<br/>lowerBound, upperBoun | 分區(qū)總數(shù),上界,下界 |
numPartitions | 可用于表讀寫并行性的最大分區(qū)數(shù)。如果要寫的分區(qū)數(shù)量超過這個限制,那么可以調用 coalesce(numpartition) 重置分區(qū)數(shù)。 |
fetchsize | 每次往返要獲取多少行數(shù)據(jù)。此選項僅適用于讀取數(shù)據(jù)。 |
batchsize | 每次往返插入多少行數(shù)據(jù),這個選項只適用于寫入數(shù)據(jù)。默認值是 1000。 |
isolationLevel | 事務隔離級別:可以是 NONE,READ_COMMITTED, READ_UNCOMMITTED,REPEATABLE_READ 或 SERIALIZABLE,即標準事務隔離級別。<br/>默認值是 READ_UNCOMMITTED。這個選項只適用于數(shù)據(jù)讀取。 |
createTableOptions | 寫入數(shù)據(jù)時自定義創(chuàng)建表的相關配置 |
createTableColumnTypes | 寫入數(shù)據(jù)時自定義創(chuàng)建列的列類型 |
數(shù)據(jù)庫讀寫更多配置可以參閱官方文檔:https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
更多大數(shù)據(jù)系列文章可以參見 GitHub 開源項目: 大數(shù)據(jù)入門指南
文章題目:Spark系列(十)——SparkSQL外部數(shù)據(jù)源
網(wǎng)頁網(wǎng)址:http://www.muchs.cn/article36/jpjgsg.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供域名注冊、響應式網(wǎng)站、品牌網(wǎng)站設計、商城網(wǎng)站、App設計、標簽優(yōu)化
聲明:本網(wǎng)站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)