這篇文章給大家介紹日志服務(wù)Python消費組如何進行實時分發(fā)數(shù)據(jù),內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
創(chuàng)新互聯(lián)于2013年創(chuàng)立,公司自成立以來始終致力于為企業(yè)提供官網(wǎng)建設(shè)、移動互聯(lián)網(wǎng)業(yè)務(wù)開發(fā)(小程序定制開發(fā)、手機網(wǎng)站建設(shè)、重慶APP開發(fā)公司等),并且包含互聯(lián)網(wǎng)基礎(chǔ)服務(wù)(域名、主機服務(wù)、企業(yè)郵箱、網(wǎng)絡(luò)營銷等)應(yīng)用服務(wù);以先進完善的建站體系及不斷開拓創(chuàng)新的精神理念,幫助企業(yè)客戶實現(xiàn)互聯(lián)網(wǎng)業(yè)務(wù),嚴(yán)格把控項目進度與質(zhì)量監(jiān)控加上過硬的技術(shù)實力獲得客戶的一致贊譽。
使用日志服務(wù)的Web-tracking、logtail(文件極簡)、syslog等收集上來的日志經(jīng)常存在各種各樣的格式,我們需要針對特定的日志(例如topic)進行一定的分發(fā)到特定的logstore中處理和索引,本文主要介紹如何使用消費組實時分發(fā)日志到不通的目標(biāo)日志庫中。并且利用消費組的特定,達到自動平衡、負載均衡和高可用性。
cdn.com/cdd946a301ff55389b6f03559de059b6dc3b4b81.png">
協(xié)同消費庫(Consumer Library)是對日志服務(wù)中日志進行消費的高級模式,提供了消費組(ConsumerGroup)的概念對消費端進行抽象和管理,和直接使用SDK進行數(shù)據(jù)讀取的區(qū)別在于,用戶無需關(guān)心日志服務(wù)的實現(xiàn)細節(jié),只需要專注于業(yè)務(wù)邏輯,另外,消費者之間的負載均衡、failover等用戶也都無需關(guān)心。
消費組(Consumer Group)- 一個消費組由多個消費者構(gòu)成,同一個消費組下面的消費者共同消費一個logstore中的數(shù)據(jù),消費者之間不會重復(fù)消費數(shù)據(jù)。
消費者(Consumer)- 消費組的構(gòu)成單元,實際承擔(dān)消費任務(wù),同一個消費組下面的消費者名稱必須不同。
在日志服務(wù)中,一個logstore下面會有多個shard,協(xié)同消費庫的功能就是將shard分配給一個消費組下面的消費者,分配方式遵循以下原則:
每個shard只會分配到一個消費者。
一個消費者可以同時擁有多個shard。
新的消費者加入一個消費組,這個消費組下面的shard從屬關(guān)系會調(diào)整,以達到消費負載均衡的目的,但是上面的分配原則不會變,分配過程對用戶透明。
協(xié)同消費庫的另一個功能是保存checkpoint,方便程序故障恢復(fù)時能接著從斷點繼續(xù)消費,從而保證數(shù)據(jù)不會被重復(fù)消費。
這里我們描述用Python使用消費組進行編程,實時根據(jù)數(shù)據(jù)的topic進行分發(fā)。
注意:本篇文章的相關(guān)代碼可能會更新,最新版本在這里可以找到:Github樣例.
環(huán)境
建議程序運行在源日志庫同Region下的ECS上,并使用局域網(wǎng)服務(wù)入口,這樣好處是網(wǎng)絡(luò)速度最快,其次是讀取沒有外網(wǎng)費用產(chǎn)生。
強烈推薦PyPy3來運行本程序,而不是使用標(biāo)準(zhǔn)CPython解釋器。
日志服務(wù)的Python SDK可以如下安裝:
pypy3 -m pip install aliyun-log-python-sdk -U
更多SLS Python SDK的使用手冊,可以參考這里
如下展示如何配置程序:
配置程序日志文件,以便后續(xù)測試或者診斷可能的問題(跳過,具體參考樣例)。
基本的日志服務(wù)連接與消費組的配置選項。
目標(biāo)Logstore的一些連接信息
請仔細閱讀代碼中相關(guān)注釋并根據(jù)需要調(diào)整選項:
#encoding: utf8 def get_option(): ########################## # 基本選項 ########################## # 從環(huán)境變量中加載SLS參數(shù)與選項,根據(jù)需要可以配置多個目標(biāo) accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') endpoint = os.environ.get('SLS_ENDPOINT', '') project = os.environ.get('SLS_PROJECT', '') logstore = os.environ.get('SLS_LOGSTORE', '') to_endpoint = os.environ.get('SLS_ENDPOINT_TO', endpoint) to_project = os.environ.get('SLS_PROJECT_TO', project) to_logstore1 = os.environ.get('SLS_LOGSTORE_TO1', '') to_logstore2 = os.environ.get('SLS_LOGSTORE_TO2', '') to_logstore3 = os.environ.get('SLS_LOGSTORE_TO3', '') consumer_group = os.environ.get('SLS_CG', '') # 消費的起點。這個參數(shù)在第一次跑程序的時候有效,后續(xù)再次運行將從上一次消費的保存點繼續(xù)。 # 可以使”begin“,”end“,或者特定的ISO時間格式。 cursor_start_time = "2018-12-26 0:0:0" # 一般不要修改消費者名,尤其是需要并發(fā)跑時 consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # 構(gòu)建一個消費組和消費者 option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time) # bind put_log_raw which is faster to_client = LogClient(to_endpoint, accessKeyId, accessKey) put_method1 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore1) put_method2 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore2) put_method3 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore3) return option, {u'ngnix': put_method1, u'sql_audit': put_method2, u'click': put_method3}
注意,這里使用了functools.partial
對put_log_raw
進行綁定,以便后續(xù)調(diào)用方便。
如下代碼展示如何從SLS拿到數(shù)據(jù)后根據(jù)topic
進行轉(zhuǎn)發(fā)。
if __name__ == '__main__': option, put_methods = get_copy_option() def copy_data(shard_id, log_groups): for log_group in log_groups.LogGroups: # update topic if log_group.Topic in put_methods: put_methods[log_group.Topic](log_group=log_group) logger.info("*** start to consume data...") worker = ConsumerWorker(ConsumerProcessorAdaptor, option, args=(copy_data, )) worker.start(join=True)
假設(shè)程序命名為"dispatch_data.py",可以如下啟動:
export SLS_ENDPOINT=<Endpoint of your region> export SLS_AK_ID=<YOUR AK ID> export SLS_AK_KEY=<YOUR AK KEY> export SLS_PROJECT=<SLS Project Name> export SLS_LOGSTORE=<SLS Logstore Name> export SLS_LOGSTORE_TO1=<SLS To Logstore1 Name> export SLS_LOGSTORE_TO1=<SLS To Logstore2 Name> export SLS_LOGSTORE_TO1=<SLS To Logstore3 Name> export SLS_CG=<消費組名,可以簡單命名為"dispatch_data"> pypy3 dispatch_data.py
基于消費組的程序可以直接啟動多次以便達到并發(fā)作用:
nohup pypy3 dispatch_data.py & nohup pypy3 dispatch_data.py & nohup pypy3 dispatch_data.py & ...
注意:
所有消費者使用了同一個消費組的名字和不同的消費者名字(因為消費者名以進程ID為后綴)。
因為一個分區(qū)(Shard)只能被一個消費者消費,假設(shè)一個日志庫有10個分區(qū),那么最多有10個消費者同時消費。
基于測試,在沒有帶寬限制、接收端速率限制(如Splunk端)的情況下,以推進硬件用pypy3
運行上述樣例,單個消費者占用大約10%的單核CPU
下可以消費達到5 MB/s
原始日志的速率。因此,理論上可以達到50 MB/s
原始日志每個CPU核
,也就是每個CPU核每天可以消費4TB原始日志
。
注意:這個數(shù)據(jù)依賴帶寬、硬件參數(shù)和目標(biāo)Logstore是否能夠較快接收數(shù)據(jù)。
消費組會將檢測點(check-point)保存在服務(wù)器端,當(dāng)一個消費者停止,另外一個消費者將自動接管并從斷點繼續(xù)消費。
可以在不同機器上啟動消費者,這樣當(dāng)一臺機器停止或者損壞的清下,其他機器上的消費者可以自動接管并從斷點進行消費。
理論上,為了備用,也可以啟動大于shard數(shù)量的消費者。
每一個日志庫(logstore)最多可以配置10個消費組,如果遇到錯誤ConsumerGroupQuotaExceed
則表示遇到限制,建議在控制臺端刪除一些不用的消費組。
在控制臺查看消費組狀態(tài)
通過云監(jiān)控查看消費組延遲,并配置報警
如果服務(wù)入口(endpoint)配置為https://
前綴,如https://cn-beijing.log.aliyuncs.com
,程序與SLS的連接將自動使用HTTPS加密。
服務(wù)器證書*.aliyuncs.com
是GlobalSign簽發(fā),默認(rèn)大多數(shù)Linux/Windows的機器會自動信任此證書。如果某些特殊情況,機器不信任此證書,可以參考這里下載并安裝此證書。
關(guān)于日志服務(wù)Python消費組如何進行實時分發(fā)數(shù)據(jù)就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
分享標(biāo)題:日志服務(wù)Python消費組如何進行實時分發(fā)數(shù)據(jù)
本文網(wǎng)址:http://muchs.cn/article24/gdopje.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供電子商務(wù)、ChatGPT、商城網(wǎng)站、網(wǎng)站設(shè)計公司、虛擬主機、網(wǎng)站改版
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)