flink sink

罗祥 2019-10-31 11:01:51 +08:00
parent ecd8805ed5
commit 944a3a7880
6 changed files with 170 additions and 13 deletions


@@ -1,7 +1,7 @@
package com.heibaiying;
import com.heibaiying.bean.Employee;
-import com.heibaiying.sink.FlinkToMySQL;
+import com.heibaiying.sink.FlinkToMySQLSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -17,7 +17,7 @@ public class CustomSinkJob {
new Employee("hei", 10, date),
new Employee("bai", 20, date),
new Employee("ying", 30, date));
-streamSource.addSink(new FlinkToMySQL());
+streamSource.addSink(new FlinkToMySQLSink());
env.execute();
}
}


@@ -20,7 +20,7 @@ public class KafkaStreamingJob {
// 1. Specify the Kafka configuration properties
Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "192.168.200.229:9092");
+properties.setProperty("bootstrap.servers", "192.168.200.0:9092");
// 2. Receive data from Kafka
DataStream<String> stream = env
@@ -35,7 +35,9 @@ public class KafkaStreamingJob {
};
// 4. Define the Flink Kafka producer
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic",
-        kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
+        kafkaSerializationSchema,
+        properties,
+        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
// 5. Double each received element (value + value) and write it back to Kafka
stream.map((MapFunction<String, String>) value -> value + value).addSink(kafkaProducer);
env.execute("Flink Streaming");


@@ -8,7 +8,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
-public class FlinkToMySQL extends RichSinkFunction<Employee> {
+public class FlinkToMySQLSink extends RichSinkFunction<Employee> {
private PreparedStatement stmt;
private Connection conn;
@@ -16,7 +16,7 @@ public class FlinkToMySQL extends RichSinkFunction<Employee> {
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.cj.jdbc.Driver");
-conn = DriverManager.getConnection("jdbc:mysql://192.168.200.229:3306/employees?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false", "root", "123456");
+conn = DriverManager.getConnection("jdbc:mysql://192.168.0.229:3306/employees?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false", "root", "123456");
String sql = "insert into emp(name, age, birthday) values(?, ?, ?)";
stmt = conn.prepareStatement(sql);
}


@@ -1,4 +1,10 @@
# Flink Sink
<nav>
<a href="#1-Data-Sinks">1. Data Sinks</a><br/>
<a href="#2-Streaming-Connectors">2. Streaming Connectors</a><br/>
<a href="#3-Integrating-the-Kafka-Sink">3. Integrating the Kafka Sink</a><br/>
<a href="#4-Custom-Sink">4. Custom Sink</a><br/>
</nav>
## 1. Data Sinks
@@ -55,6 +61,8 @@ public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
streamSource.writeToSocket("192.168.0.226", 9999, new SimpleStringSchema());
```
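The hunk header above also shows the two-argument `writeAsText` overload. A minimal sketch of calling it (the output path is illustrative; later Flink releases deprecate this method in favor of `StreamingFileSink`):

```java
import org.apache.flink.core.fs.FileSystem;

// Write each element's toString() to a text file, replacing any existing output
streamSource.writeAsText("/tmp/flink-out", FileSystem.WriteMode.OVERWRITE);
```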
## 2. Streaming Connectors
Besides the APIs above, Flink ships with a series of built-in connectors for writing results to common storage systems and message middleware, listed below:
@@ -77,14 +85,47 @@ streamSource.writeToSocket("192.168.0.226", 9999, new SimpleStringSchema());
Building on the Kafka Source integration introduced in the Data Sources chapter, here we integrate the Kafka Sink as well. The steps are as follows.
## 3. Integrating the Kafka Sink
### 3.1 addSink
Flink provides the addSink method for attaching a custom Sink or a third-party connector. To write results to Kafka, use it to attach Kafka's producer, FlinkKafkaProducer:
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1. Specify the Kafka configuration properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.200.0:9092");
// 2. Receive data from Kafka
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));
// 3. Define the conversion from the computed result to a Kafka ProducerRecord
KafkaSerializationSchema<String> kafkaSerializationSchema = new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
return new ProducerRecord<>("flink-stream-out-topic", element.getBytes());
}
};
// 4. Define the Flink Kafka producer
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic",
kafkaSerializationSchema,
properties,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
// 5. Double each received element (value + value) and write it back to Kafka
stream.map((MapFunction<String, String>) value -> value + value).addSink(kafkaProducer);
env.execute("Flink Streaming");
```
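The snippet assumes the Kafka connector is already on the classpath; a sketch of the Maven coordinate it needs (assuming Flink 1.9.x built against Scala 2.11; adjust the suffix and version to your build):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
```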
### 3.2 Creating the Output Topic
Create the topic used for the test output:
```shell
# Create the topic used for the test output; the topic name matches the
# producer's output topic above, while the partition count is illustrative
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 \
--partitions 1 \
--topic flink-stream-out-topic
```
@@ -97,23 +138,116 @@ bin/kafka-topics.sh --create \
### 3.3 Starting a Consumer
Start a Kafka consumer to watch the output of the Flink program:
```shell
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flink-stream-out-topic
```
### 3.4 Test Results
Send messages to the Flink program from a Kafka producer and observe the transformed output:
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flink-kafka-producer-consumer.png"/> </div>
As the screenshot shows, the data emitted by the Kafka producer is received by the Flink program and, after transformation, written back to the corresponding Kafka topic.
## 4. Custom Sink
Besides the built-in third-party connectors, Flink supports custom Sinks for more varied output needs. To implement one, implement the SinkFunction interface directly or indirectly; usually we extend its abstract subclass RichSinkFunction, which adds lifecycle-related methods on top of SinkFunction. The relationship between the two looks like this:
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flink-richsink.png"/> </div>
As an example, we implement a custom FlinkToMySQLSink that writes results to a MySQL database. The steps are as follows:
### 4.1 Importing Dependencies
First, import the MySQL driver dependency:
```xml
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
```
### 4.2 Implementing the Custom Sink
Extend RichSinkFunction to implement the custom Sink:
```java
public class FlinkToMySQLSink extends RichSinkFunction<Employee> {
private PreparedStatement stmt;
private Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.cj.jdbc.Driver");
conn = DriverManager.getConnection("jdbc:mysql://192.168.0.229:3306/employees" +
"?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
"root",
"123456");
String sql = "insert into emp(name, age, birthday) values(?, ?, ?)";
stmt = conn.prepareStatement(sql);
}
@Override
public void invoke(Employee value, Context context) throws Exception {
stmt.setString(1, value.getName());
stmt.setInt(2, value.getAge());
stmt.setDate(3, value.getBirthday());
stmt.executeUpdate();
}
@Override
public void close() throws Exception {
super.close();
if (stmt != null) {
stmt.close();
}
if (conn != null) {
conn.close();
}
}
}
```
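A note on the design: open is called once per parallel sink instance, so it is the right place to create the connection; invoke runs once per record; close runs on shutdown. This sketch issues one executeUpdate per record for clarity; a production sink would typically batch statements and avoid hard-coded credentials.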
### 4.3 Using the Custom Sink
To use the custom Sink, call the addSink method as usual:
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Date date = new Date(System.currentTimeMillis());
DataStreamSource<Employee> streamSource = env.fromElements(
new Employee("hei", 10, date),
new Employee("bai", 20, date),
new Employee("ying", 30, date));
streamSource.addSink(new FlinkToMySQLSink());
env.execute();
```
### 4.4 Test Results
Start the program and check what gets written to the database:
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flink-mysql-sink.png"/> </div>
The rows are written successfully, confirming that the custom Sink works.
> The source code of all the examples above is in this repository: [flink-kafka-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Flink/flink-kafka-integration)
## References
1. Data Sinks: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html#data-sinks
2. Streaming Connectors: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/index.html
3. Apache Kafka Connector: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html


@@ -1,4 +1,13 @@
# Flink Windows
<nav>
<a href="#1-The-Window-Concept">1. The Window Concept</a><br/>
<a href="#2-Time-Windows">2. Time Windows</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#21-Tumbling-Windows">2.1 Tumbling Windows</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-Sliding-Windows">2.2 Sliding Windows</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#23-Session-Windows">2.3 Session Windows</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#24-Global-Windows">2.4 Global Windows</a><br/>
<a href="#3-Count-Windows">3. Count Windows</a><br/>
</nav>
## 1. The Window Concept
@@ -12,7 +21,9 @@ Time Windows aggregate data along the time dimension and fall into the following
Tumbling Windows are windows with no overlap between them. For example, if you count product clicks over the past hour once every hour, a day splits into exactly 24 windows, none of which overlap, as illustrated below:
-![flink-tumbling-windows](D:\BigData-Notes\pictures\flink-tumbling-windows.png)
+<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flink-tumbling-windows.png"/> </div>
Here we use word counting as a concrete example; the code is as follows:
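A minimal sketch of such a tumbling-window word count (assuming Flink 1.9's DataStream API; the socket host and port are illustrative):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

DataStream<String> source = env.socketTextStream("192.168.0.229", 9999);
DataStream<Tuple2<String, Integer>> words = source
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split("\\s")) {
                out.collect(new Tuple2<>(word, 1));
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT)); // lambdas need explicit type info

words.keyBy(0)                      // key by the word
     .timeWindow(Time.seconds(3))   // tumbling window: a single size argument
     .sum(1)                        // sum the counts within each window
     .print();
```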
@@ -34,13 +45,19 @@ env.execute("Flink Streaming");
The test output looks like this:
-![flink-window-word-count](D:\BigData-Notes\pictures\flink-window-word-count.png)
+<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flink-window-word-count.png"/> </div>
### 2.2 Sliding Windows
Sliding windows perform rolling aggregations. For example, counting the clicks on all products over the past hour once every 6 minutes produces overlapping windows, so one day splits into 240 windows. Illustrated below:
-![flink-sliding-windows](D:\BigData-Notes\pictures\flink-sliding-windows.png)
+<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flink-sliding-windows.png"/> </div>
As shown, windows 1-4 each overlap their neighbors by an equal span of time. To get a sliding window, simply pass a second argument to the timeWindow method as the slide interval:
@@ -53,7 +70,9 @@ timeWindow(Time.minutes(1),Time.seconds(3))
While a user browses continuously, click data may arrive at any moment. During a promotion, for instance, a user may repeatedly add and remove items of some category from the shopping cart, while you only care about the final cart state for this visit. In that case you can run the aggregation after the user's session ends, which is exactly what Session Windows provide.
-![flink-session-windows](D:\BigData-Notes\pictures\flink-session-windows.png)
+<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flink-session-windows.png"/> </div>
The implementation code is as follows:
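A minimal sketch, reusing the `words` stream from the tumbling-window sketch above (the 10-second gap is illustrative):

```java
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// A window closes once no event arrives within a key's session for 10 seconds.
// Event-time session windows additionally require timestamps/watermarks to be assigned.
words.keyBy(0)
     .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
     .sum(1)
     .print();
```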
@@ -68,7 +87,9 @@ window(EventTimeSessionWindows.withGap(Time.seconds(10)))
The last type is the global window, which assigns all elements sharing the same key to a single window. It is normally used together with a trigger; without one, the computation never fires.
-![flink-non-windowed](D:\BigData-Notes\pictures\flink-non-windowed.png)
+<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/flink-non-windowed.png"/> </div>
Continuing with the word-count example above, sample code follows:
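A minimal sketch of a global window driven by a count trigger, again reusing the `words` stream (the threshold of 10 is illustrative):

```java
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

// Fire the window every 10 elements per key; without a trigger a global window never emits.
// CountTrigger keeps the window contents after firing; wrap it in PurgingTrigger.of(...)
// to clear state after each firing, which is what countWindow does internally.
words.keyBy(0)
     .window(GlobalWindows.create())
     .trigger(CountTrigger.of(10))
     .sum(1)
     .print();
```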

BIN pictures/flink-richsink.png (new file, 8.6 KiB; binary file not shown)