Maven 依赖
- <!-- Version properties; referenced below through ${flink.version} and ${flink-cdc.version}. -->
- <properties>
- <flink.version>1.14.0</flink.version>
- <flink-cdc.version>2.3.0</flink-cdc.version>
- </properties>
- <dependencies>
- <!-- Flink dependencies -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-base</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <!-- MySQL CDC connector: the only artifact here outside org.apache.flink; versioned by flink-cdc.version. -->
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-sql-connector-mysql-cdc</artifactId>
- <version>${flink-cdc.version}</version>
- </dependency>
- <!-- The remaining Flink artifacts carry the _2.12 suffix and all share flink.version. -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.12</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.12</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge_2.12</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <!-- NOTE(review): presumably required for the web UI the example job exposes on port 8081; confirm. -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime-web_2.12</artifactId>
- <version>${flink.version}</version>
- </dependency>
- </dependencies>
自定义数据变化处理器
- package org.example;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- public class CustomSink extends RichSinkFunction<String> {
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- }
- @Override
- public void close() throws Exception {
- super.close();
- }
- @Override
- public void invoke(String value, Context context) throws Exception {
- //0P字段,该字段也有4种取值。分别是C(Create ) , U(Updlate) . D(Delete ),Read 。
- // 对于U操作,其数据部分同时包含了Before和After.
- System.out.println(">>>" + value);
- }
- }
Flink CDC 监听
- package org.example;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.configuration.RestOptions;
- import org.apache.flink.streaming.api.datastream.DataStreamSink;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- /**
-  * Example job that captures changes of the MySQL table {@code canal_manager.canal_user}
-  * with Flink CDC and forwards every change record, as a JSON string, to {@link CustomSink}.
-  */
- public class MysqlSourceExample {
-     /**
-      * Builds the MySQL CDC source, wires it to {@link CustomSink} and runs the job.
-      *
-      * @param args command-line arguments; not used
-      * @throws Exception if the job fails to start or fails while running
-      */
-     public static void main(String[] args) throws Exception {
-         // Typed as <String> so the builder chain below is checked by the compiler
-         // instead of compiling only through raw-type (unchecked) conversion.
-         DebeziumDeserializationSchema<String> deserializer = new JsonDebeziumDeserializationSchema();
-         MySqlSource<String> source = MySqlSource.<String>builder()
-                 .hostname("127.0.0.1")
-                 .port(3306)
-                 .databaseList("canal_manager") // set captured database
-                 .tableList("canal_manager.canal_user") // set captured table
-                 .startupOptions(StartupOptions.latest()) // start reading from the latest change records
-                 .username("root")
-                 .password("123456") // demo credentials; do not hard-code real ones
-                 .deserializer(deserializer) // converts SourceRecord to JSON string
-                 .includeSchemaChanges(true)
-                 .build();
-
-         // Start a web UI on port 8081.
-         Configuration configuration = new Configuration();
-         configuration.setInteger(RestOptions.PORT, 8081);
-         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
-
-         // Checkpoint interval (milliseconds per the Flink API).
-         env.enableCheckpointing(5000);
-
-         // The returned DataStreamSink is not needed afterwards, so it is not kept.
-         env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
-                 .addSink(new CustomSink());
-         env.execute();
-     }
- }
验证
启动后可通过 http://localhost:8081/ 访问 Web 页面;当 MySQL 数据库 canal_manager 中的表 canal_user 的数据发生修改时,控制台会输出对应的 JSON:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页 qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
|