diff --git a/code/Flink/flink-basis-java/pom.xml b/code/Flink/flink-basis-java/pom.xml new file mode 100644 index 0000000..82b282f --- /dev/null +++ b/code/Flink/flink-basis-java/pom.xml @@ -0,0 +1,239 @@ + + + 4.0.0 + + com.heibaiying + flink-basis-java + 1.0 + jar + + Flink Quickstart Job + http://www.myorganization.org + + + UTF-8 + 1.9.0 + 1.8 + 2.11 + ${java.version} + ${java.version} + + + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + + + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + provided + + + + + + + + + + org.slf4j + slf4j-log4j12 + 1.7.7 + runtime + + + log4j + log4j + 1.2.17 + runtime + + + + org.projectlombok + lombok + 1.18.10 + provided + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${java.version} + ${java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.0.0 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + com.heibaiying.StreamingJob + + + + + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-shade-plugin + [3.0.0,) + + shade + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.1,) + + testCompile + compile + + + + + + + + + + + + + + + + + + + + add-dependencies-for-IDEA + + + + idea.version + + + + + + org.apache.flink + flink-java + ${flink.version} + compile + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + compile + + + + + + diff --git a/code/Flink/flink-basis-java/src/main/java/com/heibaiying/BatchJob.java b/code/Flink/flink-basis-java/src/main/java/com/heibaiying/BatchJob.java new file mode 100644 index 0000000..7d3a48b --- /dev/null +++ b/code/Flink/flink-basis-java/src/main/java/com/heibaiying/BatchJob.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.heibaiying; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; + +/** + * Skeleton for a Flink Batch Job. + * + *

For a tutorial how to write a Flink batch application, check the + * tutorials and examples on the Flink Website. + * + *

To package your application into a JAR file for execution, + * change the main class in the POM.xml file to this class (simply search for 'mainClass') + * and run 'mvn clean package' on the command line. + */ +public class BatchJob { + + public static void main(String[] args) throws Exception { + + final String rootPath = "D:\\BigData-Notes\\code\\Flink\\flink-basis-java\\src\\main\\resources\\"; + // set up the batch execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSource readTextFile = env.readTextFile(rootPath + "log4j.properties"); + readTextFile.print(); + } +} diff --git a/code/Flink/flink-basis-java/src/main/java/com/heibaiying/StreamingJob.java b/code/Flink/flink-basis-java/src/main/java/com/heibaiying/StreamingJob.java new file mode 100644 index 0000000..5c607c6 --- /dev/null +++ b/code/Flink/flink-basis-java/src/main/java/com/heibaiying/StreamingJob.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.heibaiying; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * Skeleton for a Flink Streaming Job. + * + *

For a tutorial how to write a Flink streaming application, check the + * tutorials and examples on the Flink Website. + * + *

To package your application into a JAR file for execution, run + * 'mvn clean package' on the command line. + * + *

If you change the name of the main class (with the public static void main(String[] args)) + * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). + */ +public class StreamingJob { + + public static void main(String[] args) throws Exception { + // set up the streaming execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + /* + * Here, you can start creating your execution plan for Flink. + * + * Start with getting some data from the environment, like + * env.readTextFile(textPath); + * + * then, transform the resulting DataStream using operations + * like + * .filter() + * .flatMap() + * .join() + * .coGroup() + * + * and many more. + * Have a look at the programming guide for the Java API: + * + * http://flink.apache.org/docs/latest/apis/streaming/index.html + * + */ + + // execute program + env.execute("Flink Streaming Java API Skeleton"); + } +} diff --git a/code/Flink/flink-basis/src/main/resources/log4j.properties b/code/Flink/flink-basis-java/src/main/resources/log4j.properties similarity index 100% rename from code/Flink/flink-basis/src/main/resources/log4j.properties rename to code/Flink/flink-basis-java/src/main/resources/log4j.properties diff --git a/code/Flink/flink-basis/pom.xml b/code/Flink/flink-basis-scala/pom.xml similarity index 99% rename from code/Flink/flink-basis/pom.xml rename to code/Flink/flink-basis-scala/pom.xml index a4ef7c3..8e57e57 100644 --- a/code/Flink/flink-basis/pom.xml +++ b/code/Flink/flink-basis-scala/pom.xml @@ -22,7 +22,7 @@ under the License. 4.0.0 com.heibaiying - flink-basis + flink-basis-scala 1.0 jar diff --git a/code/Flink/flink-basis-scala/src/main/resources/log4j.properties b/code/Flink/flink-basis-scala/src/main/resources/log4j.properties new file mode 100644 index 0000000..da32ea0 --- /dev/null +++ b/code/Flink/flink-basis-scala/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/code/Flink/flink-basis/src/main/resources/wordcount.txt b/code/Flink/flink-basis-scala/src/main/resources/wordcount.txt similarity index 100% rename from code/Flink/flink-basis/src/main/resources/wordcount.txt rename to code/Flink/flink-basis-scala/src/main/resources/wordcount.txt diff --git a/code/Flink/flink-basis/src/main/scala/com/heibaiying/BatchJob.scala b/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/BatchJob.scala similarity index 100% rename from code/Flink/flink-basis/src/main/scala/com/heibaiying/BatchJob.scala rename to code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/BatchJob.scala diff --git a/code/Flink/flink-basis/src/main/scala/com/heibaiying/StreamingJob.scala b/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/StreamingJob.scala similarity index 100% rename from code/Flink/flink-basis/src/main/scala/com/heibaiying/StreamingJob.scala rename to code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/StreamingJob.scala diff --git a/code/Flink/flink-basis/src/main/scala/com/heibaiying/WordCountBatch.scala b/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountBatch.scala similarity index 86% rename from code/Flink/flink-basis/src/main/scala/com/heibaiying/WordCountBatch.scala rename to code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountBatch.scala index a7ec2a7..ff52ae7 100644 --- a/code/Flink/flink-basis/src/main/scala/com/heibaiying/WordCountBatch.scala +++ b/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountBatch.scala @@ -6,7 +6,7 @@ object WordCountBatch { def main(args: Array[String]): Unit = { val benv = ExecutionEnvironment.getExecutionEnvironment - val text = benv.readTextFile("D:\\BigData-Notes\\code\\Flink\\flink-basis\\src\\main\\resources\\wordcount.txt") + val text = benv.readTextFile("D:\\BigData-Notes\\code\\Flink\\flink-basis-scala\\src\\main\\resources\\wordcount.txt") val counts = text.flatMap { _.toLowerCase.split(",") filter { _.nonEmpty } }.map { (_, 1) }.groupBy(0).sum(1) counts.print() } diff --git a/code/Flink/flink-basis/src/main/scala/com/heibaiying/WordCountStreaming.scala b/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountStreaming.scala similarity index 100% rename from code/Flink/flink-basis/src/main/scala/com/heibaiying/WordCountStreaming.scala rename to code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountStreaming.scala diff --git a/pictures/flink-lib.png b/pictures/flink-lib.png index d43cd34..9d76b9d 100644 Binary files a/pictures/flink-lib.png and b/pictures/flink-lib.png differ