From 948c0dad74aaaa80c4a5a8201ec8cab08cec4e6b Mon Sep 17 00:00:00 2001 From: heibaiying <31504331+heibaiying@users.noreply.github.com> Date: Sat, 26 Oct 2019 20:25:30 +0800 Subject: [PATCH] Update Flink_Data_Transformation.md --- notes/Flink_Data_Transformation.md | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/notes/Flink_Data_Transformation.md b/notes/Flink_Data_Transformation.md index 83f4df9..8f5abb3 100644 --- a/notes/Flink_Data_Transformation.md +++ b/notes/Flink_Data_Transformation.md @@ -65,7 +65,10 @@ env.fromElements(1, 2, 3, 4, 5).filter(x -> x > 3).print(); 如下例子将数据按照 key 值分区后,滚动进行求和计算: ```java -DataStream<Tuple2<String, Integer>> tuple2DataStream = env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3), new Tuple2<>("b", 5)); +DataStream<Tuple2<String, Integer>> tuple2DataStream = env.fromElements(new Tuple2<>("a", 1), + new Tuple2<>("a", 2), + new Tuple2<>("b", 3), + new Tuple2<>("b", 5)); KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = tuple2DataStream.keyBy(0); keyedStream.reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1)).print(); @@ -113,8 +116,10 @@ keyedStream.maxBy("key"); 用于连接两个或者多个元素类型相同的 DataStream 。当然一个 DataStream 也可以与其本身进行连接,此时该 DataStream 中的每个元素都会被获取两次: ```java -DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("a", 2)); -DataStreamSource<Tuple2<String, Integer>> streamSource02 = env.fromElements(new Tuple2<>("b", 1), new Tuple2<>("b", 2)); +DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 1), + new Tuple2<>("a", 2)); +DataStreamSource<Tuple2<String, Integer>> streamSource02 = env.fromElements(new Tuple2<>("b", 1), + new Tuple2<>("b", 2)); streamSource01.union(streamSource02); streamSource01.union(streamSource01,streamSource02); ``` @@ -124,7 +129,8 @@ streamSource01.union(streamSource01,streamSource02); Connect 操作用于连接两个或者多个类型不同的 DataStream ,其返回的类型是 ConnectedStreams ,此时被连接的多个 DataStreams 可以共享彼此之间的数据状态。但是需要注意的是由于不同 DataStream 之间的数据类型是不同的,如果想要进行后续的计算操作,还需要通过 CoMap 或 CoFlatMap 将 
ConnectedStreams 转换回 DataStream: ```java -DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 5)); +DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 3), + new Tuple2<>("b", 5)); DataStreamSource<Integer> streamSource02 = env.fromElements(2, 3, 9); // 使用connect进行连接 ConnectedStreams<Tuple2<String, Integer>, Integer> connect = streamSource01.connect(streamSource02); @@ -170,7 +176,9 @@ split.select("even").print(); project 主要用于获取 tuples 中的指定字段集,示例如下: ```java -DataStreamSource<Tuple3<String, Integer, String>> tuple3DataStreamSource = env.fromElements(new Tuple3<>("li", 22, "2018-09-23"),new Tuple3<>("ming", 33, "2020-09-23")); +DataStreamSource<Tuple3<String, Integer, String>> tuple3DataStreamSource = env.fromElements( + new Tuple3<>("li", 22, "2018-09-23"), + new Tuple3<>("ming", 33, "2020-09-23")); tuple3DataStreamSource.project(0,2).print(); // 输出 @@ -284,4 +292,4 @@ someStream.filter(...).slotSharingGroup("slotSharingGroupName"); ## 参考资料 -Flink Operators: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/ \ No newline at end of file +Flink Operators: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/