Java应用Flink CDC监听MySQL数据变更内容输出到控制台

[复制链接]
发表于 2025-6-11 02:12:21 | 显示全部楼层 |阅读模式
maven 依靠

  1. <properties>
  2.         <flink.version>1.14.0</flink.version>
  3.         <flink-cdc.version>2.3.0</flink-cdc.version>
  4.     </properties>
  5.     <dependencies>
  6.         <!-- Flink dependencies -->
  7.         <dependency>
  8.             <groupId>org.apache.flink</groupId>
  9.             <artifactId>flink-connector-base</artifactId>
  10.             <version>${flink.version}</version>
  11.         </dependency>
  12.         <dependency>
  13.             <groupId>com.ververica</groupId>
  14.             <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  15.             <version>${flink-cdc.version}</version>
  16.         </dependency>
  17.         <dependency>
  18.             <groupId>org.apache.flink</groupId>
  19.             <artifactId>flink-streaming-java_2.12</artifactId>
  20.             <version>${flink.version}</version>
  21.         </dependency>
  22.         <dependency>
  23.             <groupId>org.apache.flink</groupId>
  24.             <artifactId>flink-clients_2.12</artifactId>
  25.             <version>${flink.version}</version>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>org.apache.flink</groupId>
  29.             <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  30.             <version>${flink.version}</version>
  31.         </dependency>
  32.         <dependency>
  33.             <groupId>org.apache.flink</groupId>
  34.             <artifactId>flink-runtime-web_2.12</artifactId>
  35.             <version>${flink.version}</version>
  36.         </dependency>
  37.     </dependencies>
复制代码
自定义数据变化处理器

  1. package org.example;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  4. public class CustomSink extends RichSinkFunction<String> {
  5.     @Override
  6.     public void open(Configuration parameters) throws Exception {
  7.         super.open(parameters);
  8.     }
  9.     @Override
  10.     public void close() throws Exception {
  11.         super.close();
  12.     }
  13.     @Override
  14.     public void invoke(String value, Context context) throws Exception {
  15.         //0P字段,该字段也有4种取值。分别是C(Create ) , U(Updlate) . D(Delete ),Read 。
  16.         // 对于U操作,其数据部分同时包含了Before和After.
  17.          System.out.println(">>>" + value);
  18.     }
  19. }
复制代码
flink cdc监听

  1. package org.example;
  2. import com.ververica.cdc.connectors.mysql.source.MySqlSource;
  3. import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
  4. import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
  5. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  6. import org.apache.flink.configuration.Configuration;
  7. import org.apache.flink.configuration.RestOptions;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSink;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. public class MysqlSourceExample {
  11.     public static void main(String[] args) throws Exception {
  12.         DebeziumDeserializationSchema debeziumDeserializationSchema = new JsonDebeziumDeserializationSchema();
  13.         MySqlSource<String> source = MySqlSource.builder()
  14.                 .hostname("127.0.0.1")
  15.                 .port(3306)
  16.                 .databaseList("canal_manager")// set captured database
  17.                 .tableList("canal_manager.canal_user")// set captured table
  18.                 .startupOptions(StartupOptions.latest()) // 设置从最新的修改记录开始读取
  19.                 .username("root")
  20.                 .password("123456")
  21.                 .deserializer(debeziumDeserializationSchema) // converts SourceRecord to JSON string
  22.                 .includeSchemaChanges(true)
  23.                 .build();
  24.         //启动一个webuI。
  25.         Configuration configuration = new Configuration();
  26.         configuration.setInteger(RestOptions.PORT, 8081);
  27.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
  28.         //检者点间隔时间
  29.         env .enableCheckpointing(5000);
  30.         DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
  31.                 .addSink(new CustomSink());
  32.         env.execute();
  33.     }
  34. }
复制代码
验证

启动后web页面地址访问http://localhost:8081/,MySQL数据库canal_manager中的表canal_user数据发生修改,控制台有输出json:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

×
登录参与点评抽奖,加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表