From 8dcdb4aaf3fea18fe0ece23abf37c58fdf97aa52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E7=A5=A5?= <1366971433@qq.com> Date: Fri, 1 Nov 2019 17:35:22 +0800 Subject: [PATCH] add flink datasources and sinks --- code/Flink/flink-time-watermark/pom.xml | 2 +- .../com/heibaiying/PeriodicWatermarksJob.java | 52 +++++++++++++++++++ .../heibaiying/PunctuatedWatermarksJob.java | 46 ++++++++++++++++ .../main/java/com/heibaiying/SampleJob.java | 27 ---------- .../src/main/resources/sample.txt | 18 +++++++ 5 files changed, 117 insertions(+), 28 deletions(-) create mode 100644 code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PeriodicWatermarksJob.java create mode 100644 code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PunctuatedWatermarksJob.java delete mode 100644 code/Flink/flink-time-watermark/src/main/java/com/heibaiying/SampleJob.java create mode 100644 code/Flink/flink-time-watermark/src/main/resources/sample.txt diff --git a/code/Flink/flink-time-watermark/pom.xml b/code/Flink/flink-time-watermark/pom.xml index 2dd15f1..6255450 100644 --- a/code/Flink/flink-time-watermark/pom.xml +++ b/code/Flink/flink-time-watermark/pom.xml @@ -144,7 +144,7 @@ under the License. 
- com.heibaiying.SampleJob + com.heibaiying.PeriodicWatermarksJob diff --git a/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PeriodicWatermarksJob.java b/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PeriodicWatermarksJob.java new file mode 100644 index 0000000..81fd6e1 --- /dev/null +++ b/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PeriodicWatermarksJob.java @@ -0,0 +1,52 @@ +package com.heibaiying; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; + +public class PeriodicWatermarksJob { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // 设置并行度为1 + env.setParallelism(1); + // 设置以事件时间为基准 + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + DataStreamSource streamSource = env.socketTextStream("192.168.200.229", 8888, "\n", 3); + streamSource.map(new MapFunction>() { + @Override + public Tuple3 map(String value) throws Exception { + String[] split = value.split(","); + return new Tuple3<>(Long.valueOf(split[0]), split[1], 1L); + } + }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator()) + .keyBy(1).timeWindow(Time.seconds(3)).sum(2).print(); + env.execute(); + + } +} + + +class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks> { + + private final long maxOutOfOrderness = 3000L; + private long currentMaxTimestamp = 0L; + + @Override + public long extractTimestamp(Tuple3 element, long previousElementTimestamp) { 
+ long timestamp = element.f0; + currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); + return timestamp; + } + + @Override + public Watermark getCurrentWatermark() { + return new Watermark(currentMaxTimestamp - maxOutOfOrderness); + } +} diff --git a/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PunctuatedWatermarksJob.java b/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PunctuatedWatermarksJob.java new file mode 100644 index 0000000..605b59e --- /dev/null +++ b/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PunctuatedWatermarksJob.java @@ -0,0 +1,46 @@ +package com.heibaiying; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; + +public class PunctuatedWatermarksJob { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + // 设置以事件时间为基准 + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + DataStreamSource streamSource = env.socketTextStream("192.168.200.229", 8888, "\n", 3); + streamSource.map(new MapFunction>() { + @Override + public Tuple3 map(String value) throws Exception { + String[] split = value.split(","); + return new Tuple3<>(Long.valueOf(split[0]), split[1], 1L); + } + }).assignTimestampsAndWatermarks(new PunctuatedAssigner()) + .keyBy(1).timeWindow(Time.seconds(3)).sum(2).print(); + env.execute(); + + } +} + +class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks> { + + @Override + 
public long extractTimestamp(Tuple3 element, long previousElementTimestamp) { + return element.f0; + } + + @Override + public Watermark checkAndGetNextWatermark(Tuple3 lastElement, long extractedTimestamp) { + return new Watermark(extractedTimestamp); + } +} + diff --git a/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/SampleJob.java b/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/SampleJob.java deleted file mode 100644 index 733b05d..0000000 --- a/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/SampleJob.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.heibaiying; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.util.Collector; - -public class SampleJob { - - public static void main(String[] args) throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStreamSource streamSource = env.socketTextStream("192.168.200.229", 9999, "\n", 3); - streamSource.flatMap(new FlatMapFunction>() { - @Override - public void flatMap(String value, Collector> out) throws Exception { - String[] words = value.split("\t"); - for (String word : words) { - out.collect(new Tuple2<>(word, 1L)); - } - } - }).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print(); - env.execute("Flink Streaming"); - } -} diff --git a/code/Flink/flink-time-watermark/src/main/resources/sample.txt b/code/Flink/flink-time-watermark/src/main/resources/sample.txt new file mode 100644 index 0000000..afe1887 --- /dev/null +++ b/code/Flink/flink-time-watermark/src/main/resources/sample.txt @@ -0,0 +1,18 @@ +1572501901000,hadoop +1572501902000,hadoop +1572501903000,hadoop +1572501904000,flink +1572501905000,spark 
+1572501906000,spark -> (1572501901000,hadoop,2) +1572501907000,hive +1572501908000,hive +1572501909000,hive -> (1572501903000,hadoop,1) + (1572501905000,spark ,1) + (1572501904000,flink,1) +1572501910000,spark +1572501911000,storm +1572501912000,storm -> (1572501906000,spark,1) + (1572501907000,hive,2) +1572501915000,yarn -> (1572501911000,storm,1) + (1572501909000,hive,1) + (1572501910000,spark,1) \ No newline at end of file