Update Flink_Data_Transformation.md

This commit is contained in:
heibaiying 2019-10-26 20:25:30 +08:00 committed by GitHub
parent 96d6951d24
commit 948c0dad74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -65,7 +65,10 @@ env.fromElements(1, 2, 3, 4, 5).filter(x -> x > 3).print();
如下例子将数据按照 key 值分区后,滚动进行求和计算:
```java
DataStream<Tuple2<String, Integer>> tuple2DataStream = env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3), new Tuple2<>("b", 5));
DataStream<Tuple2<String, Integer>> tuple2DataStream = env.fromElements(new Tuple2<>("a", 1),
new Tuple2<>("a", 2),
new Tuple2<>("b", 3),
new Tuple2<>("b", 5));
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = tuple2DataStream.keyBy(0);
keyedStream.reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2) ->
new Tuple2<>(value1.f0, value1.f1 + value2.f1)).print();
@ -113,8 +116,10 @@ keyedStream.maxBy("key");
用于连接两个或者多个元素类型相同的 DataStream 。当然一个 DataStream 也可以与其本生进行连接,此时该 DataStream 中的每个元素都会被获取两次:
```shell
DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("a", 2));
DataStreamSource<Tuple2<String, Integer>> streamSource02 = env.fromElements(new Tuple2<>("b", 1), new Tuple2<>("b", 2));
DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 1),
new Tuple2<>("a", 2));
DataStreamSource<Tuple2<String, Integer>> streamSource02 = env.fromElements(new Tuple2<>("b", 1),
new Tuple2<>("b", 2));
streamSource01.union(streamSource02);
streamSource01.union(streamSource01,streamSource02);
```
@ -124,7 +129,8 @@ streamSource01.union(streamSource01,streamSource02);
Connect 操作用于连接两个或者多个类型不同的 DataStream ,其返回的类型是 ConnectedStreams ,此时被连接的多个 DataStreams 可以共享彼此之间的数据状态。但是需要注意的是由于不同 DataStream 之间的数据类型是不同的,如果想要进行后续的计算操作,还需要通过 CoMap 或 CoFlatMap 将 ConnectedStreams 转换回 DataStream
```java
DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 5));
DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 3),
new Tuple2<>("b", 5));
DataStreamSource<Integer> streamSource02 = env.fromElements(2, 3, 9);
// 使用connect进行连接
ConnectedStreams<Tuple2<String, Integer>, Integer> connect = streamSource01.connect(streamSource02);
@ -170,7 +176,9 @@ split.select("even").print();
project 主要用于获取 tuples 中的指定字段集,示例如下:
```java
DataStreamSource<Tuple3<String, Integer, String>> tuple3DataStreamSource = env.fromElements(new Tuple3<>("li", 22, "2018-09-23"),new Tuple3<>("ming", 33, "2020-09-23"));
DataStreamSource<Tuple3<String, Integer, String>> tuple3DataStreamSource = env.fromElements(
new Tuple3<>("li", 22, "2018-09-23"),
new Tuple3<>("ming", 33, "2020-09-23"));
tuple3DataStreamSource.project(0,2).print();
// 输出