diff --git a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/CustomSinkJob.java b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/CustomSinkJob.java
index 14861bc..56bd251 100644
--- a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/CustomSinkJob.java
+++ b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/CustomSinkJob.java
@@ -1,7 +1,7 @@
 package com.heibaiying;
 
 import com.heibaiying.bean.Employee;
-import com.heibaiying.sink.FlinkToMySQL;
+import com.heibaiying.sink.FlinkToMySQLSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -17,7 +17,7 @@ public class CustomSinkJob {
                 new Employee("hei", 10, date),
                 new Employee("bai", 20, date),
                 new Employee("ying", 30, date));
-        streamSource.addSink(new FlinkToMySQL());
+        streamSource.addSink(new FlinkToMySQLSink());
         env.execute();
     }
 }
diff --git a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java
index d26da5f..9977be2 100644
--- a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java
+++ b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java
@@ -20,7 +20,7 @@ public class KafkaStreamingJob {
 
         // 1. Specify the Kafka configuration properties
         Properties properties = new Properties();
-        properties.setProperty("bootstrap.servers", "192.168.200.229:9092");
+        properties.setProperty("bootstrap.servers", "192.168.200.0:9092");
 
         // 2. Consume the data from Kafka
         DataStream<String> stream = env
@@ -35,7 +35,9 @@ public class KafkaStreamingJob {
         };
         // 4. Define the Flink Kafka producer
         FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic",
-                kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
+                kafkaSerializationSchema,
+                properties,
+                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
         // 5. Duplicate each received element (value + value) and write it out to Kafka
         stream.map((MapFunction<String, String>) value -> value + value).addSink(kafkaProducer);
         env.execute("Flink Streaming");
diff --git a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/sink/FlinkToMySQL.java b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/sink/FlinkToMySQLSink.java
similarity index 87%
rename from code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/sink/FlinkToMySQL.java
rename to code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/sink/FlinkToMySQLSink.java
index aa3e064..0a86e61 100644
--- a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/sink/FlinkToMySQL.java
+++ b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/sink/FlinkToMySQLSink.java
@@ -8,7 +8,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 
-public class FlinkToMySQL extends RichSinkFunction<Employee> {
+public class FlinkToMySQLSink extends RichSinkFunction<Employee> {
 
     private PreparedStatement stmt;
     private Connection conn;
@@ -16,7 +16,7 @@ public class FlinkToMySQL extends RichSinkFunction<Employee> {
     @Override
     public void open(Configuration parameters) throws Exception {
         Class.forName("com.mysql.cj.jdbc.Driver");
-        conn = DriverManager.getConnection("jdbc:mysql://192.168.200.229:3306/employees?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false", "root", "123456");
+        conn = DriverManager.getConnection("jdbc:mysql://192.168.0.229:3306/employees?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false", "root", "123456");
         String sql = "insert into emp(name, age, birthday) values(?, ?, ?)";
         stmt = conn.prepareStatement(sql);
     }
diff --git a/notes/Flink_Data_Sink.md b/notes/Flink_Data_Sink.md
index 939ce8a..218d2f7 100644
--- a/notes/Flink_Data_Sink.md
+++ b/notes/Flink_Data_Sink.md
@@ -1,4 +1,10 @@
 # Flink Sink
+
 
 ## 1. Data Sinks
@@ -55,6 +61,8 @@ public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
 streamSource.writeToSocket("192.168.0.226", 9999, new SimpleStringSchema());
 ```
 
+
+
 ## 2. Streaming Connectors
 
 In addition to the APIs above, Flink also ships with a series of built-in connectors for writing computation results to commonly used storage systems and message brokers, as listed below:
@@ -77,14 +85,47 @@ streamSource.writeToSocket("192.168.0.226", 9999, new SimpleStringSchema());
 Building on the Kafka Source integration introduced in the Data Sources article, this section integrates the Kafka Sink as well. The steps are as follows.
 
+
+
 ## 3. Integrating the Kafka Sink
 
 ### 3.1 addSink
 
+Flink provides the addSink method for attaching a custom sink or a third-party connector. To write the computation results to Kafka, this method is used with Kafka's producer, FlinkKafkaProducer. The code is as follows:
+
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// 1. Specify the Kafka configuration properties
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "192.168.200.0:9092");
+
+// 2. Consume the data from Kafka
+DataStream<String> stream = env
+        .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));
+
+// 3. Define how a result is converted into a Kafka ProducerRecord
+KafkaSerializationSchema<String> kafkaSerializationSchema = new KafkaSerializationSchema<String>() {
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
+        return new ProducerRecord<>("flink-stream-out-topic", element.getBytes());
+    }
+};
+// 4. Define the Flink Kafka producer
+FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic",
+        kafkaSerializationSchema,
+        properties,
+        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
+// 5. Duplicate each received element (value + value) and write it out to Kafka
+stream.map((MapFunction<String, String>) value -> value + value).addSink(kafkaProducer);
+env.execute("Flink Streaming");
+```
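+
+Note: the example above assumes that the input topic flink-stream-in-topic already exists. If it does not, it can be created in the same way as the output topic in 3.2; a possible command (the partition and replication settings here are assumptions, adjust as needed) is:
+
+```shell
+# Hypothetical command mirroring the output-topic creation below
+bin/kafka-topics.sh --create \
+                    --bootstrap-server hadoop001:9092 \
+                    --replication-factor 1 \
+                    --partitions 1 \
+                    --topic flink-stream-in-topic
+```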
+
 ### 3.2 Create the Output Topic
 
+Create the topic that will be used for the output test:
+
 ```shell
-# Create the output topic used for testing
 bin/kafka-topics.sh --create \
                     --bootstrap-server hadoop001:9092 \
                     --replication-factor 1 \
@@ -97,23 +138,116 @@ bin/kafka-topics.sh --create \
 
 ### 3.3 Start a Consumer
 
+Start a Kafka consumer to watch the output of the Flink program:
+
 ```shell
 bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flink-stream-out-topic
 ```
 
 ### 3.4 Test Results
 
+Send messages from a Kafka producer to the Flink program and observe the transformed output, as shown below:
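+
+For example, a console producer can be used to send a few test messages to the input topic (this assumes the same broker address used above; the messages themselves are arbitrary):
+
+```shell
+# Each line typed after this command is sent as one message to the Flink job
+bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic flink-stream-in-topic
+```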
+
+*(screenshot: Flink output as seen in the Kafka console consumer)*
+
+As shown, the data sent by the Kafka producer is received by the Flink program, transformed, and then written back out to the corresponding Kafka topic.
 
 ## 4. Custom Sink
 
+Besides the built-in third-party connectors, Flink also supports custom sinks to meet more diverse output requirements. To implement a custom sink, you need to implement the SinkFunction interface, either directly or indirectly. In most cases we extend the abstract class RichSinkFunction, which, compared with SinkFunction, provides additional lifecycle-related methods. The relationship between the two is shown below:
+
+*(figure: flink-richsink.png, the SinkFunction / RichSinkFunction class hierarchy)*
+
+Here we implement a custom FlinkToMySQLSink as an example and write the computation results to a MySQL database. The steps are as follows:
+
 ### 4.1 Import Dependencies
 
+First, import the MySQL-related dependency:
+
+```xml
+<dependency>
+    <groupId>mysql</groupId>
+    <artifactId>mysql-connector-java</artifactId>
+    <version>8.0.16</version>
+</dependency>
+```
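+
+The sink below also assumes that the target table already exists. The original notes do not include the DDL; a schema consistent with the insert statement used by the sink (name, age, birthday) might look like this:
+
+```shell
+# Hypothetical DDL, run through the MySQL client on the target server
+mysql -u root -p -e "CREATE DATABASE IF NOT EXISTS employees;
+  CREATE TABLE IF NOT EXISTS employees.emp (
+      name     VARCHAR(32),
+      age      INT,
+      birthday DATE
+  );"
+```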
+
 ### 4.2 Custom Sink
-### 4.3 Test Results
 
+Extend RichSinkFunction to implement the custom sink:
+
+```java
+public class FlinkToMySQLSink extends RichSinkFunction<Employee> {
+
+    private PreparedStatement stmt;
+    private Connection conn;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        Class.forName("com.mysql.cj.jdbc.Driver");
+        conn = DriverManager.getConnection("jdbc:mysql://192.168.0.229:3306/employees" +
+                        "?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
+                "root",
+                "123456");
+        String sql = "insert into emp(name, age, birthday) values(?, ?, ?)";
+        stmt = conn.prepareStatement(sql);
+    }
+
+    @Override
+    public void invoke(Employee value, Context context) throws Exception {
+        stmt.setString(1, value.getName());
+        stmt.setInt(2, value.getAge());
+        stmt.setDate(3, value.getBirthday());
+        stmt.executeUpdate();
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (stmt != null) {
+            stmt.close();
+        }
+        if (conn != null) {
+            conn.close();
+        }
+    }
+
+}
+```
+
+### 4.3 Use the Custom Sink
+
+To use the custom sink, call the addSink method in the same way:
+
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+Date date = new Date(System.currentTimeMillis());
+DataStreamSource<Employee> streamSource = env.fromElements(
+        new Employee("hei", 10, date),
+        new Employee("bai", 20, date),
+        new Employee("ying", 30, date));
+streamSource.addSink(new FlinkToMySQLSink());
+env.execute();
+```
+
+### 4.4 Test Results
+
+Run the program and check the rows written to the database:
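+
+The writes can also be verified directly from the MySQL client, for example (assuming the connection settings used in the sink):
+
+```shell
+mysql -u root -p -e "SELECT name, age, birthday FROM employees.emp;"
+```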
+
+*(screenshot: rows written to the emp table)*
+
+The records are written to the database successfully, which means the custom sink integration works.
 
+> The source code for all of the examples above is available in this repository: [flink-kafka-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Flink/flink-kafka-integration)
 
+## References
+
+1. Data Sinks: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html#data-sinks
+2. Streaming Connectors: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/index.html
+3. Apache Kafka Connector: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html
diff --git a/notes/Flink_Windows.md b/notes/Flink_Windows.md
index 40deba1..022772b 100644
--- a/notes/Flink_Windows.md
+++ b/notes/Flink_Windows.md
@@ -1,4 +1,13 @@
 # Flink Windows
+
 
 ## 1. The Concept of Windows
@@ -12,7 +21,9 @@ Time Windows are used to aggregate data along the time dimension and fall into the following
 
 Tumbling windows are windows that do not overlap with one another. For example, if the clicks on a product are counted every hour for the past hour, a day is divided into exactly 24 windows, and none of them overlap, as illustrated below:
 
-![flink-tumbling-windows](D:\BigData-Notes\pictures\flink-tumbling-windows.png)
+*(figure: flink-tumbling-windows.png)*
+
+
 
 Here we use word frequency counting as a concrete example; the code is as follows:
 
@@ -34,13 +45,19 @@ env.execute("Flink Streaming");
 
 The test results are as follows:
 
-![flink-window-word-count](D:\BigData-Notes\pictures\flink-window-word-count.png)
+*(figure: flink-window-word-count.png)*
+
+
 
 ### 2.2 Sliding Windows
 
 Sliding windows are used for rolling aggregation. For example, if the clicks on all products over the past hour are counted every 6 minutes, the windows overlap with one another, and a day can be divided into 240 windows. This is illustrated below:
 
-![flink-sliding-windows](D:\BigData-Notes\pictures\flink-sliding-windows.png)
+*(figure: flink-sliding-windows.png)*
+
+
 
 As shown, windows 1 to 4 all overlap with one another by an equal span of time. To implement a sliding window, simply pass a second argument to the timeWindow method as the slide interval, as follows:
 
@@ -53,7 +70,9 @@ timeWindow(Time.minutes(1),Time.seconds(3))
 
 When a user browses continuously, click events may arrive at every moment. For example, during a promotion a user may repeatedly add a certain product to the shopping cart and remove it again, while you only want to know the final state of the cart for this browsing session. In that case the statistics can be computed after the user's session has ended. This kind of analysis can be implemented with Session Windows.
 
-![flink-session-windows](D:\BigData-Notes\pictures\flink-session-windows.png)
+*(figure: flink-session-windows.png)*
+
+
 
 The implementation code is as follows:
 
@@ -68,7 +87,9 @@ window(EventTimeSessionWindows.withGap(Time.seconds(10)))
 
 The last type is the global window, which assigns all elements with the same key to a single window. It is usually used together with a trigger; if no trigger is defined, the computation will not be executed.
 
-![flink-non-windowed](D:\BigData-Notes\pictures\flink-non-windowed.png)
+*(figure: flink-non-windowed.png)*
+
+
 
 Continuing with the word frequency example above, the sample code is as follows:
 
diff --git a/pictures/flink-richsink.png b/pictures/flink-richsink.png
new file mode 100644
index 0000000..9ab26ef
Binary files /dev/null and b/pictures/flink-richsink.png differ