This post walks through how to do real-time data statistics with TiDB + Flink. The approach is straightforward and practical: TiCDC pushes change data to Kafka, a small Spring Boot service converts it to canal-json, and a Flink Table job aggregates it back into TiDB. Follow along step by step.
First, configure a TiCDC changefeed that replicates the sales table to Kafka in Canal format:

# Whether database/table names in this config file are case-sensitive.
# This setting affects both the filter and sink configuration; default is true.
case-sensitive = true

# Whether to output the old value; supported since v4.0.5.
enable-old-value = true

[filter]
# Ignore transactions with these start_ts values.
ignore-txn-start-ts = [1, 2]

# Filter rules.
# Rule syntax: https://docs.pingcap.com/zh/tidb/stable/table-filter#表庫過濾語法
# Only my sales table is replicated.
rules = ['dspdev.sales_order_header']

[mounter]
# Number of mounter threads used to decode the data coming out of TiKV.
worker-num = 16

[sink]
# For MQ sinks, the event dispatcher can be configured via dispatchers.
# Four dispatchers are supported: default, ts, rowid, table. Dispatch rules:
# - default: dispatch in table mode when there are multiple unique indexes (including the primary key);
#   dispatch in rowid mode when there is exactly one unique index (or primary key);
#   dispatch in table mode if the old value feature is enabled
# - ts: hash on the commitTs of the row change
# - rowid: hash on the selected HandleKey column name and value
# - table: hash on the schema name and table name
# The matcher syntax is the same as the filter rule syntax.
dispatchers = [
    {matcher = ['dspdev.*'], dispatcher = "ts"}
]
# For MQ sinks, the message protocol can be specified.
# Currently default, canal, avro and maxwell are supported; default is the TiCDC Open Protocol.
protocol = "canal"

[cyclic-replication]
# Whether to enable cyclic replication.
enable = false
# Replication ID of this TiCDC instance.
replica-id = 1
# Replication IDs whose changes should be filtered out.
filter-replica-ids = [2,3]
# Whether to replicate DDL.
sync-ddl = true
--sink-uri="kafka://127.0.0.1:9092/cdc-test?kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"
With this, TiDB's CDC data is sent to Kafka as Canal-protocol (protobuf-encoded) messages; all we need to do is decode them downstream. For the meaning of each parameter, refer to the TiCDC changefeed configuration documentation.
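For reference, a complete changefeed creation command would look roughly like the sketch below. The PD address and changefeed ID are placeholders, and changefeed.toml is assumed to be the config file shown above:

cdc cli changefeed create \
    --pd="http://<pd-host>:2379" \
    --sink-uri="kafka://127.0.0.1:9092/cdc-test?kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1" \
    --changefeed-id="sales-order-task" \
    --config=changefeed.toml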
The pom.xml for the converter service is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.konka.dsp</groupId>
    <artifactId>kafka-parse</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-parse</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
        <fastjson.version>1.2.70</fastjson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.springframework.cloud</groupId>-->
<!--            <artifactId>spring-cloud-starter</artifactId>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.springframework.cloud</groupId>-->
<!--            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
The application.properties is as follows:
###########  Kafka cluster  ###########
spring.kafka.bootstrap-servers=192.168.8.71:9092

###########  Producer configuration  ###########
# Number of retries
spring.kafka.producer.retries=0
# Ack level: how many partition replicas must acknowledge before the producer gets an ack (0, 1, all/-1)
spring.kafka.producer.acks=1
# Batch size
spring.kafka.producer.batch-size=16384
# Linger time: the producer sends a batch once it reaches batch-size or linger.ms has elapsed.
# With linger.ms=0 every record is sent immediately, so batch-size effectively has no effect.
spring.kafka.producer.properties.linger.ms=0
# Producer buffer size
spring.kafka.producer.buffer-memory=33554432
# Kafka serializers
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Custom partitioner
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

###########  Consumer configuration  ###########
# Default consumer group ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# Whether to auto-commit offsets
spring.kafka.consumer.enable-auto-commit=true
# Auto-commit interval (how long after a message is received before its offset is committed)
spring.kafka.consumer.auto.commit.interval.ms=1000
# What to do when Kafka has no initial offset or the offset is out of range:
# earliest: reset to the smallest offset in the partition;
# latest: reset to the latest offset (only consume data produced after joining);
# none: throw an exception if any partition has no committed offset.
spring.kafka.consumer.auto-offset-reset=latest
# Consumer session timeout (a rebalance is triggered if no heartbeat is sent within this time)
spring.kafka.consumer.properties.session.timeout.ms=120000
# Consumer request timeout
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka deserializers
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.alibaba.otter.canal.client.kafka.MessageDeserializer
# Don't fail on startup if a listened topic does not exist yet
spring.kafka.listener.missing-topics-fatal=false

# Table and column filter: only these columns of sales_order_header are forwarded
table.data={"sales_order_header":"id,customer_name,total_amount,created_date"}

# Batch consumption
# spring.kafka.listener.type=batch
# Max number of records consumed per batch
The Spring Boot Kafka consumer code is as follows:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.konka.dsp.kafkaparse.CanalKafkaClientExample;
import com.konka.dsp.kafkaparse.tidb.KafkaMessage;
import com.konka.dsp.kafkaparse.tidb.TicdcEventData;
import com.konka.dsp.kafkaparse.tidb.TicdcEventDecoder;
import com.konka.dsp.kafkaparse.tidb.TicdcEventFilter;
import com.konka.dsp.kafkaparse.tidb.value.TicdcEventDDL;
import com.konka.dsp.kafkaparse.tidb.value.TicdcEventResolve;
import com.konka.dsp.kafkaparse.tidb.value.TicdcEventRowChange;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@Component
public class kafkaConsumer {

    protected final static Logger logger = LoggerFactory.getLogger(CanalKafkaClientExample.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Column filter injected from the table.data property, keyed by table name
    @Value("#{${table.data}}")
    private Map<String, String> map;

    // Listener that consumes the TiCDC Canal messages from the cdc-test topic
    @KafkaListener(topics = {"cdc-test"})
    public void onMessage1(ConsumerRecord<String, Message> consumerRecord) throws UnsupportedEncodingException {
        Message message = consumerRecord.value();
        long batchId = message.getId();
        FlatMessage fm = new FlatMessage();
        List<CanalEntry.Entry> entrys = message.getEntries();
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            fm.setId(entry.getHeader().getExecuteTime());
            fm.setDatabase(entry.getHeader().getSchemaName());
            fm.setEs(entry.getHeader().getExecuteTime());
            fm.setTs(entry.getHeader().getExecuteTime());
            fm.setTable(entry.getHeader().getTableName());
            fm.setType(rowChage.getEventType().name());
            CanalEntry.EventType eventType = rowChage.getEventType();
            fm.setIsDdl(rowChage.getIsDdl());
            fm.setSql(rowChage.getSql());
            Map<String, String> MySQLTypes = new HashMap<>();
            Map<String, Integer> sqlType = new HashMap<>();
            List<String> pkNames = new ArrayList<>();
            logger.info(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            String[] filtercolumn = map.get(entry.getHeader().getTableName()).split(",");
            logger.info(" filter --> column {}", filtercolumn);
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    fm.setData(saveRowData(rowData.getBeforeColumnsList(), pkNames, filtercolumn));
                    fm.setMysqlType(setMysqlTypes(rowData.getBeforeColumnsList(), filtercolumn));
                    fm.setSqlType(setSqlTypes(rowData.getBeforeColumnsList(), filtercolumn));
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    fm.setData(saveRowData(rowData.getAfterColumnsList(), pkNames, filtercolumn));
                    fm.setMysqlType(setMysqlTypes(rowData.getAfterColumnsList(), filtercolumn));
                    fm.setSqlType(setSqlTypes(rowData.getAfterColumnsList(), filtercolumn));
                } else {
                    logger.info("-------> before->{}", rowData.getBeforeColumnsList().size());
                    fm.setOld(saveRowData(rowData.getBeforeColumnsList(), pkNames, filtercolumn));
                    logger.info("-------> after");
                    fm.setData(saveRowData(rowData.getAfterColumnsList(), pkNames, filtercolumn));
                    fm.setMysqlType(setMysqlTypes(rowData.getAfterColumnsList(), filtercolumn));
                    fm.setSqlType(setSqlTypes(rowData.getAfterColumnsList(), filtercolumn));
                    // TiDB reports inserts as UPDATE as well; no "before" columns means it is really an INSERT
                    if (rowData.getBeforeColumnsList().size() == 0 && rowData.getAfterColumnsList().size() > 0) {
                        fm.setType("INSERT");
                    }
                }
            }
            // Deduplicate the primary key names
            HashSet h = new HashSet(pkNames);
            pkNames.clear();
            pkNames.addAll(h);
            fm.setPkNames(pkNames);
        }
        logger.info("parsed json: {}", JSON.toJSONString(fm, SerializerFeature.WriteMapNullValue));
        // Republish the converted canal-json message for Flink to consume
        kafkaTemplate.send("canal-data", JSON.toJSONString(fm, SerializerFeature.WriteMapNullValue));

//        FlatMessage flatMessage = (FlatMessage) JSON.parseObject(flatMessageJson, FlatMessage.class);
        // Alternative: decode the TiCDC Open Protocol message directly and print which topic/partition it came from
//        KafkaMessage kafkaMessage = new KafkaMessage();
//        kafkaMessage.setKey(consumerRecord.key());
//        kafkaMessage.setValue(consumerRecord.value());
//        kafkaMessage.setOffset(consumerRecord.offset());
//        kafkaMessage.setPartition(consumerRecord.partition());
//        kafkaMessage.setTimestamp(consumerRecord.timestamp());
//        TicdcEventFilter filter = new TicdcEventFilter();
//        TicdcEventDecoder ticdcEventDecoder = new TicdcEventDecoder(kafkaMessage);
//        while (ticdcEventDecoder.hasNext()) {
//            TicdcEventData data = ticdcEventDecoder.next();
//            if (data.getTicdcEventValue() instanceof TicdcEventRowChange) {
//                boolean ok = filter.check(data.getTicdcEventKey().getTbl(), data.getTicdcEventValue().getKafkaPartition(), data.getTicdcEventKey().getTs());
//                if (ok) {
//                    // deal with row change event
//                } else {
//                    // ignore duplicated messages
//                }
//            } else if (data.getTicdcEventValue() instanceof TicdcEventDDL) {
//                // deal with ddl event
//            } else if (data.getTicdcEventValue() instanceof TicdcEventResolve) {
//                filter.resolveEvent(data.getTicdcEventValue().getKafkaPartition(), data.getTicdcEventKey().getTs());
//                // deal with resolve event
//            }
//            System.out.println(JSON.toJSONString(data, true));
//        }
    }

    // Collect the filtered column values of one row, recording primary key names along the way
    private List<Map<String, String>> saveRowData(List<CanalEntry.Column> columns, List<String> pkNames, String[] filter) {
        Map<String, String> map = new HashMap<>();
        List<Map<String, String>> rowdata = new ArrayList<>();
        columns.forEach(column -> {
            if (column.hasIsKey()) {
                pkNames.add(column.getName());
            }
            if (Arrays.asList(filter).contains(column.getName())) {
                // avoid sending an empty string "", which breaks Flink on the receiving side
                map.put(column.getName(), column.getValue().equals("") ? "NULL" : column.getValue());
            }
        });
        rowdata.add(map);
        return rowdata;
        // rabbitTemplate.convertAndSend(tableEventType.toUpperCase(), JSON.toJSONString(map));
    }

    private Map<String, String> setMysqlTypes(List<CanalEntry.Column> columns, String[] filter) {
        Map<String, String> map = new HashMap<>();
        columns.forEach(column -> {
            if (Arrays.asList(filter).contains(column.getName())) {
                map.put(column.getName(), column.getMysqlType());
            }
        });
        return map;
    }

    private Map<String, Integer> setSqlTypes(List<CanalEntry.Column> columns, String[] filter) {
        Map<String, Integer> map = new HashMap<>();
        columns.forEach(column -> {
            if (Arrays.asList(filter).contains(column.getName())) {
                map.put(column.getName(), column.getSqlType());
            }
        });
        return map;
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
This essentially converts the TiDB CDC data into canal-json format and republishes it to Kafka (topic canal-data) so it can be consumed further downstream. One quirk: for some reason both inserts and updates come out of TiDB with eventType UPDATE, so the code treats a row change with no "old" (before) columns as an INSERT.
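For reference, a message published to the canal-data topic for a newly inserted order would look roughly like the following. This is only an illustrative sketch of the FlatMessage JSON produced by JSON.toJSONString(fm, ...): all values are made up, and the exact sqlType/mysqlType entries depend on the actual table schema.

{
    "id": 1620986400000,
    "database": "dspdev",
    "table": "sales_order_header",
    "type": "INSERT",
    "isDdl": false,
    "es": 1620986400000,
    "ts": 1620986400000,
    "sql": "",
    "pkNames": ["id"],
    "mysqlType": {"id": "bigint(20)", "customer_name": "varchar(64)", "total_amount": "decimal(16,2)", "created_date": "datetime"},
    "sqlType": {"id": -5, "customer_name": 12, "total_amount": 3, "created_date": 93},
    "data": [{"id": "1", "customer_name": "ACME", "total_amount": "199.00", "created_date": "2021-05-14 18:00:00"}],
    "old": null
}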
For the Flink side, refer to the Flink Table configuration in the official docs and copy the Table-related jars into Flink's lib directory. We also need another package open-sourced by Zhihu, the TiBigData project: https://github.com/pingcap-incubator/TiBigData/ . After building that project, copy its Flink connector jar into Flink's lib directory as well (see the sketch below).
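A rough sketch of the build-and-deploy steps; the exact module path and jar name depend on the TiBigData version, so treat them as placeholders:

git clone https://github.com/pingcap-incubator/TiBigData.git
cd TiBigData
mvn clean package -DskipTests
# adjust the path/name to whatever jar the build actually produces
cp flink/target/flink-tidb-connector-*.jar $FLINK_HOME/lib/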
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.*;
import org.apache.flink.table.expressions.TimeIntervalUnit;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class SalesOrderStream {

    public static Table report(Table transactions) {
        return transactions.select(
                        $("customer_name"),
                        $("created_date"),
                        $("total_amount"))
                .groupBy($("customer_name"), $("created_date"))
                .select(
                        $("customer_name"),
                        $("total_amount").sum().as("total_amount"),
                        $("created_date"));
    }

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

//        tEnv.executeSql("CREATE TABLE sales_order_header_stream (\n" +
////                "    id BIGINT not null,\n" +
//                "    customer_name STRING,\n" +
////                "    dsp_org_name STRING,\n" +
//                "    total_amount DECIMAL(38,2),\n" +
////                "    total_discount DECIMAL(16,2),\n" +
////                "    pay_amount DECIMAL(16,2),\n" +
////                "    total_amount DECIMAL(16,2),\n" +
//                "    created_date TIMESTAMP(3)\n" +
//                ") WITH (\n" +
//                "    'connector' = 'mysql-cdc',\n" +
//                "    'hostname' = '192.168.8.73',\n" +
//                "    'port' = '4000',\n" +
//                "    'username' = 'flink',\n" +
//                "    'password' = 'flink',\n" +
//                "    'database-name' = 'dspdev',\n" +
//                "    'table-name' = 'sales_order_header'\n" +
//                ")");

        tEnv.executeSql("CREATE TABLE sales_order_header_stream (\n" +
                "    `id` BIGINT,\n" +
                "    `total_amount` DECIMAL(16,2),\n" +
                "    `customer_name` STRING,\n" +
                "    `created_date` TIMESTAMP(3),\n" +
                "    PRIMARY KEY (`id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'canal-data',\n" +
                "    'properties.bootstrap.servers' = '192.168.8.71:9092',\n" +
                "    'properties.group.id' = 'test',\n" +
                "    'scan.startup.mode' = 'earliest-offset',\n" +
                "    'format' = 'canal-json'\n" +
                ")");

        tEnv.executeSql("CREATE TABLE spend_report (\n" +
                "    customer_name STRING,\n" +
//                "    total_amount DECIMAL(16,2),\n" +
//                "    total_discount DECIMAL(16,2),\n" +
//                "    pay_amount DECIMAL(16,2),\n" +
                "    total_amount DECIMAL(16,2),\n" +
                "    created_date TIMESTAMP(3),\n" +
                "    PRIMARY KEY (customer_name, created_date) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'tidb',\n" +
                "    'tidb.database.url' = 'jdbc:mysql://192.168.8.73:4000/dspdev',\n" +
                "    'tidb.username' = 'flink',\n" +
                "    'tidb.password' = 'flink',\n" +
                "    'tidb.database.name' = 'dspdev',\n" +
                "    'tidb.table.name' = 'spend_report'\n" +
                ")");

        Table transactions = tEnv.from("sales_order_header_stream");
        report(transactions).executeInsert("spend_report");
    }
}
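To run this on the cluster, package the job and submit it with flink run. The jar name below is just a placeholder for whatever your build produces:

mvn clean package
flink run -c SalesOrderStream target/sales-order-stream-0.0.1-SNAPSHOT.jar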
With the job running, the current sales totals are aggregated in real time and written back into the database, where they can be queried from the spend_report table.
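Note that the spend_report sink table has to exist in TiDB before the job starts. A minimal sketch of a matching DDL, assuming column types that mirror the Flink sink definition (the VARCHAR length is an arbitrary placeholder):

CREATE TABLE dspdev.spend_report (
    customer_name VARCHAR(64)   NOT NULL,
    total_amount  DECIMAL(16,2),
    created_date  DATETIME(3)   NOT NULL,
    PRIMARY KEY (customer_name, created_date)
);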
At this point you should have a good picture of how to do real-time statistics with TiDB + Flink; go ahead and try it out yourself.