K2K Template 脚本

使用 Flink 将数据从 Kafka 同步到 Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
-- The first two statements define the tables for the Kafka-to-Kafka (K2K) pipeline.
-- For LDP init, use the Python script located in the directory instead.
--
-- Source table: each Kafka record of topic LOADSHEET.LDP is exposed as the
-- single STRING column `a` ('raw' format), starting from the earliest offset.
CREATE TABLE kafka_source_ldp (
    a STRING
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'jxbigdatakafka01.juneyaoair.com:9092,jxbigdatakafka02.juneyaoair.com:9092,jxbigdatakafka03.juneyaoair.com:9092',
    'topic' = 'LOADSHEET.LDP',
    'properties.group.id' = 'pipeline_ums',
    'scan.startup.mode' = 'earliest-offset',
    -- 5242880 bytes = 5 MiB
    'properties.fetch.max.bytes' = '5242880',
    'properties.allow.auto.create.topics' = 'false',
    'properties.enable.auto.commit' = 'true',
    'format' = 'raw'
);

-- Sink table: writes the single STRING column `a` unchanged ('raw' format)
-- to topic JXHK.AUTO.DC_AOC_LOADSHEET_LDP on the target cluster.
--
-- Only producer-relevant options are set here. 'enable.auto.commit' and
-- 'allow.auto.create.topics' are Kafka *consumer* configs: a producer ignores
-- them (and logs an "unused configuration" warning), so they are omitted.
-- NOTE: whether a producer may auto-create the topic is controlled on the
-- broker (auto.create.topics.enable), not by a client property.
CREATE TABLE kafka_sink_ldp (
    a STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'JXHK.AUTO.DC_AOC_LOADSHEET_LDP',
    'properties.bootstrap.servers' = '172.27.2.3:9092,172.27.2.4:9092,172.27.2.10:9092',
    'format' = 'raw'
);

-- Continuously copy every record from the source topic to the sink topic.
-- The column is named explicitly (instead of SELECT *) so that a schema
-- change on the source table cannot silently alter what is written.
INSERT INTO kafka_sink_ldp
SELECT a
FROM kafka_source_ldp;