RDS與POLARDB歸檔到X-PackSpark計(jì)算的方法

本篇內(nèi)容介紹了“RDS與POLARDB歸檔到X-Pack Spark計(jì)算的方法”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

創(chuàng)新互聯(lián)主營(yíng)濱城網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,重慶APP軟件開發(fā),濱城h5小程序開發(fā)搭建,濱城網(wǎng)站營(yíng)銷推廣歡迎濱城等地區(qū)企業(yè)咨詢

X-Pack Spark服務(wù)通過外部計(jì)算資源的方式,為redis、Cassandra、MongoDB、HBase、RDS存儲(chǔ)服務(wù)提供復(fù)雜分析、流式處理及入庫、機(jī)器學(xué)習(xí)的能力,從而更好的解決用戶數(shù)據(jù)處理相關(guān)場(chǎng)景問題。

RDS與POLARDB歸檔到X-Pack Spark計(jì)算的方法

RDS & POLARDB分表歸檔到X-Pack Spark步驟

一鍵關(guān)聯(lián)POLARDB到Spark集群

POLARDB表存儲(chǔ)

在database ‘test1’中每5分鐘生成一張表,這里假設(shè)為表 'test1'、'test2'、'test2'、...
RDS與POLARDB歸檔到X-Pack Spark計(jì)算的方法

具體的建表語句如下:

*請(qǐng)左右滑動(dòng)閱覽

 CREATE TABLE `test1` ( `a` int(11) NOT NULL,
                        `b` time DEFAULT NULL,          
               `c` double DEFAULT NULL,
                         PRIMARY KEY (`a`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

歸檔到Spark的調(diào)試

x-pack spark提供交互式查詢模式支持直接在控制臺(tái)提交sql、python腳本、scala code來調(diào)試。

1、首先創(chuàng)建一個(gè)交互式查詢的session,在其中添加MySQL-connector的jar包。

2、創(chuàng)建交互式查詢

以pyspark為例,下面是具體歸檔demo的代碼:

*請(qǐng)左右滑動(dòng)閱覽

spark.sql("drop table sparktest").show()
# 創(chuàng)建一張spark表,三級(jí)分區(qū),分別是天、小時(shí)、分鐘,最后一級(jí)分鐘用來存儲(chǔ)具體的5分鐘的一張polardb表達(dá)的數(shù)據(jù)。字段和polardb里面的類型一致
spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
      "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

#本例子在polardb里面創(chuàng)建了databse test1,具有三張表test1 ,test2,test3,這里遍歷這三張表,每個(gè)表存儲(chǔ)spark的一個(gè)5min的分區(qū)
# CREATE TABLE `test1` (
#     `a` int(11) NOT NULL,
#                     `b` time DEFAULT NULL,
#                                      `c` double DEFAULT NULL,
#                                                         PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4): 
    #構(gòu)造polardb的表名
    dbtable = "test1." + "test" + str(num)
    #spark外表關(guān)聯(lián)polardb對(duì)應(yīng)的表
    externalPolarDBTableNow = spark.read \
        .format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \
        .option("dbtable", dbtable) \
        .option("user", "name") \
        .option("password", "xxx*") \
        .load().registerTempTable("polardbTableTemp")
    #生成本次polardb表數(shù)據(jù)要寫入的spark表的分區(qū)信息
    (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
    #執(zhí)行導(dǎo)數(shù)據(jù)sql 
    spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
          "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
    #刪除臨時(shí)的spark映射polardb表的catalog
    spark.catalog.dropTempView("polardbTableTemp")
    #查看下分區(qū)以及統(tǒng)計(jì)下數(shù)據(jù),主要用來做測(cè)試驗(yàn)證,實(shí)際運(yùn)行過程可以刪除
    spark.sql("show partitions sparktest").show(1000, False)
    spark.sql("select count(*) from sparktest").show()

歸檔作業(yè)上生產(chǎn)

交互式查詢定位為臨時(shí)查詢及調(diào)試,生產(chǎn)的作業(yè)還是建議使用spark作業(yè)的方式運(yùn)行,使用文檔參考。這里以pyspark作業(yè)為例:
RDS與POLARDB歸檔到X-Pack Spark計(jì)算的方法

/polardb/polardbArchiving.py 內(nèi)容如下:

*請(qǐng)左右滑動(dòng)閱覽

# -*- coding: UTF-8 -*-

from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PolardbArchiving") \
        .enableHiveSupport() \
        .getOrCreate()

    spark.sql("drop table sparktest").show()
    # 創(chuàng)建一張spark表,三級(jí)分區(qū),分別是天、小時(shí)、分鐘,最后一級(jí)分鐘用來存儲(chǔ)具體的5分鐘的一張polardb表達(dá)的數(shù)據(jù)。字段和polardb里面的類型一致
    spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
          "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

    #本例子在polardb里面創(chuàng)建了databse test1,具有三張表test1 ,test2,test3,這里遍歷這三張表,每個(gè)表存儲(chǔ)spark的一個(gè)5min的分區(qū)
    # CREATE TABLE `test1` (
    #     `a` int(11) NOT NULL,
    #      `b` time DEFAULT NULL,
    #      `c` double DEFAULT NULL,
    #       PRIMARY KEY (`a`)
    # ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    for num in range(1, 4):
        #構(gòu)造polardb的表名
        dbtable = "test1.">

“RDS與POLARDB歸檔到X-Pack Spark計(jì)算的方法”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

新聞名稱:RDS與POLARDB歸檔到X-PackSpark計(jì)算的方法
文章轉(zhuǎn)載:http://muchs.cn/article22/pidjcc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營(yíng)銷推廣電子商務(wù)、搜索引擎優(yōu)化、標(biāo)簽優(yōu)化、移動(dòng)網(wǎng)站建設(shè)外貿(mào)網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

小程序開發(fā)