diff --git a/code/Flink/flink-basis/pom.xml b/code/Flink/flink-basis/pom.xml new file mode 100644 index 0000000..f4c4381 --- /dev/null +++ b/code/Flink/flink-basis/pom.xml @@ -0,0 +1,273 @@ + + + 4.0.0 + + com.heibaiying + flink-basis + 1.0 + jar + + Flink Quickstart Job + http://www.myorganization.org + + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + + + UTF-8 + 1.9.0 + 2.11 + 2.11.12 + + + + + + + org.apache.flink + flink-scala_${scala.binary.version} + ${flink.version} + + + org.apache.flink + flink-streaming-scala_${scala.binary.version} + ${flink.version} + + + + + org.scala-lang + scala-library + ${scala.version} + + + + + + + + + + org.slf4j + slf4j-log4j12 + 1.7.7 + runtime + + + log4j + log4j + 1.2.17 + runtime + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.0.0 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + com.heibaiying.StreamingJob + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.8 + 1.8 + + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + + compile + testCompile + + + + + + + + org.apache.maven.plugins + maven-eclipse-plugin + 2.8 + + true + + org.scala-ide.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + + org.scala-ide.sdt.core.scalabuilder + + + org.scala-ide.sdt.launching.SCALA_CONTAINER + org.eclipse.jdt.launching.JRE_CONTAINER + + + org.scala-lang:scala-library + org.scala-lang:scala-compiler + + + **/*.scala + **/*.java + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + + add-source + generate-sources + + add-source + + + + src/main/scala + + + + + + add-test-source + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + + + + + + + + add-dependencies-for-IDEA + + + + idea.version + + + + + + org.apache.flink + flink-scala_${scala.binary.version} + ${flink.version} + compile + + + org.apache.flink + flink-streaming-scala_${scala.binary.version} + ${flink.version} + compile + + + org.scala-lang + scala-library + ${scala.version} + compile + + + + + + diff --git a/code/Flink/flink-basis/src/main/resources/log4j.properties b/code/Flink/flink-basis/src/main/resources/log4j.properties new file mode 100644 index 0000000..da32ea0 --- /dev/null +++ b/code/Flink/flink-basis/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/code/Flink/flink-basis/src/main/resources/wordcount.txt b/code/Flink/flink-basis/src/main/resources/wordcount.txt new file mode 100644 index 0000000..5c0e97b --- /dev/null +++ b/code/Flink/flink-basis/src/main/resources/wordcount.txt @@ -0,0 +1,4 @@ +a,a,a,a,a +b,b,b +c,c +d,d diff --git a/code/Flink/flink-basis/src/main/scala/com/heibaiying/BatchJob.scala b/code/Flink/flink-basis/src/main/scala/com/heibaiying/BatchJob.scala new file mode 100644 index 0000000..6652d98 --- /dev/null +++ b/code/Flink/flink-basis/src/main/scala/com/heibaiying/BatchJob.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.heibaiying + +import org.apache.flink.api.scala._ + +/** + * Skeleton for a Flink Batch Job. + * + * For a tutorial how to write a Flink batch application, check the + * tutorials and examples on the Flink Website. + * + * To package your application into a JAR file for execution, + * change the main class in the POM.xml file to this class (simply search for 'mainClass') + * and run 'mvn clean package' on the command line. + */ +object BatchJob { + + def main(args: Array[String]) { + // set up the batch execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + + /* + * Here, you can start creating your execution plan for Flink. + * + * Start with getting some data from the environment, like + * env.readTextFile(textPath); + * + * then, transform the resulting DataSet[String] using operations + * like + * .filter() + * .flatMap() + * .join() + * .group() + * + * and many more. + * Have a look at the programming guide: + * + * http://flink.apache.org/docs/latest/apis/batch/index.html + * + * and the examples + * + * http://flink.apache.org/docs/latest/apis/batch/examples.html + * + */ + + // execute program + env.execute("Flink Batch Scala API Skeleton") + } +} diff --git a/code/Flink/flink-basis/src/main/scala/com/heibaiying/StreamingJob.scala b/code/Flink/flink-basis/src/main/scala/com/heibaiying/StreamingJob.scala new file mode 100644 index 0000000..2325a6c --- /dev/null +++ b/code/Flink/flink-basis/src/main/scala/com/heibaiying/StreamingJob.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.heibaiying + +import org.apache.flink.streaming.api.scala._ + +/** + * Skeleton for a Flink Streaming Job. + * + * For a tutorial how to write a Flink streaming application, check the + * tutorials and examples on the Flink Website. + * + * To package your application into a JAR file for execution, run + * 'mvn clean package' on the command line. + * + * If you change the name of the main class (with the public static void main(String[] args)) + * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). + */ +object StreamingJob { + def main(args: Array[String]) { + // set up the streaming execution environment + val env = StreamExecutionEnvironment.getExecutionEnvironment + + /* + * Here, you can start creating your execution plan for Flink. + * + * Start with getting some data from the environment, like + * env.readTextFile(textPath); + * + * then, transform the resulting DataStream[String] using operations + * like + * .filter() + * .flatMap() + * .join() + * .group() + * + * and many more. + * Have a look at the programming guide: + * + * http://flink.apache.org/docs/latest/apis/streaming/index.html + * + */ + + // execute program + env.execute("Flink Streaming Scala API Skeleton") + } +} diff --git a/code/Flink/flink-basis/src/main/scala/com/heibaiying/WordCountBatch.scala b/code/Flink/flink-basis/src/main/scala/com/heibaiying/WordCountBatch.scala new file mode 100644 index 0000000..a7ec2a7 --- /dev/null +++ b/code/Flink/flink-basis/src/main/scala/com/heibaiying/WordCountBatch.scala @@ -0,0 +1,13 @@ +package com.heibaiying + +import org.apache.flink.api.scala._ + +object WordCountBatch { + + def main(args: Array[String]): Unit = { + val benv = ExecutionEnvironment.getExecutionEnvironment + val text = benv.readTextFile("D:\\BigData-Notes\\code\\Flink\\flink-basis\\src\\main\\resources\\wordcount.txt") + val counts = text.flatMap { _.toLowerCase.split(",") filter { _.nonEmpty } }.map { (_, 1) }.groupBy(0).sum(1) + counts.print() + } +} diff --git a/code/Flink/flink-basis/src/main/scala/com/heibaiying/WordCountStreaming.scala b/code/Flink/flink-basis/src/main/scala/com/heibaiying/WordCountStreaming.scala new file mode 100644 index 0000000..daad9ce --- /dev/null +++ b/code/Flink/flink-basis/src/main/scala/com/heibaiying/WordCountStreaming.scala @@ -0,0 +1,25 @@ +package com.heibaiying + +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.api.windowing.time.Time + + +object WordCountStreaming { + + def main(args: Array[String]): Unit = { + + val senv = StreamExecutionEnvironment.getExecutionEnvironment + + val text: DataStream[String] = senv.socketTextStream("192.168.200.229", 9999, '\n') + val windowCounts = text.flatMap { w => w.split(",") }.map { w => WordWithCount(w, 1) }.keyBy("word") + .timeWindow(Time.seconds(5)).sum("count") + + windowCounts.print().setParallelism(1) + + senv.execute("Streaming WordCount") + + } + + case class WordWithCount(word: String, count: Long) + +} diff --git a/notes/Flink开发环境搭建.md b/notes/Flink开发环境搭建.md new file mode 100644 index 0000000..66aa8b2 --- /dev/null +++ b/notes/Flink开发环境搭建.md @@ -0,0 +1,130 @@ +# Flink 开发环境搭建 + +## 一、安装 Scala 插件 + +Flink 分别提供了基于 Java 语言和 Scala 语言的 API ,如果想要使用 Scala 语言来开发 Flink 程序,可以通过在 IDEA 中安装 Scala 插件来提供语法提示,代码高亮等功能。打开 IDEA , 依次点击 `File => settings => plugins` 打开插件安装页面,搜索 Scala 插件并进行安装,安装完成后,重启 IDEA 即可生效。 + +![scala-plugin](D:\BigData-Notes\pictures\scala-plugin.png) + +## 二、Flink 项目初始化 + +### 2.1 官方项目初始化方式 + +Flink 官方支持使用 Maven 和 Gradle 两种构建工具来构建基于 Java 语言的 Flink 项目,支持使用 SBT 和 Maven 两种构建工具来构建基于 Scala 语言的 Flink 项目。 这里以 Maven 为例进行说明,因为其可以同时支持 Java 语言和 Scala 语言项目的构建。 + +需要注意的是 Flink 1.9 只支持 Maven 3.0.4 以上的版本,所以需要预先进行安装。安装完成后,可以通过以下两种方式来构建项目: + +**1. 直接基于 Maven Archetype 构建** + +直接使用下面的 maven 语句来进行构建,然后根据交互信息的提示,依次输入 groupId , artifactId 以及包名等信息后等待初始化的完成: + +```bash +$ mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-quickstart-java \ + -DarchetypeVersion=1.9.0 +``` + +> 注:如果想要创建基于 Scala 语言的项目,只需要将 flink-quickstart-java 换成 flink-quickstart-scala 即可,后文亦同。 + +**2. 使用官方脚本快速构建** + +为了更方便的初始化项目,官方提供了快速构建脚本,可以通过以下命令来直接进行调用: + +```shell +$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0 +``` + +该方式其实也是通过执行 maven archetype 命令来进行初始化,其脚本内容如下: + +```shell +PACKAGE=quickstart + +mvn archetype:generate \ + -DarchetypeGroupId=org.apache.flink \ + -DarchetypeArtifactId=flink-quickstart-java \ + -DarchetypeVersion=${1:-1.8.0} \ + -DgroupId=org.myorg.quickstart \ + -DartifactId=$PACKAGE \ + -Dversion=0.1 \ + -Dpackage=org.myorg.quickstart \ + -DinteractiveMode=false +``` + +可以看到相比于第一种方式,该种方式只是直接指定好了 groupId ,artifactId ,version 等信息而已。 + +### 2.2 使用 IDEA 快速构建 + +如果你使用的是开发工具是 IDEA ,可以直接在项目创建页面选择 Maven Flink Archetype 进行项目初始化: + +![flink-maven](D:\BigData-Notes\pictures\flink-maven.png) + +如果你的 IDEA 没有上述 Archetype, 可以通过点击右上角的 `ADD ARCHETYPE` ,来进行添加,依次填入所需信息,这些信息都可以从上述的 `archetype:generate ` 语句中获取。点击 `OK` 保存后,该 Archetype 就会一直存在于你的 IDEA 中,之后每次创建项目时,只需要直接选择该 Archetype 即可。 + +![flink-maven-new](D:\BigData-Notes\pictures\flink-maven-new.png) + +选中 Flink Archetype ,然后点击 `NEXT` 按钮,之后的所有步骤都和正常的 Maven 工程相同。创建完成后的项目结构如下: + +![flink-basis-project](D:\BigData-Notes\pictures\flink-basis-project.png) + +## 三、词频统计案例 + +### 3.1 案例代码 + +创建完成后,可以先书写一个简单的词频统计的案例来尝试运行 Flink 项目,这里以 Scala 语言为例,代码如下: + +```scala +package com.heibaiying + +import org.apache.flink.api.scala._ + +object WordCountBatch { + + def main(args: Array[String]): Unit = { + val benv = ExecutionEnvironment.getExecutionEnvironment + val text = benv.readTextFile("D:\\wordcount.txt") + val counts = text.flatMap { _.toLowerCase.split(",") filter { _.nonEmpty } }.map { (_, 1) }.groupBy(0).sum(1) + counts.print() + } +} +``` + +其中 `wordcount.txt` 中的内容如下: + +```shell +a,a,a,a,a +b,b,b +c,c +d,d +``` + +本机不需要安装其他任何的 Flink 环境,直接运行 Main 方法即可,结果如下: + +![flink-word-count](D:\BigData-Notes\pictures\flink-word-count.png) + +### 3.1 常见异常 + +这里常见的一个启动异常是如下,之所以出现这样的情况,是因为 Maven 提供的 Flink Archetype 默认是以生产环境为标准的,因为 Flink 的安装包中默认就有 Flink 相关的 JAR 包,所以在 Maven 中这些 JAR 都被标识为 `provided` , 只需要去掉该标签即可。 + +```shell +Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.typeinfo.TypeInformation +``` +## 四、使用 Scala 命令行 + + https://flink.apache.org/downloads.html + +start-scala-shell.sh + +```shell +[root@hadoop001 bin]# ./start-scala-shell.sh +错误: 找不到或无法加载主类 org.apache.flink.api.scala.FlinkShell +``` + + + + + + + + + diff --git a/pictures/flink-basis-project.png b/pictures/flink-basis-project.png new file mode 100644 index 0000000..819e221 Binary files /dev/null and b/pictures/flink-basis-project.png differ diff --git a/pictures/flink-maven-new.png b/pictures/flink-maven-new.png new file mode 100644 index 0000000..8bb79ac Binary files /dev/null and b/pictures/flink-maven-new.png differ diff --git a/pictures/flink-maven.png b/pictures/flink-maven.png new file mode 100644 index 0000000..960c93b Binary files /dev/null and b/pictures/flink-maven.png differ diff --git a/pictures/flink-scala-shell.png b/pictures/flink-scala-shell.png new file mode 100644 index 0000000..6a39bd0 Binary files /dev/null and b/pictures/flink-scala-shell.png differ diff --git a/pictures/flink-word-count.png b/pictures/flink-word-count.png new file mode 100644 index 0000000..95d1d1a Binary files /dev/null and b/pictures/flink-word-count.png differ diff --git a/pictures/scala-plugin.png b/pictures/scala-plugin.png new file mode 100644 index 0000000..1d9e229 Binary files /dev/null and b/pictures/scala-plugin.png differ