add Flink Data Transformation

This commit is contained in:
罗祥
2019-10-26 20:17:41 +08:00
parent bbcef5c340
commit 96d6951d24
4 changed files with 317 additions and 28 deletions

View File

@ -18,8 +18,19 @@
package com.heibaiying;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.List;
/**
* Skeleton for a Flink Batch Job.
@ -35,11 +46,9 @@ public class BatchJob {
public static void main(String[] args) throws Exception {
final String rootPath = "D:\\BigData-Notes\\code\\Flink\\flink-basis-java\\src\\main\\resources\\";
// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> readTextFile = env.readTextFile(rootPath + "log4j.properties");
readTextFile.print();
env.execute("Flink Batch Java API Skeleton");
}
}

View File

@ -18,7 +18,20 @@
package com.heibaiying;
import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Skeleton for a Flink Streaming Job.
@ -34,31 +47,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
*/
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*
* Here, you can start creating your execution plan for Flink.
*
* Start with getting some data from the environment, like
* env.readTextFile(textPath);
*
* then, transform the resulting DataStream<String> using operations
* like
* .filter()
* .flatMap()
* .join()
* .coGroup()
*
* and many more.
* Have a look at the programming guide for the Java API:
*
* http://flink.apache.org/docs/latest/apis/streaming/index.html
*
*/
env.execute();
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}
}