diff --git a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java index 9977be2..ca6d411 100644 --- a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java +++ b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java @@ -20,7 +20,7 @@ public class KafkaStreamingJob { // 1.指定Kafka的相关配置属性 Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", "192.168.200.0:9092"); + properties.setProperty("bootstrap.servers", "192.168.0.229:9092"); // 2.接收Kafka上的数据 DataStream stream = env diff --git a/code/Flink/flink-time-watermark/pom.xml b/code/Flink/flink-time-watermark/pom.xml deleted file mode 100644 index 6255450..0000000 --- a/code/Flink/flink-time-watermark/pom.xml +++ /dev/null @@ -1,232 +0,0 @@ - - - 4.0.0 - - com.heibaiying - flink-time-watermark - 1.0 - jar - - Flink Quickstart Job - http://www.myorganization.org - - - UTF-8 - 1.9.0 - 1.8 - 2.11 - ${java.version} - ${java.version} - - - - - apache.snapshots - Apache Development Snapshot Repository - https://repository.apache.org/content/repositories/snapshots/ - - false - - - true - - - - - - - - - org.apache.flink - flink-java - ${flink.version} - provided - - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - provided - - - - - - - - - - org.slf4j - slf4j-log4j12 - 1.7.7 - runtime - - - log4j - log4j - 1.2.17 - runtime - - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.1 - - ${java.version} - ${java.version} - - - - - - - org.apache.maven.plugins - maven-shade-plugin - 3.0.0 - - - - package - - shade - - - - - org.apache.flink:force-shading - com.google.code.findbugs:jsr305 - org.slf4j:* - log4j:* - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - com.heibaiying.WaterMarkJob - - - - - - - - - - - - - - org.eclipse.m2e - lifecycle-mapping - 1.0.0 - - - - - - org.apache.maven.plugins - maven-shade-plugin - [3.0.0,) - - shade - - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - [3.1,) - - testCompile - compile - - - - - - - - - - - - - - - - - - - - add-dependencies-for-IDEA - - - - idea.version - - - - - - org.apache.flink - flink-java - ${flink.version} - compile - - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - compile - - - - - - diff --git a/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PeriodicWatermarksJob.java b/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PeriodicWatermarksJob.java deleted file mode 100644 index 81fd6e1..0000000 --- a/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PeriodicWatermarksJob.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.heibaiying; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.api.windowing.time.Time; - -public class PeriodicWatermarksJob { - - public static void main(String[] args) throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // 设置并行度为1 - env.setParallelism(1); - // 设置以事件时间为基准 - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - DataStreamSource streamSource = env.socketTextStream("192.168.200.229", 8888, "\n", 3); - streamSource.map(new MapFunction>() { - @Override - public Tuple3 map(String value) throws Exception { - String[] split = value.split(","); - return new Tuple3<>(Long.valueOf(split[0]), split[1], 1L); - } - }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator()) - .keyBy(1).timeWindow(Time.seconds(3)).sum(2).print(); - env.execute(); - - } -} - - -class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks> { - - private final long maxOutOfOrderness = 3000L; - private long currentMaxTimestamp = 0L; - - @Override - public long extractTimestamp(Tuple3 element, long previousElementTimestamp) { - long timestamp = element.f0; - currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); - return timestamp; - } - - @Override - public Watermark getCurrentWatermark() { - return new Watermark(currentMaxTimestamp - maxOutOfOrderness); - } -} diff --git a/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PunctuatedWatermarksJob.java b/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PunctuatedWatermarksJob.java deleted file mode 100644 index 605b59e..0000000 --- a/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/PunctuatedWatermarksJob.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.heibaiying; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.api.windowing.time.Time; - -public class PunctuatedWatermarksJob { - - public static void main(String[] args) throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - // 设置以事件时间为基准 - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - DataStreamSource streamSource = env.socketTextStream("192.168.200.229", 8888, "\n", 3); - streamSource.map(new MapFunction>() { - @Override - public Tuple3 map(String value) throws Exception { - String[] split = value.split(","); - return new Tuple3<>(Long.valueOf(split[0]), split[1], 1L); - } - }).assignTimestampsAndWatermarks(new PunctuatedAssigner()) - .keyBy(1).timeWindow(Time.seconds(3)).sum(2).print(); - env.execute(); - - } -} - -class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks> { - - @Override - public long extractTimestamp(Tuple3 element, long previousElementTimestamp) { - return element.f0; - } - - @Override - public Watermark checkAndGetNextWatermark(Tuple3 lastElement, long extractedTimestamp) { - return new Watermark(extractedTimestamp); - } -} - diff --git a/code/Flink/flink-time-watermark/src/main/resources/log4j.properties b/code/Flink/flink-time-watermark/src/main/resources/log4j.properties deleted file mode 100644 index da32ea0..0000000 --- a/code/Flink/flink-time-watermark/src/main/resources/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=INFO, console - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/code/Flink/flink-time-watermark/src/main/resources/sample.txt b/code/Flink/flink-time-watermark/src/main/resources/sample.txt deleted file mode 100644 index afe1887..0000000 --- a/code/Flink/flink-time-watermark/src/main/resources/sample.txt +++ /dev/null @@ -1,18 +0,0 @@ -1572501901000,hadoop -1572501902000,hadoop -1572501903000,hadoop -1572501904000,flink -1572501905000,spark -1572501906000,spark -> (1572501901000,hadoop,2) -1572501907000,hive -1572501908000,hive -1572501909000,hive -> (1572501903000,hadoop,1) - (1572501905000,spark ,1) - (1572501904000,flink,1) -1572501910000,spark -1572501911000,storm -1572501912000,storm -> (1572501906000,spark,1) - (1572501907000,hive,2) -1572501915000,yarn -> (1572501911000,storm,1) - (1572501909000,hive,1) - (1572501910000,spark,1) \ No newline at end of file diff --git a/notes/Flink_Data_Source.md b/notes/Flink_Data_Source.md index 59d66bd..183f360 100644 --- a/notes/Flink_Data_Source.md +++ b/notes/Flink_Data_Source.md @@ -13,7 +13,7 @@ ## 一、内置 Data Source -Flink Data Source 用于定义 Flink 程序的数据来源,Flink 官方内置提供了多种数据获取方法,用于帮助开发者简单快速地构建输入流,具体如下: +Flink Data Source 用于定义 Flink 程序的数据来源,Flink 官方提供了多种数据获取方法,用于帮助开发者简单快速地构建输入流,具体如下: ### 1.1 基于文件构建 @@ -29,7 +29,7 @@ env.readTextFile(filePath).print(); + **inputFormat**:数据流的输入格式。 + **filePath**:文件路径,可以是本地文件系统上的路径,也可以是 HDFS 上的文件路径。 -+ **watchType**:读取方式,它有两个可选值,分别是 `FileProcessingMode.PROCESS_ONCE` 和 `FileProcessingMode.PROCESS_CONTINUOUSLY`:前者表示对指定路径上的数据只读取一次,然后退出;后者表示对路径进行定期扫描从而可以获取到新的数据。需要注意的是如果 watchType 被设置为 `PROCESS_CONTINUOUSLY`,那么当文件被修改时,其所有的内容 (包含原有的内容和新增的内容) 都将被重新处理,因此这会打破 Flink 的 *exactly-once* 语义。 ++ **watchType**:读取方式,它有两个可选值,分别是 `FileProcessingMode.PROCESS_ONCE` 和 `FileProcessingMode.PROCESS_CONTINUOUSLY`:前者表示对指定路径上的数据只读取一次,然后退出;后者表示对路径进行定期地扫描和读取。需要注意的是如果 watchType 被设置为 `PROCESS_CONTINUOUSLY`,那么当文件被修改时,其所有的内容 (包含原有的内容和新增的内容) 都将被重新处理,因此这会打破 Flink 的 *exactly-once* 语义。 + **interval**:定期扫描的时间间隔。 + **typeInformation**:输入流中元素的类型。 @@ -102,7 +102,7 @@ Flink 提供了 socketTextStream 方法用于构建基于 Socket 的数据流, - **hostname**:主机名; - **port**:端口号,设置为 0 时,表示端口号自动分配; - **delimiter**:用于分隔每条记录的分隔符; -- **maxRetry**:当 Socket 临时关闭时,程序的最大重试间隔,单位为秒。设置为 0 时表示不进行重试;设置为负值则表示一直重试。使用示例如下: +- **maxRetry**:当 Socket 临时关闭时,程序的最大重试间隔,单位为秒。设置为 0 时表示不进行重试;设置为负值则表示一直重试。示例如下: ```shell env.socketTextStream("192.168.0.229", 9999, "\n", 3).print(); @@ -151,7 +151,6 @@ Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not 如果你想要实现具有并行度的输入流,则需要实现 ParallelSourceFunction 或 RichParallelSourceFunction 接口,其与 SourceFunction 的关系如下图:
- ParallelSourceFunction 直接继承自 ParallelSourceFunction,具有并行度的功能。RichParallelSourceFunction 则继承自 AbstractRichFunction,同时实现了 ParallelSourceFunction 接口,所以其除了具有并行度的功能外,还提供了额外的与生命周期相关的方法,如 open() ,closen() 。 ## 三、Streaming Connectors @@ -267,11 +266,9 @@ bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic flink-stream- 在 Producer 上输入任意测试数据,之后观察程序控制台的输出:
- 程序控制台的输出如下:
- 可以看到已经成功接收并打印出相关的数据。 diff --git a/notes/Flink_Data_Transformation.md b/notes/Flink_Data_Transformation.md index c84d518..9840431 100644 --- a/notes/Flink_Data_Transformation.md +++ b/notes/Flink_Data_Transformation.md @@ -14,8 +14,10 @@ Flink 的 Transformations 操作主要用于将一个和多个 DataStream 按需转换成新的 DataStream。它主要分为以下三类: - **DataStream Transformations**:进行数据流相关转换操作; -- **Physical partitioning**:物理分区。Flink 提供的底层 API ,允许用户在必要时可以控制数据的分区规则; -- **Task chaining and resource groups**:任务链和资源组。允许用户进行任务链和资源组相关的细粒度控制。 +- **Physical partitioning**:物理分区。Flink 提供的底层 API ,允许用户定义数据的分区规则; +- **Task chaining and resource groups**:任务链和资源组。允许用户进行任务链和资源组的细粒度的控制。 + +以下分别对其主要 API 进行介绍: ## 二、DataStream Transformations @@ -83,7 +85,7 @@ keyedStream.reduce((ReduceFunction>) (value1, value2) -> KeyBy 操作存在以下两个限制: - KeyBy 操作用于用户自定义的 POJOs 类型时,该自定义类型必须重写 hashCode 方法; -- KeyBy 操作不能用于任何数组类型。 +- KeyBy 操作不能用于数组类型。 ### 2.5 Aggregations [KeyedStream → DataStream] @@ -173,7 +175,7 @@ split.select("even").print(); ### 2.9 project [DataStream → DataStream] -project 主要用于获取 tuples 中的指定字段集,示例如下: +project 主要用于获取 tuples 中的指定字段集,示例如下: ```java DataStreamSource> streamSource = env.fromElements( @@ -188,7 +190,7 @@ streamSource.project(0,2).print(); ## 三、物理分区 -物理分区 (Physical partitioning) 是 Flink 提供的底层的 API,用于允许用户采用内置的分区规则或者自定义的分区规则来对数据进行分区,从而避免数据在某些分区上过于倾斜,常用的分区规则如下: +物理分区 (Physical partitioning) 是 Flink 提供的底层的 API,允许用户采用内置的分区规则或者自定义的分区规则来对数据进行分区,从而避免数据在某些分区上过于倾斜,常用的分区规则如下: ### 3.1 Random partitioning [DataStream → DataStream] @@ -219,7 +221,6 @@ ReScale 这个单词具有重新缩放的意义,其对应的操作也是如此
- ### 3.4 Broadcasting [DataStream → DataStream] 将数据分发到所有分区上。通常用于小数据集与大数据集进行关联的情况下,此时可以将小数据集广播到所有分区上,避免频繁的跨分区关联,通过 broadcast 方法进行实现: diff --git a/notes/Flink_Windows.md b/notes/Flink_Windows.md index 022772b..c9df8a3 100644 --- a/notes/Flink_Windows.md +++ b/notes/Flink_Windows.md @@ -24,7 +24,6 @@ Time Windows 用于以时间为维度来进行数据聚合,具体分为以下
- 这里我们以词频统计为例,给出一个具体的用例,代码如下: ```java @@ -50,7 +49,6 @@ env.execute("Flink Streaming"); - ### 2.2 Sliding Windows 滑动窗口用于滚动进行聚合分析,例如:每隔 6 分钟统计一次过去一小时内所有商品的点击量,那么统计窗口彼此之间就是存在重叠的,即 1天可以分为 240 个窗口。图示如下: @@ -58,7 +56,6 @@ env.execute("Flink Streaming");
- 可以看到 window 1 - 4 这四个窗口彼此之间都存在着时间相等的重叠部分。想要实现滑动窗口,只需要在使用 timeWindow 方法时额外传递第二个参数作为滚动时间即可,具体如下: ```java @@ -73,11 +70,10 @@ timeWindow(Time.minutes(1),Time.seconds(3))
- 具体的实现代码如下: ```java -// 以处理时间为衡量标准,如果10秒内没有任何数据输入,就认为会话已经端口,此时触发统计 +// 以处理时间为衡量标准,如果10秒内没有任何数据输入,就认为会话已经关闭,此时触发统计 window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) // 以事件时间为衡量标准 window(EventTimeSessionWindows.withGap(Time.seconds(10))) @@ -90,7 +86,6 @@ window(EventTimeSessionWindows.withGap(Time.seconds(10)))
- 这里继续以上面词频统计的案例为例,示例代码如下: ```java diff --git a/notes/Flink开发环境搭建.md b/notes/Flink开发环境搭建.md index e76e4ff..5da957a 100644 --- a/notes/Flink开发环境搭建.md +++ b/notes/Flink开发环境搭建.md @@ -17,7 +17,6 @@ Flink 分别提供了基于 Java 语言和 Scala 语言的 API ,如果想要使用 Scala 语言来开发 Flink 程序,可以通过在 IDEA 中安装 Scala 插件来提供语法提示,代码高亮等功能。打开 IDEA , 依次点击 `File => settings => plugins` 打开插件安装页面,搜索 Scala 插件并进行安装,安装完成后,重启 IDEA 即可生效。
- ## 二、Flink 项目初始化 ### 2.1 使用官方脚本构建 @@ -68,7 +67,6 @@ mvn archetype:generate \ 如果你使用的是开发工具是 IDEA ,可以直接在项目创建页面选择 Maven Flink Archetype 进行项目初始化:
- 如果你的 IDEA 没有上述 Archetype, 可以通过点击右上角的 `ADD ARCHETYPE` ,来进行添加,依次填入所需信息,这些信息都可以从上述的 `archetype:generate ` 语句中获取。点击 `OK` 保存后,该 Archetype 就会一直存在于你的 IDEA 中,之后每次创建项目时,只需要直接选择该 Archetype 即可:
@@ -81,7 +79,6 @@ mvn archetype:generate \ 创建完成后的自动生成的项目结构如下:
- 其中 BatchJob 为批处理的样例代码,源码如下: ```scala @@ -145,7 +142,6 @@ object StreamingJob { 需要特别注意的以上依赖的 `scope` 标签全部被标识为 provided ,这意味着这些依赖都不会被打入最终的 JAR 包。因为 Flink 的安装包中已经提供了这些依赖,位于其 lib 目录下,名为 `flink-dist_*.jar` ,它包含了 Flink 的所有核心类和依赖:
- `scope` 标签被标识为 provided 会导致你在 IDEA 中启动项目时会抛出 ClassNotFoundException 异常。基于这个原因,在使用 IDEA 创建项目时还自动生成了以下 profile 配置: ```xml @@ -189,7 +185,6 @@ object StreamingJob { 在 id 为 `add-dependencies-for-IDEA` 的 profile 中,所有的核心依赖都被标识为 compile,此时你可以无需改动任何代码,只需要在 IDEA 的 Maven 面板中勾选该 profile,即可直接在 IDEA 中运行 Flink 项目:
- ## 四、词频统计案例 项目创建完成后,可以先书写一个简单的词频统计的案例来尝试运行 Flink 项目,以下以 Scala 语言为例,分别介绍流处理程序和批处理程序的编程示例: @@ -226,7 +221,6 @@ d,d 本机不需要配置其他任何的 Flink 环境,直接运行 Main 方法即可,结果如下:
- ### 4.2 流处理示例 ```scala @@ -271,7 +265,6 @@ https://flink.apache.org/downloads.html Flink 大多数版本都提供有 Scala 2.11 和 Scala 2.12 两个版本的安装包可供下载:
- 下载完成后进行解压即可,Scala Shell 位于安装目录的 bin 目录下,直接使用以下命令即可以本地模式启动: ```shell @@ -281,8 +274,7 @@ Flink 大多数版本都提供有 Scala 2.11 和 Scala 2.12 两个版本的安 命令行启动完成后,其已经提供了批处理 (benv 和 btenv)和流处理(senv 和 stenv)的运行环境,可以直接运行 Scala Flink 程序,示例如下:
- -最后说明一个常见的异常:这里我使用的 Flink 版本为 1.9.1,启动时会抛出如下异常。这里因为按照官方的说明,目前所有 Scala 2.12 版本的安装包暂时都不支持 Scala Shell,所以如果想要使用 Scala Shell,只能选择 Scala 2.11 版本的安装包。 +最后解释一个常见的异常:这里我使用的 Flink 版本为 1.9.1,启动时会抛出如下异常。这里因为按照官方的说明,目前所有 Scala 2.12 版本的安装包暂时都不支持 Scala Shell,所以如果想要使用 Scala Shell,只能选择 Scala 2.11 版本的安装包。 ```shell [root@hadoop001 bin]# ./start-scala-shell.sh local diff --git a/notes/Flink核心概念综述.md b/notes/Flink核心概念综述.md index cb722f5..f5cecdf 100644 --- a/notes/Flink核心概念综述.md +++ b/notes/Flink核心概念综述.md @@ -61,34 +61,28 @@ Stateful Stream Processing 是最低级别的抽象,它通过 Process Function 按照上面的介绍,Flink 核心架构的第二层是 Runtime 层, 该层采用标准的 Master - Slave 结构, 其中,Master 部分又包含了三个核心组件:Dispatcher、ResourceManager 和 JobManager,而 Slave 则主要是 TaskManager 进程。它们的功能分别如下: -- **JobManagers** (也称为 *masters*) : 整个作业 (Job) 的主要管理者,负责调度任务 (tasks)、协调检查点 (checkpoints) 、协调故障恢复等。每个 Job 至少有一个 JobManager;高可用部署下可以有多个 JobManagers,其中一个作为 *leader*,其余的则处于 *standby* 状态。 -- **TaskManagers** (也称为 *workers*) :负责子任务 (subtasks) 的执行。每个 Job 至少会有一个 TaskManager。 -- **Dispatcher**:负责接收客户端的作业,并启动 JobManager。 -- **ResourceManager** :负责集群资源的协调和管理。 +- **JobManagers** (也称为 *masters*) :JobManagers 接收由 Dispatcher 传递过来的执行程序,该执行程序包含了作业图 (JobGraph),逻辑数据流图 (logical dataflow graph) 及其所有的 classes 文件以及第三方类库 (libraries) 等等 。紧接着 JobManagers 会将 JobGraph 转换为执行图 (ExecutionGraph),然后向 ResourceManager 申请资源来执行该任务,一旦申请到资源,就将执行图分发给对应的 TaskManagers 。因此每个作业 (Job) 至少有一个 JobManager;高可用部署下可以有多个 JobManagers,其中一个作为 *leader*,其余的则处于 *standby* 状态。 +- **TaskManagers** (也称为 *workers*) : TaskManagers 负责实际的子任务 (subtasks) 的执行,每个 TaskManagers 都拥有一定数量的 slots。Slot 是一组固定大小的资源的合集 (如计算能力,存储空间)。TaskManagers 启动后,会将其所拥有的 slots 注册到 ResourceManager 上,由 ResourceManager 进行统一管理。 +- **Dispatcher**:负责接收客户端提交的执行程序,并传递给 JobManager 。除此之外,它还提供了一个 WEB UI 界面,用于监控作业的执行情况。 +- **ResourceManager** :负责管理 slots 并协调集群资源。ResourceManager 接收来自 JobManager 的资源请求,并将存在空闲 slots 的 TaskManagers 分配给 JobManager 执行任务。Flink 基于不同的部署平台,如 YARN , Mesos,K8s 等提供了不同的资源管理器,当 TaskManagers 没有足够的 slots 来执行任务时,它会向第三方平台发起会话来请求额外的资源。 -以 Standalone 模式为例,它们之间的运行流程如下: +![flink-application-submission](D:\BigData-Notes\pictures\flink-application-submission.png)染病 -![flink-standalone-cluster](D:\BigData-Notes\pictures\flink-standalone-cluster.jpg) -+ 用户通过 Client 将作业 ( Job) 提交给 Master 时,此时需要先经过 Dispatcher; -+ 当 Dispatcher 收到客户端的请求之后,会生成一个 JobManager,接着 JobManager 进程向 ResourceManager 申请资源来启动 TaskManager; -+ TaskManager 启动之后,它需要将自己注册到 ResourceManager 上。注册完成后, JobManager 再将具体的 Task 任务分发给这个 TaskManager 去执行。 ### 4.2 Task & SubTask -在上面我们提到 JobManagers 负责管理的是 Task ,而 TaskManagers 负责执行的则是 SubTask,这里解释一下任务 Task 和子任务 SubTask 两者间的关系: +上面我们提到:TaskManagers 实际执行的是 SubTask,而不是 Task,这里解释一下两者的区别: 在执行分布式计算时,Flink 将可以链接的操作 (operators) 链接到一起,这就是 Task。之所以这样做, 是为了减少线程间切换和缓冲而导致的开销,在降低延迟的同时可以提高整体的吞吐量。 但不是所有的 operator 都可以被链接,如下 keyBy 等操作会导致网络 shuffle 和重分区,因此其就不能被链接,只能被单独作为一个 Task。 简单来说,一个 Task 就是一个可以链接的最小的操作链 (Operator Chains) 。如下图,source 和 map 算子被链接到一块,因此整个作业就只有三个 Task: ![flink-task-subtask](D:\BigData-Notes\pictures\flink-task-subtask.png) -解释完 Task ,我们在解释一下什么是 SubTask,其准确的翻译是: *A subtask is one parallel slice of a task*,即一个 Task 可以按照其并行度拆分为多个 SubTask。如上图,source & map 具有两个并行度,KeyBy 具有两个并行度,Sink 具有一个并行度,因此整个虽然只有 3 个 Task,但是却有 5 个 SubTask,每个 SubTask 都是一个单独的线程。 - -Jobmanager 负责定义和拆分这些 SubTask,并将其交给 Taskmanagers 来执行。想要成功执行这些任务,Taskmanagers 还必须要具备运行这些 SubTask 所需要的计算资源和内存资源。 +解释完 Task ,我们在解释一下什么是 SubTask,其准确的翻译是: *A subtask is one parallel slice of a task*,即一个 Task 可以按照其并行度拆分为多个 SubTask。如上图,source & map 具有两个并行度,KeyBy 具有两个并行度,Sink 具有一个并行度,因此整个虽然只有 3 个 Task,但是却有 5 个 SubTask。Jobmanager 负责定义和拆分这些 SubTask,并将其交给 Taskmanagers 来执行,每个 SubTask 都是一个单独的线程。 ### 4.3 资源管理 -ResourceManager 对资源的管理是以 Slot 为单位的,Slot 是一组固定大小的资源的合集。 在上面的过程中,JobManager 进程向 ResourceManager 申请资源来启动 TaskManager,申请的就是 Slot 资源,此时上面 5 个 SubTasks 与 Slots 的对应情况则可能如下: +理解了 SubTasks ,我们再来看看其与 Slots 的对应情况。一种可能的分配情况如下: ![flink-tasks-slots](D:\BigData-Notes\pictures\flink-tasks-slots.png) @@ -112,7 +106,7 @@ Flink 的所有组件都基于 Actor System 来进行通讯。Actor system是多 ## 五、Flink 的优点 -最后来介绍一下 Flink 的优点: +最后基于上面的介绍,来总结一下 Flink 的优点: + Flink 是基于事件驱动 (Event-driven) 的应用,能够同时支持流处理和批处理; + 基于内存的计算,能够保证高吞吐和低延迟,具有优越的性能表现; @@ -130,7 +124,6 @@ Flink 的所有组件都基于 Actor System 来进行通讯。Actor system是多 + [Dataflow Programming Model](https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/programming-model.html) + [Distributed Runtime Environment](https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html) + [Component Stack](https://ci.apache.org/projects/flink/flink-docs-release-1.9/internals/components.html) -+ [Flink on Yarn/K8s 原理剖析及实践]( https://www.infoq.cn/article/UFeFrdqSlqI3HyRHbPNm ) + Fabian Hueske , Vasiliki Kalavri . 《Stream Processing with Apache Flink》. O'Reilly Media . 2019-4-30 diff --git a/notes/Flink状态管理.md b/notes/Flink状态管理.md index 1c9174c..546a53a 100644 --- a/notes/Flink状态管理.md +++ b/notes/Flink状态管理.md @@ -121,7 +121,7 @@ descriptor.enableTimeToLive(ttlConfig); - **ListState**:存储列表类型的状态。 - **UnionListState**:存储列表类型的状态,与 ListState 的区别在于:如果并行度发生变化,ListState 会将该算子的所有并发的状态实例进行汇总,然后均分给新的 Task;而 UnionListState 只是将所有并发的状态实例汇总起来,具体的划分行为则由用户进行定义。 -- **BroadcastState**:用于广播算子状态。 +- **BroadcastState**:用于广播的算子状态。 这里我们继续沿用上面的例子,假设此时我们不需要区分监控数据的类型,只要有监控数据超过阈值并达到指定的次数后,就进行报警,代码如下: diff --git a/notes/installation/Flink_Standalone_Cluster.md b/notes/installation/Flink_Standalone_Cluster.md new file mode 100644 index 0000000..17e8153 --- /dev/null +++ b/notes/installation/Flink_Standalone_Cluster.md @@ -0,0 +1,239 @@ +# Flink Standalone Cluster + +## 一、部署模式 + +Flink 支持使用多种部署模式来满足不同规模应用的需求,常见的有单机模式,Standalone Cluster 模式,同时 Flink 也支持部署在其他第三方平台上,如 YARN,Mesos,Docker,Kubernetes 等。以下主要介绍其单机模式和 Standalone Cluster 模式的部署。 + +## 二、单机模式 + +单机模式是一种开箱即用的模式,可以在单台服务器上运行,适用于日常的开发和调试。具体操作步骤如下: + +### 2.1 安装部署 + +**1. 前置条件** + +Flink 的运行依赖 JAVA 环境,故需要预先安装好 JDK,具体步骤可以参考:[Linux 环境下 JDK 安装](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Linux下JDK安装.md) + +**2. 下载 & 解压 & 运行** + +Flink 所有版本的安装包可以直接从其[官网](https://flink.apache.org/downloads.html)进行下载,这里我下载的 Flink 的版本为 `1.9.1` ,要求的 JDK 版本为 `1.8.x +`。 下载后解压到指定目录: + +```shell +tar -zxvf flink-1.9.1-bin-scala_2.12.tgz -C /usr/app +``` + +不需要进行任何配置,直接使用以下命令就可以启动单机版本的 Flink: + +```shell +bin/start-cluster.sh +``` + +**3. WEB UI 界面** + +Flink 提供了 WEB 界面用于直观的管理 Flink 集群,访问端口为 `8081`: + +![flink-dashboard](D:\BigData-Notes\pictures\flink-dashboard.png) + +Flink 的 WEB UI 界面支持大多数常用功能,如提交作业,取消作业,查看各个节点运行情况,查看作业执行情况等,大家可以在部署完成后,进入该页面进行详细的浏览。 + +### 2.2 作业提交 + +启动后可以运行安装包中自带的词频统计案例,具体步骤如下: + +**1. 开启端口** + +```shell +nc -lk 9999 +``` + +**2. 提交作业** + +```shell +bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999 +``` + +该 JAR 包的源码可以在 Flink 官方的 GitHub 仓库中找到,地址为 :[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java) ,可选传参有 hostname, port,对应的词频数据需要使用空格进行分割。 + +**3. 输入测试数据** + +```shell +a a b b c c c a e +``` + +**4. 查看控制台输出** + +可以通过 WEB UI 的控制台查看作业统运行情况: + +![flink-socket-wordcount](D:\BigData-Notes\pictures\flink-socket-wordcount.png) + +也可以通过 WEB 控制台查看到统计结果: + +![flink-socket-wordcount-stdout](D:\BigData-Notes\pictures\flink-socket-wordcount-stdout.png) + +### 2.3 停止作业 + +可以直接在 WEB 界面上点击对应作业的 `Cancel Job` 按钮进行取消,也可以使用命令行进行取消。使用命令行进行取消时,需要先获取到作业的 JobId,可以使用 `flink list` 命令查看,输出如下: + +```shell +[root@hadoop001 flink-1.9.1]# ./bin/flink list +Waiting for response... +------------------ Running/Restarting Jobs ------------------- +05.11.2019 08:19:53 : ba2b1cc41a5e241c32d574c93de8a2bc : Socket Window WordCount (RUNNING) +-------------------------------------------------------------- +No scheduled jobs. +``` + +获取到 JobId 后,就可以使用 `flink cancel` 命令取消作业: + +```shell +bin/flink cancel ba2b1cc41a5e241c32d574c93de8a2bc +``` + +### 2.4 停止 Flink + +命令如下: + +```shell +bin/stop-cluster.sh +``` + + + +## 二、Standalone Cluster + +Standalone Cluster 模式是 Flink 自带的一种集群模式,具体配置步骤如下: + +### 2.1 前置条件 + +使用该模式前,需要确保所有服务器间都已经配置好 SSH 免密登录服务。这里我以三台服务器为例,主机名分别为 hadoop001,hadoop002,hadoop003 , 其中 hadoop001 为 master 节点,其余两台为 slave 节点,搭建步骤如下: + +### 2.2 搭建步骤 + +修改 `conf/flink-conf.yaml` 中 jobmanager 节点的通讯地址为 hadoop001: + +```yaml +jobmanager.rpc.address: hadoop001 +``` + +修改 `conf/slaves` 配置文件,将 hadoop002 和 hadoop003 配置为 slave 节点: + +```shell +hadoop002 +hadoop003 +``` + +将配置好的 Flink 安装包分发到其他两台服务器上: + +```shell + scp -r /usr/app/flink-1.9.1 hadoop002:/usr/app + scp -r /usr/app/flink-1.9.1 hadoop003:/usr/app +``` + +在 hadoop001 上使用和单机模式相同的命令来启动集群: + +```shell +bin/start-cluster.sh +``` + +此时控制台输出如下: + +![flink-start-cluster-shell](D:\BigData-Notes\pictures\flink-start-cluster-shell.png) + +启动完成后可以使用 `Jps` 命令或者通过 WEB 界面来查看是否启动成功。 + +### 2.3 可选配置 + +除了上面介绍的 *jobmanager.rpc.address* 是必选配置外,Flink h还支持使用其他可选参数来优化集群性能,主要如下: + +- **jobmanager.heap.size**:JobManager 的 JVM 堆内存大小,默认为 1024m 。 +- **taskmanager.heap.size**::Taskmanager 的 JVM 堆内存大小,默认为 1024m 。 +- **taskmanager.numberOfTaskSlots**:Taskmanager 上 slots 的数量,通常设置为 CPU 核心的数量,或其一半。 +- **parallelism.default:** 任务默认的并行度。 +- **io.tmp.dirs:**存储临时文件的路径,如果没有配置,则默认采用服务器的临时目录,如 LInux 的 `/tmp` 目录。 + +更多配置可以参考 Flink 的官方手册:[Configuration](https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html) + +## 三、Standalone Cluster HA + +上面我们配置的 Standalone 集群实际上只有一个 JobManager,此时是存在单点故障的,所以官方提供了 Standalone Cluster HA 模式来实现集群高可用。 + +### 3.1 前置条件 + +在 Standalone Cluster HA 模式下,集群可以由多个 JobManager,但只有一个处于 active 状态,其余的则处于备用状态,Flink 使用 ZooKeeper 来选举出 Active JobManager,并依赖其来提供一致性协调服务,所以需要预先安装 ZooKeeper 。 + +另外在高可用模式下,还需要使用分布式文件系统来持久化存储 JobManager 的元数据,最常用的就是 HDFS,所以 Hadoop 也需要预先安装。关于 Hadoop 集群和 ZooKeeper 集群的搭建可以参考: + ++ [Hadoop 集群环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Hadoop集群环境搭建.md) ++ [Zookeeper 单机环境和集群环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Zookeeper单机环境和集群环境搭建.md) + +### 3.2 搭建步骤 + +修改 `conf/flink-conf.yaml` 文件,增加如下配置: + +```yaml +# 配置使用zookeeper来开启高可用模式 +high-availability: zookeeper +# 配置zookeeper的地址,采用zookeeper集群时,可以使用逗号来分隔多个节点地址 +high-availability.zookeeper.quorum: hadoop003:2181 +# 在zookeeper上存储flink集群元信息的路径 +high-availability.zookeeper.path.root: /flink +# 集群id +high-availability.cluster-id: /standalone_cluster_one +# 持久化存储JobManager元数据的地址,zookeeper上存储的只是指向该元数据的指针信息 +high-availability.storageDir: hdfs://hadoop001:8020/flink/recovery +``` + +修改 `conf/masters` 文件,将 hadoop001 和 hadoop002 都配置为 master 节点: + +```shell +hadoop001:8081 +hadoop002:8081 +``` + +确保 Hadoop 和 ZooKeeper 已经启动后,使用以下命令来启动集群: + +```shell +bin/start-cluster.sh +``` + +此时输出如下: + +![flink-standalone-cluster-ha](D:\BigData-Notes\pictures\flink-standalone-cluster-ha.png) + +可以看到集群已经以 HA 的模式启动,此时还需要在各个节点上使用 `jps` 命令来查看进程是否启动成功,正常情况如下: + +![flink-standalone-cluster-jps](D:\BigData-Notes\pictures\flink-standalone-cluster-jps.png) + +只有 hadoop001 和 hadoop002 的 JobManager 进程,hadoop002 和 hadoop003 上的 TaskManager 进程都已经完全启动,才表示 Standalone Cluster HA 模式搭建成功。 + +### 3.3 常见异常 + +如果进程没有启动,可以通过查看 `log` 目录下的日志来定位错误,常见的一个错误如下: + +```shell +2019-11-05 09:18:35,877 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint +- Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics +java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir) +....... +Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file +system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no +Hadoop file system to support this scheme could be loaded. +..... +Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in +the classpath/dependencies. +...... +``` + +可以看到是因为在 classpath 目录下找不到 Hadoop 的相关依赖,此时需要检查是否在环境变量中配置了 Hadoop 的安装路径,如果路径已经配置但仍然存在上面的问题,可以从 [Flink 官网](https://flink.apache.org/downloads.html)下载对应版本的 Hadoop 组件包: + +![flink-optional-components](D:\BigData-Notes\pictures\flink-optional-components.png) + +下载完成后,将该 JAR 包上传至**所有** Flink 安装目录的 `lib` 目录即可。 + + + +## 参考资料 + ++ [Standalone Cluster](https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/cluster_setup.html#standalone-cluster) ++ [JobManager High Availability (HA)](https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html) + diff --git a/pictures/flink-application-submission.png b/pictures/flink-application-submission.png new file mode 100644 index 0000000..a72fa84 Binary files /dev/null and b/pictures/flink-application-submission.png differ diff --git a/pictures/flink-dashboard.png b/pictures/flink-dashboard.png new file mode 100644 index 0000000..948d43f Binary files /dev/null and b/pictures/flink-dashboard.png differ diff --git a/pictures/flink-optional-components.png b/pictures/flink-optional-components.png new file mode 100644 index 0000000..0127b49 Binary files /dev/null and b/pictures/flink-optional-components.png differ diff --git a/pictures/flink-socket-wordcount-stdout.png b/pictures/flink-socket-wordcount-stdout.png new file mode 100644 index 0000000..5d02d95 Binary files /dev/null and b/pictures/flink-socket-wordcount-stdout.png differ diff --git a/pictures/flink-socket-wordcount.png b/pictures/flink-socket-wordcount.png new file mode 100644 index 0000000..50c6964 Binary files /dev/null and b/pictures/flink-socket-wordcount.png differ diff --git a/pictures/flink-standalone-cluster-ha.png b/pictures/flink-standalone-cluster-ha.png new file mode 100644 index 0000000..7397a07 Binary files /dev/null and b/pictures/flink-standalone-cluster-ha.png differ diff --git a/pictures/flink-standalone-cluster-jps.png b/pictures/flink-standalone-cluster-jps.png new file mode 100644 index 0000000..a127360 Binary files /dev/null and b/pictures/flink-standalone-cluster-jps.png differ diff --git a/pictures/flink-start-cluster-shell.png b/pictures/flink-start-cluster-shell.png new file mode 100644 index 0000000..ed551b8 Binary files /dev/null and b/pictures/flink-start-cluster-shell.png differ diff --git a/pictures/flink-yarn-session.png b/pictures/flink-yarn-session.png new file mode 100644 index 0000000..9565650 Binary files /dev/null and b/pictures/flink-yarn-session.png differ