在ApacheSpark中使用UDF

用戶自定義函數(shù)(UDF)是大多數(shù)SQL環(huán)境的一個(gè)關(guān)鍵特性,其主要用于擴(kuò)展系統(tǒng)的內(nèi)置功能。UDF允許開發(fā)人員通過(guò)抽象其低級(jí)語(yǔ)言實(shí)現(xiàn)在更高級(jí)語(yǔ)言(如SQL)中應(yīng)用的新函數(shù)。Apache Spark也不例外,其為UDF與Spark SQL工作流集成提供了各種選項(xiàng)。

創(chuàng)新互聯(lián)是創(chuàng)新、創(chuàng)意、研發(fā)型一體的綜合型網(wǎng)站建設(shè)公司,自成立以來(lái)公司不斷探索創(chuàng)新,始終堅(jiān)持為客戶提供滿意周到的服務(wù),在本地打下了良好的口碑,在過(guò)去的十載時(shí)間我們累計(jì)服務(wù)了上千家以及全國(guó)政企客戶,如混凝土泵車等企業(yè)單位,完善的項(xiàng)目管理流程,嚴(yán)格把控項(xiàng)目進(jìn)度與質(zhì)量監(jiān)控加上過(guò)硬的技術(shù)實(shí)力獲得客戶的一致贊揚(yáng)。

在本篇博文中,我們將回顧Python、Java和Scala上的Apache Spark UDF和UDAF(用戶自定義的聚合函數(shù))實(shí)現(xiàn)的簡(jiǎn)單示例。我們還將討論重要的UDF API功能和集成點(diǎn),包括各發(fā)行版本之間的當(dāng)前可用性??偠灾?,我們將介紹一些重要的性能注意事項(xiàng),使您對(duì)應(yīng)用程序中利用UDF的選擇有所了解。

Spark SQL UDFs

UDF轉(zhuǎn)換了表中單個(gè)行的數(shù)值,為每行生成單個(gè)對(duì)應(yīng)的輸出值。例如,大多數(shù)的SQL環(huán)境都提供了一個(gè)UPPER函數(shù),同時(shí)返回了一個(gè)大寫版本的字符串以作為輸入。

自定義函數(shù)可以在Spark SQL中定義和注冊(cè)為UDF,并具有可用于SQL查詢的關(guān)聯(lián)別名。下面我們將為您介紹一個(gè)簡(jiǎn)單的例子,我們將定義一個(gè)UDF將以下JSON數(shù)據(jù)中的溫度值從攝氏度(Celsius)轉(zhuǎn)換為華氏度(Fahrenheit):
在Apache Spark中使用UDF
下面的示例代碼使用SQL別名CTOF注冊(cè)我們的轉(zhuǎn)換UDF,然后使用它從SQL查詢中轉(zhuǎn)換每個(gè)城市的溫度值。為簡(jiǎn)潔起見,省略了SQLContext對(duì)象和其他樣板代碼的創(chuàng)建,并在每個(gè)代碼段下面提供了完整列表的鏈接。

Python
在Apache Spark中使用UDF
Scala
在Apache Spark中使用UDF
Java
在Apache Spark中使用UDF
請(qǐng)注意,Spark SQL定義了UDF1~UDF22類別,支持包含最多22個(gè)輸入?yún)?shù)的UDF。上面的例子中使用UDF1處理單個(gè)溫度值作為輸入。如果未能對(duì)Apache Spark源代碼進(jìn)行更新,使用數(shù)組(arrays)或結(jié)構(gòu)體(structs)作為參數(shù)對(duì)于需要超過(guò)22個(gè)輸入的應(yīng)用程序可能很有幫助;從風(fēng)格的角度來(lái)看,如果您發(fā)現(xiàn)自己使用的是UDF6或更高版本,這一方案可能是首選。

Spark SQL UDAF函數(shù)

用戶自定義聚合函數(shù)(UDAF)可以同時(shí)處理多行,然后返回單個(gè)值作為結(jié)果,其通常與GROUP BY語(yǔ)句(例如COUNT或SUM)共同使用。為了讓示例簡(jiǎn)單明了,我們將實(shí)現(xiàn)一個(gè)別名為SUMPRODUCT的UDAF按使用分組、給定價(jià)格和庫(kù)存中的整數(shù)數(shù)量計(jì)算所有車輛的零售價(jià)值:
在Apache Spark中使用UDF
目前,Apache Spark UDAF的實(shí)現(xiàn)定義在擴(kuò)展繼承的了UserDefinedAggregateFunction類別中并有由Scala和Java語(yǔ)法支持。一旦定義好之后,我們可以在別名SUMPRODUCT下舉例說(shuō)明并注冊(cè)我們的SumProductAggregateFunction UDAF對(duì)象,并從SQL查詢中予以使用,這與前面示例中的CTOF UDF大致相同。

Scala
在Apache Spark中使用UDF
Apache Spark中的其他UDF支持

Spark SQL支持UDF、UDAF和UDTF等現(xiàn)有Hive(Java或Scala)函數(shù)實(shí)現(xiàn)的集成。順便提醒一下,UDTFs(用戶自定義表函數(shù))可以返回多個(gè)列和行 – 這超出了本文的范圍,但是我們可能在以后的博文中涉及。對(duì)于使用前面示例中強(qiáng)調(diào)的方法重新實(shí)現(xiàn)和注冊(cè)相同邏輯,集成現(xiàn)有的Hive UDF是有價(jià)值的一種替代方法;從性能角度來(lái)看,該方法對(duì)于PySpark也是有幫助的,這將在下一節(jié)中討論。通過(guò)包含Hive UDF函數(shù)實(shí)現(xiàn)的JAR文件利用spark-submit的-jars選項(xiàng),可以從HiveContext中訪問(wèn)Hive函數(shù);然后使用CREATE TEMPORARY FUNCTION對(duì)函數(shù)進(jìn)行聲明(如在Hive [1]中所做,包含一個(gè)UDF),具體示例如下所述:

Java 中的Hive UDF定義
在Apache Spark中使用UDF
從Python訪問(wèn)HiveUDF
在Apache Spark中使用UDF
請(qǐng)注意,如上文中我們實(shí)現(xiàn)的UDF和UDAF函數(shù),Hive UDF只能使用Apache Spark的SQL查詢語(yǔ)言進(jìn)行調(diào)用 – 也就是說(shuō),不能與Dataframe API的域特定語(yǔ)言(DSL)一起使用。

或者,通過(guò)包含實(shí)現(xiàn)jar文件(使用含有spark-submit的-jars選項(xiàng)),以Scala和Java語(yǔ)言實(shí)現(xiàn)的UDF可以從PySpark中進(jìn)行訪問(wèn),然后通過(guò)SparkContext對(duì)象的私有引用執(zhí)行器JVM、底層Scala或裝載在jar文件中的Java UDF實(shí)現(xiàn)來(lái)訪問(wèn)UDF定義。Holden Karau在一次精彩的演講中[2]對(duì)這種方法進(jìn)行了探討。請(qǐng)注意,在此技術(shù)中所使用的一些Apache Spark私有變量不是正式面向終端用戶的。這樣做還帶來(lái)了額外的好處,允許將UDAF(目前必須在Java和Scala中定義)用于PySpark,下文中的示例中使用了前面在Scala中定義的SUMPRODUCT UDAF進(jìn)行證明:

Scala UDAF定義
在Apache Spark中使用UDF
Scala UDAF from PySpark
在Apache Spark中使用UDF
UDF相關(guān)的功能正在連續(xù)不斷地添加至Apache Spark的每一個(gè)版本中。例如2.0版本在R中增加了對(duì)UDF的支持。作為參考,下表總結(jié)了本文中討論的各版本的關(guān)鍵特性:
在Apache Spark中使用UDF
在Apache Spark中使用UDF
在Apache Spark中使用UDF
表格中匯總了目前為止本博客中介紹過(guò)的相關(guān)版本的關(guān)鍵特性。

性能注意事項(xiàng)

了解Apache Spark UDF功能的性能影響是非常重要的。例如,Python UDF(比如我們的CTOF函數(shù))導(dǎo)致數(shù)據(jù)在運(yùn)行UDF邏輯的執(zhí)行器JVM和Python注釋器之間被序列化 - 與Java或Scala中的UDF實(shí)現(xiàn)相比,這大大降低了性能。緩解這種序列化瓶頸的潛在解決方案包括以下方面:

  1. 如上一節(jié)所述,從PySpark中訪問(wèn)Hive UDF。Java UDF實(shí)現(xiàn)可以由執(zhí)行器JVM直接訪問(wèn)。請(qǐng)?jiān)俅巫⒁猓@種方法只用于從Apache Spark的SQL查詢語(yǔ)言訪問(wèn)UDF。
  2. 這種方法的使用也可以參考PySpark訪問(wèn)在Java或Scala中執(zhí)行的UDF,如我們之前定義的Scala UDAF示例所示。

一般來(lái)說(shuō),UDF邏輯應(yīng)盡可能的精簡(jiǎn),因?yàn)榭赡苊恳恍卸紩?huì)被調(diào)用。例如,在擴(kuò)展到10億行時(shí),UDF邏輯中的一個(gè)步驟需要耗費(fèi)100毫秒的時(shí)間才能完成,從而很快就會(huì)導(dǎo)致重大的要性能問(wèn)題。

Spark SQL的另一個(gè)重要組成部分是Catalyst查詢優(yōu)化器。這一功能隨著每個(gè)版本而擴(kuò)展,通常可以為Spark SQL查詢提供顯著的性能改進(jìn);然而,任意UDF實(shí)現(xiàn)代碼對(duì)于Catalyst而言可能不是很好理解(雖然分析字節(jié)碼的未來(lái)功能[3]被認(rèn)為可以解決這一問(wèn)題)。因此,使用Apache Spark內(nèi)置SQL查詢函數(shù)功能通??梢詭?lái)最佳性能,并且應(yīng)該是在避免引入U(xiǎn)DF時(shí)考慮的第一種方法。高級(jí)用戶尋求利用Catalyst與其代碼更緊密地結(jié)合,可以參考以下Chris Fregly的演講[4],該演講人使用Expression.genCode優(yōu)化UDF代碼,并且使用了新的Apache Spark 2.0實(shí)驗(yàn)功能[5],其為定制Catalyst優(yōu)化程序規(guī)則提供了一個(gè)可即插即用的API。

結(jié)論

當(dāng)Spark SQL的內(nèi)置功能需要擴(kuò)展時(shí),UDF是一個(gè)非常有用的工具。本篇博文中提供了一次UDF和UDAF實(shí)現(xiàn)的演練,并討論了其集成步驟,以在Spark SQL中利用Spark SQL中現(xiàn)有的Java Hive UDF。UDF可以在Python、Scala、Java和(在Spark 2.0中)R中實(shí)現(xiàn),同時(shí)UDAF 可以在以及Scala和Java的UDAF中實(shí)現(xiàn)。當(dāng)在PySpark中使用UDF時(shí),必須考慮數(shù)據(jù)序列化成本,并且應(yīng)該考慮采用上文所討論的兩個(gè)策略來(lái)解決這個(gè)問(wèn)題。最后,我們探討了Spark SQL的Catalyst優(yōu)化器,以及基于性能考慮的因素,在解決方案中引入U(xiǎn)DF之前堅(jiān)持使用內(nèi)置SQL函數(shù)的性能考慮因素。

代碼https://github.com/curtishoward/sparkudfexamples
CDH版本:5.8.0(Apache Spark 1.6.0)

分享題目:在ApacheSpark中使用UDF
分享地址:http://muchs.cn/article4/pisdie.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈、Google用戶體驗(yàn)、搜索引擎優(yōu)化、App設(shè)計(jì)網(wǎng)站導(dǎo)航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

網(wǎng)站建設(shè)網(wǎng)站維護(hù)公司