add flink datasources and sinks

This commit is contained in:
罗祥 2019-11-01 17:35:22 +08:00
parent 944a3a7880
commit 8dcdb4aaf3
5 changed files with 117 additions and 28 deletions

View File

@ -144,7 +144,7 @@ under the License.
</filters> </filters>
<transformers> <transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.heibaiying.SampleJob</mainClass> <mainClass>com.heibaiying.WaterMarkJob</mainClass>
</transformer> </transformer>
</transformers> </transformers>
</configuration> </configuration>

View File

@ -0,0 +1,52 @@
package com.heibaiying;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
public class PeriodicWatermarksJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1
env.setParallelism(1);
// 设置以事件时间为基准
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> streamSource = env.socketTextStream("192.168.200.229", 8888, "\n", 3);
streamSource.map(new MapFunction<String, Tuple3<Long, String, Long>>() {
@Override
public Tuple3<Long, String, Long> map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple3<>(Long.valueOf(split[0]), split[1], 1L);
}
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
.keyBy(1).timeWindow(Time.seconds(3)).sum(2).print();
env.execute();
}
}
class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Tuple3<Long, String, Long>> {
private final long maxOutOfOrderness = 3000L;
private long currentMaxTimestamp = 0L;
@Override
public long extractTimestamp(Tuple3<Long, String, Long> element, long previousElementTimestamp) {
long timestamp = element.f0;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}

View File

@ -0,0 +1,46 @@
package com.heibaiying;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
public class PunctuatedWatermarksJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置以事件时间为基准
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> streamSource = env.socketTextStream("192.168.200.229", 8888, "\n", 3);
streamSource.map(new MapFunction<String, Tuple3<Long, String, Long>>() {
@Override
public Tuple3<Long, String, Long> map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple3<>(Long.valueOf(split[0]), split[1], 1L);
}
}).assignTimestampsAndWatermarks(new PunctuatedAssigner())
.keyBy(1).timeWindow(Time.seconds(3)).sum(2).print();
env.execute();
}
}
class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<Tuple3<Long, String, Long>> {
@Override
public long extractTimestamp(Tuple3<Long, String, Long> element, long previousElementTimestamp) {
return element.f0;
}
@Override
public Watermark checkAndGetNextWatermark(Tuple3<Long, String, Long> lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp);
}
}

View File

@ -1,27 +0,0 @@
package com.heibaiying;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SampleJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource = env.socketTextStream("192.168.200.229", 9999, "\n", 3);
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
String[] words = value.split("\t");
for (String word : words) {
out.collect(new Tuple2<>(word, 1L));
}
}
}).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print();
env.execute("Flink Streaming");
}
}

View File

@ -0,0 +1,18 @@
1572501901000,hadoop
1572501902000,hadoop
1572501903000,hadoop
1572501904000,flink
1572501905000,spark
1572501906000,spark -> (1572501901000,hadoop,2)
1572501907000,hive
1572501908000,hive
1572501909000,hive -> (1572501903000,hadoop,1)
(1572501905000,spark ,1)
(1572501904000,flink,1)
1572501910000,spark
1572501911000,storm
1572501912000,storm -> (1572501906000,spark,1)
(1572501907000,hive,2)
1572501915000,yarn -> (1572501911000,storm,1)
(1572501909000,hive,1)
(1572501910000,spark,1)