diff --git a/code/Flink/flink-basis-java/src/main/java/com/heibaiying/BatchJob.java b/code/Flink/flink-basis-java/src/main/java/com/heibaiying/BatchJob.java deleted file mode 100644 index d95a4bb..0000000 --- a/code/Flink/flink-basis-java/src/main/java/com/heibaiying/BatchJob.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.heibaiying; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.operators.FlatMapOperator; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.util.Collector; - -import java.util.Arrays; -import java.util.List; - -/** - * Skeleton for a Flink Batch Job. - * - *

For a tutorial how to write a Flink batch application, check the - * tutorials and examples on the Flink Website. - * - *

To package your application into a JAR file for execution, - * change the main class in the POM.xml file to this class (simply search for 'mainClass') - * and run 'mvn clean package' on the command line. - */ -public class BatchJob { - - public static void main(String[] args) throws Exception { - - // set up the batch execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - env.execute("Flink Batch Java API Skeleton"); - } -} diff --git a/code/Flink/flink-basis-java/src/main/java/com/heibaiying/StreamingJob.java b/code/Flink/flink-basis-java/src/main/java/com/heibaiying/StreamingJob.java index 5d248f7..a4c688f 100644 --- a/code/Flink/flink-basis-java/src/main/java/com/heibaiying/StreamingJob.java +++ b/code/Flink/flink-basis-java/src/main/java/com/heibaiying/StreamingJob.java @@ -1,56 +1,18 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package com.heibaiying; -import org.apache.flink.api.common.functions.*; import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.collector.selector.OutputSelector; -import org.apache.flink.streaming.api.datastream.*; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.co.CoMapFunction; -import org.apache.flink.util.Collector; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Skeleton for a Flink Streaming Job. - * - *

For a tutorial how to write a Flink streaming application, check the - * tutorials and examples on the Flink Website. - * - *

To package your application into a JAR file for execution, run - * 'mvn clean package' on the command line. - * - *

If you change the name of the main class (with the public static void main(String[] args)) - * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). - */ public class StreamingJob { - public static void main(String[] args) throws Exception { - // set up the streaming execution environment - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + private static final String ROOT_PATH = "D:\\BigData-Notes\\code\\Flink\\flink-basis-java\\src\\main\\resources\\"; + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource streamSource = env.readTextFile(ROOT_PATH + "log4j.properties"); + streamSource.writeAsText(ROOT_PATH + "out").setParallelism(1); env.execute(); } diff --git a/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/BatchJob.scala b/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/BatchJob.scala deleted file mode 100644 index 6652d98..0000000 --- a/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/BatchJob.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.heibaiying - -import org.apache.flink.api.scala._ - -/** - * Skeleton for a Flink Batch Job. - * - * For a tutorial how to write a Flink batch application, check the - * tutorials and examples on the Flink Website. - * - * To package your application into a JAR file for execution, - * change the main class in the POM.xml file to this class (simply search for 'mainClass') - * and run 'mvn clean package' on the command line. - */ -object BatchJob { - - def main(args: Array[String]) { - // set up the batch execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - - /* - * Here, you can start creating your execution plan for Flink. - * - * Start with getting some data from the environment, like - * env.readTextFile(textPath); - * - * then, transform the resulting DataSet[String] using operations - * like - * .filter() - * .flatMap() - * .join() - * .group() - * - * and many more. 
- * Have a look at the programming guide: - * - * http://flink.apache.org/docs/latest/apis/batch/index.html - * - * and the examples - * - * http://flink.apache.org/docs/latest/apis/batch/examples.html - * - */ - - // execute program - env.execute("Flink Batch Scala API Skeleton") - } -} diff --git a/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/StreamingJob.scala b/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/StreamingJob.scala deleted file mode 100644 index 2325a6c..0000000 --- a/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/StreamingJob.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.heibaiying - -import org.apache.flink.streaming.api.scala._ - -/** - * Skeleton for a Flink Streaming Job. - * - * For a tutorial how to write a Flink streaming application, check the - * tutorials and examples on the Flink Website. - * - * To package your application into a JAR file for execution, run - * 'mvn clean package' on the command line. - * - * If you change the name of the main class (with the public static void main(String[] args)) - * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). - */ -object StreamingJob { - def main(args: Array[String]) { - // set up the streaming execution environment - val env = StreamExecutionEnvironment.getExecutionEnvironment - - /* - * Here, you can start creating your execution plan for Flink. - * - * Start with getting some data from the environment, like - * env.readTextFile(textPath); - * - * then, transform the resulting DataStream[String] using operations - * like - * .filter() - * .flatMap() - * .join() - * .group() - * - * and many more. 
- * Have a look at the programming guide: - * - * http://flink.apache.org/docs/latest/apis/streaming/index.html - * - */ - - // execute program - env.execute("Flink Streaming Scala API Skeleton") - } -} diff --git a/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountBatch.scala b/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountBatch.scala index ff52ae7..b7a51b3 100644 --- a/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountBatch.scala +++ b/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountBatch.scala @@ -6,8 +6,12 @@ object WordCountBatch { def main(args: Array[String]): Unit = { val benv = ExecutionEnvironment.getExecutionEnvironment - val text = benv.readTextFile("D:\\BigData-Notes\\code\\Flink\\flink-basis-scala\\src\\main\\resources\\wordcount.txt") - val counts = text.flatMap { _.toLowerCase.split(",") filter { _.nonEmpty } }.map { (_, 1) }.groupBy(0).sum(1) - counts.print() + val dataSet = benv.readTextFile("D:\\BigData-Notes\\code\\Flink\\flink-basis-scala\\src\\main\\resources\\wordcount.txt") + dataSet.flatMap { _.toLowerCase.split(",")} + .filter (_.nonEmpty) + .map { (_, 1) } + .groupBy(0) + .sum(1) + .print() } } diff --git a/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountStreaming.scala b/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountStreaming.scala index daad9ce..6a71e5d 100644 --- a/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountStreaming.scala +++ b/code/Flink/flink-basis-scala/src/main/scala/com/heibaiying/WordCountStreaming.scala @@ -10,16 +10,14 @@ object WordCountStreaming { val senv = StreamExecutionEnvironment.getExecutionEnvironment - val text: DataStream[String] = senv.socketTextStream("192.168.200.229", 9999, '\n') - val windowCounts = text.flatMap { w => w.split(",") }.map { w => WordWithCount(w, 1) }.keyBy("word") - .timeWindow(Time.seconds(5)).sum("count") - - windowCounts.print().setParallelism(1) - + val dataStream: DataStream[String] = senv.socketTextStream("192.168.0.229", 9999, '\n') + dataStream.flatMap { line => line.toLowerCase.split(",") } + .filter(_.nonEmpty) + .map { word => (word, 1) } + .keyBy(0) + .timeWindow(Time.seconds(3)) + .sum(1) + .print() senv.execute("Streaming WordCount") - } - - case class WordWithCount(word: String, count: Long) - } diff --git a/code/Flink/flink-kafka-integration/pom.xml b/code/Flink/flink-kafka-integration/pom.xml new file mode 100644 index 0000000..1b80867 --- /dev/null +++ b/code/Flink/flink-kafka-integration/pom.xml @@ -0,0 +1,242 @@ + + + 4.0.0 + + com.heibaiying + flink-kafka-integration + 1.0 + jar + + Flink Quickstart Job + http://www.myorganization.org + + + UTF-8 + 1.9.0 + 1.8 + 2.11 + ${java.version} + ${java.version} + + + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + + + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + provided + + + + + + + + + + org.slf4j + slf4j-log4j12 + 1.7.7 + runtime + + + log4j + log4j + 1.2.17 + runtime + + + org.apache.flink + flink-connector-kafka_2.11 + 1.9.0 + + + mysql + mysql-connector-java + 8.0.16 + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${java.version} + ${java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.0.0 + + + + package + + shade + + + + + 
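<!-- 说明性注释(编辑补充):以下列出的构件由 Flink 运行时自身提供(force-shading 为标记构件,slf4j/log4j 由 flink-dist 携带),因此从 shade 打出的 JAR 中排除,避免重复打包 -->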
org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + com.heibaiying.KafkaStreamingJob + + + + + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-shade-plugin + [3.0.0,) + + shade + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.1,) + + testCompile + compile + + + + + + + + + + + + + + + + + + + + add-dependencies-for-IDEA + + + + idea.version + + + + + + org.apache.flink + flink-java + ${flink.version} + compile + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + compile + + + + + + diff --git a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/CustomSinkJob.java b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/CustomSinkJob.java new file mode 100644 index 0000000..14861bc --- /dev/null +++ b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/CustomSinkJob.java @@ -0,0 +1,23 @@ +package com.heibaiying; + +import com.heibaiying.bean.Employee; +import com.heibaiying.sink.FlinkToMySQL; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.sql.Date; + +public class CustomSinkJob { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Date date = new Date(System.currentTimeMillis()); + DataStreamSource streamSource = env.fromElements( + new Employee("hei", 10, date), + new Employee("bai", 20, date), + new Employee("ying", 30, date)); + streamSource.addSink(new FlinkToMySQL()); + env.execute(); + } +} diff --git a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java new file mode 100644 index 0000000..d26da5f --- /dev/null +++ b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/KafkaStreamingJob.java @@ -0,0 +1,43 @@ +package com.heibaiying; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; +import org.apache.kafka.clients.producer.ProducerRecord; + +import javax.annotation.Nullable; +import java.util.Properties; + +public class KafkaStreamingJob { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // 1.指定Kafka的相关配置属性 + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", "192.168.200.229:9092"); + + // 2.接收Kafka上的数据 + DataStream stream = env + .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties)); + + // 3.定义计算结果到 Kafka ProducerRecord 的转换 + KafkaSerializationSchema kafkaSerializationSchema = new KafkaSerializationSchema() { + @Override + public ProducerRecord serialize(String element, @Nullable Long timestamp) { + return new 
ProducerRecord<>("flink-stream-out-topic", element.getBytes()); + } + }; + // 4. 定义Flink Kafka生产者 + FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic", + kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5); + // 5. 将接收到输入元素*2后写出到Kafka + stream.map((MapFunction) value -> value + value).addSink(kafkaProducer); + env.execute("Flink Streaming"); + } +} diff --git a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/bean/Employee.java b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/bean/Employee.java new file mode 100644 index 0000000..24c2179 --- /dev/null +++ b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/bean/Employee.java @@ -0,0 +1,42 @@ +package com.heibaiying.bean; + +import java.sql.Date; + +public class Employee { + + private String name; + private int age; + private Date birthday; + + Employee(){} + + public Employee(String name, int age, Date birthday) { + this.name = name; + this.age = age; + this.birthday = birthday; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public Date getBirthday() { + return birthday; + } + + public void setBirthday(Date birthday) { + this.birthday = birthday; + } +} diff --git a/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/sink/FlinkToMySQL.java b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/sink/FlinkToMySQL.java new file mode 100644 index 0000000..aa3e064 --- /dev/null +++ b/code/Flink/flink-kafka-integration/src/main/java/com/heibaiying/sink/FlinkToMySQL.java @@ -0,0 +1,43 @@ +package com.heibaiying.sink; + +import com.heibaiying.bean.Employee; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; + +public class FlinkToMySQL extends RichSinkFunction { + + private PreparedStatement stmt; + private Connection conn; + + @Override + public void open(Configuration parameters) throws Exception { + Class.forName("com.mysql.cj.jdbc.Driver"); + conn = DriverManager.getConnection("jdbc:mysql://192.168.200.229:3306/employees?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false", "root", "123456"); + String sql = "insert into emp(name, age, birthday) values(?, ?, ?)"; + stmt = conn.prepareStatement(sql); + } + + @Override + public void invoke(Employee value, Context context) throws Exception { + stmt.setString(1, value.getName()); + stmt.setInt(2, value.getAge()); + stmt.setDate(3, value.getBirthday()); + stmt.executeUpdate(); + } + + @Override + public void close() throws Exception { + super.close(); + if (stmt != null) { + stmt.close(); + } + if (conn != null) { + conn.close(); + } + } + +} diff --git a/code/Flink/flink-kafka-integration/src/main/resources/log4j.properties b/code/Flink/flink-kafka-integration/src/main/resources/log4j.properties new file mode 100644 index 0000000..da32ea0 --- /dev/null +++ b/code/Flink/flink-kafka-integration/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/code/Flink/flink-time-watermark/pom.xml b/code/Flink/flink-time-watermark/pom.xml new file mode 100644 index 0000000..2dd15f1 --- /dev/null +++ b/code/Flink/flink-time-watermark/pom.xml @@ -0,0 +1,232 @@ + + + 4.0.0 + + com.heibaiying + flink-time-watermark + 1.0 + jar + + Flink Quickstart Job + http://www.myorganization.org + + + UTF-8 + 1.9.0 + 1.8 + 2.11 + ${java.version} + ${java.version} + + + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + + + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + provided + + + + + + + + + + org.slf4j + slf4j-log4j12 + 1.7.7 + runtime + + + log4j + log4j + 1.2.17 + runtime + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${java.version} + ${java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.0.0 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + com.heibaiying.SampleJob + + + + + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-shade-plugin + [3.0.0,) + + shade + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.1,) + + testCompile + compile + + + + + + + + + + + + + + + + + + + + add-dependencies-for-IDEA + + + + idea.version + + + + + + org.apache.flink + flink-java + ${flink.version} + compile + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + compile + + + + + + diff --git a/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/SampleJob.java b/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/SampleJob.java new file mode 100644 index 0000000..733b05d --- /dev/null +++ b/code/Flink/flink-time-watermark/src/main/java/com/heibaiying/SampleJob.java @@ -0,0 +1,27 @@ +package com.heibaiying; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Collector; + +public class SampleJob { + + public static void 
main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource streamSource = env.socketTextStream("192.168.200.229", 9999, "\n", 3); + streamSource.flatMap(new FlatMapFunction>() { + @Override + public void flatMap(String value, Collector> out) throws Exception { + String[] words = value.split("\t"); + for (String word : words) { + out.collect(new Tuple2<>(word, 1L)); + } + } + }).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print(); + env.execute("Flink Streaming"); + } +} diff --git a/code/Flink/flink-time-watermark/src/main/resources/log4j.properties b/code/Flink/flink-time-watermark/src/main/resources/log4j.properties new file mode 100644 index 0000000..da32ea0 --- /dev/null +++ b/code/Flink/flink-time-watermark/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/notes/Flink_Data_Sink.md b/notes/Flink_Data_Sink.md new file mode 100644 index 0000000..939ce8a --- /dev/null +++ b/notes/Flink_Data_Sink.md @@ -0,0 +1,119 @@ +# Flink Sink + +## 一、Data Sinks + +在使用 Flink 进行数据处理时,数据经 Data Source 流入,然后通过系列 Transformations 的转化,最终可以通过 Sink 将计算结果进行输出,Flink Data Sinks 就是用于定义数据流最终的输出位置。Flink 提供了几个较为简单的 Sink API 用于日常的开发,具体如下: + +### 1.1 writeAsText + +`writeAsText` 用于将计算结果以文本的方式并行地写入到指定文件夹下,除了路径参数是必选外,该方法还可以通过指定第二个参数来定义输出模式,它有以下两个可选值: + ++ **WriteMode.NO_OVERWRITE**:当指定路径上不存在任何文件时,才执行写出操作; ++ **WriteMode.OVERWRITE**:不论指定路径上是否存在文件,都执行写出操作;如果原来已有文件,则进行覆盖。 + +使用示例如下: + +```java + streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE); +``` + +以上写出是以并行的方式写出到多个文件,如果想要将输出结果全部写出到一个文件,需要设置其并行度为 1: + +```java +streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE).setParallelism(1); +``` + +### 1.2 writeAsCsv + +`writeAsCsv` 用于将计算结果以 CSV 的文件格式写出到指定目录,除了路径参数是必选外,该方法还支持传入输出模式,行分隔符,和字段分隔符三个额外的参数,其方法定义如下: + +```java +writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) +``` + +### 1.3 print \ printToErr + +`print \ printToErr` 是测试当中最常用的方式,用于将计算结果以标准输出流或错误输出流的方式打印到控制台上。 + +### 1.4 writeUsingOutputFormat + +采用自定义的输出格式将计算结果写出,上面介绍的 `writeAsText` 和 `writeAsCsv` 其底层调用的都是该方法,源码如下: + +```java +public DataStreamSink writeAsText(String path, WriteMode writeMode) { + TextOutputFormat tof = new TextOutputFormat<>(new Path(path)); + tof.setWriteMode(writeMode); + return 
writeUsingOutputFormat(tof); +} +``` + +### 1.5 writeToSocket + +`writeToSocket` 用于将计算结果以指定的格式写出到 Socket 中,使用示例如下: + +```shell +streamSource.writeToSocket("192.168.0.226", 9999, new SimpleStringSchema()); +``` + +## 二、Streaming Connectors + +除了上述 API 外,Flink 中还内置了系列的 Connectors 连接器,用于将计算结果输入到常用的存储系统或者消息中间件中,具体如下: + +- Apache Kafka (支持 source 和 sink) +- Apache Cassandra (sink) +- Amazon Kinesis Streams (source/sink) +- Elasticsearch (sink) +- Hadoop FileSystem (sink) +- RabbitMQ (source/sink) +- Apache NiFi (source/sink) +- Google PubSub (source/sink) + +除了内置的连接器外,你还可以通过 Apache Bahir 的连接器扩展 Flink。Apache Bahir 旨在为分布式数据分析系统 (如 Spark,Flink) 等提供功能上的扩展,当前其支持的与 Flink Sink 相关的连接器如下: + +- Apache ActiveMQ (source/sink) +- Apache Flume (sink) +- Redis (sink) +- Akka (sink) + +这里接着在 Data Sources 章节介绍的整合 Kafka Source 的基础上,将 Kafka Sink 也一并进行整合,具体步骤如下。 + +## 三、整合 Kafka Sink + +### 3.1 addSink + +### 3.2 创建输出主题 + +```shell +# 创建用于测试的输出主题 +bin/kafka-topics.sh --create \ + --bootstrap-server hadoop001:9092 \ + --replication-factor 1 \ + --partitions 1 \ + --topic flink-stream-out-topic + +# 查看所有主题 + bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092 +``` + +### 3.3 启动消费者 + +```java +bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flink-stream-out-topic +``` + +### 3.4 测试结果 + + + +## 四、自定义 Sink + +### 4.1 导入依赖 + +### 4.2 自定义 Sink + +### 4.3 测试结果 + + + + + diff --git a/notes/Flink_Data_Source.md b/notes/Flink_Data_Source.md new file mode 100644 index 0000000..7686cad --- /dev/null +++ b/notes/Flink_Data_Source.md @@ -0,0 +1,280 @@ +# Flink Data Source +

+ +## 一、内置 Data Source + +Flink Data Source 用于定义 Flink 程序的数据来源,Flink 官方内置提供了多种数据获取方法,用于帮助开发者简单快速地构建输入流,具体如下: + +### 1.1 基于文件构建 + +**1. readTextFile(path)**:按照 TextInputFormat 格式读取文本文件,并将其内容以字符串的形式返回。示例如下: + +```java +env.readTextFile(filePath).print(); +``` + +**2. readFile(fileInputFormat, path)** :按照指定格式读取文件。 + +**3. readFile(inputFormat, filePath, watchType, interval, typeInformation) **:按照指定格式周期性的读取文件。其中各个参数的含义如下: + ++ **inputFormat**:数据流的输入格式。 ++ **filePath**:文件路径,可以是本地文件系统上的路径,也可以是 HDFS 上的文件路径。 ++ **watchType**:读取方式,它有两个可选值,分别是 `FileProcessingMode.PROCESS_ONCE` 和 `FileProcessingMode.PROCESS_CONTINUOUSLY`:前者表示对指定路径上的数据只读取一次,然后退出;后者表示对路径进行定期扫描从而可以获取到新的数据。需要注意的是如果 watchType 被设置为 `PROCESS_CONTINUOUSLY`,那么当文件被修改时,其所有的内容 (包含原有的内容和新增的内容) 都将被重新处理,因此这会打破 Flink 的 *exactly-once* 语义。 ++ **interval**:定期扫描的时间间隔。 ++ **typeInformation**:输入流中元素的类型。 + +使用示例如下: + +```java +final String filePath = "D:\\log4j.properties"; +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.readFile(new TextInputFormat(new Path(filePath)), + filePath, + FileProcessingMode.PROCESS_ONCE, + 1, + BasicTypeInfo.STRING_TYPE_INFO).print(); +env.execute(); +``` + +### 1.2 基于集合构建 + +**1. fromCollection(Collection)**:基于集合构建,集合中的所有元素必须是同一类型。示例如下: + +```java +env.fromCollection(Arrays.asList(1,2,3,4,5)).print(); +``` + +**2. fromElements(T ...)**: 基于元素构建,所有元素必须是同一类型。示例如下: + +```java +env.fromElements(1,2,3,4,5).print(); +``` +**3. generateSequence(from, to)**:基于给定的序列区间进行构建。示例如下: + +```java +env.generateSequence(0,100); +``` + +**4. fromCollection(Iterator, Class)**:基于迭代器进行构建。第一个参数用于定义迭代器,第二个参数用于定义输出元素的类型。使用示例如下: + +```java +env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print(); +``` + +其中 CustomIterator 为自定义的迭代器,这里以产生 1 到 100 区间内的数据为例,源码如下。需要注意的是自定义迭代器除了要实现 Iterator 接口外,还必须要实现序列化接口 Serializable ,否则会抛出序列化失败的异常: + +```java +import java.io.Serializable; +import java.util.Iterator; + +public class CustomIterator implements Iterator, Serializable { + private Integer i = 0; + + @Override + public boolean hasNext() { + return i < 100; + } + + @Override + public Integer next() { + i++; + return i; + } +} +``` + +**5. 
fromParallelCollection(SplittableIterator, Class)**:方法接收两个参数,第二个参数用于定义输出元素的类型,第一个参数 SplittableIterator 是迭代器的抽象基类,它用于将原始迭代器的值拆分到多个不相交的迭代器中。 + +### 1.3 基于 Socket 构建 + +Flink 提供了 socketTextStream 方法用于构建基于 Socket 的数据流,socketTextStream 方法有以下四个主要参数: + +- **hostname**:主机名; +- **port**:端口号,设置为 0 时,表示端口号自动分配; +- **delimiter**:用于分隔每条记录的分隔符; +- **maxRetry**:当 Socket 临时关闭时,程序的最大重试间隔,单位为秒。设置为 0 时表示不进行重试;设置为负值则表示一直重试。使用示例如下: + +```shell + env.socketTextStream("192.168.0.229", 9999, "\n", 3).print(); +``` + + + +## 二、自定义 Data Source + +### 2.1 SourceFunction + +除了内置的数据源外,用户还可以使用 `addSource` 方法来添加自定义的数据源。自定义的数据源必须要实现 SourceFunction 接口,这里以产生 [0 , 1000) 区间内的数据为例,代码如下: + +```java +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +env.addSource(new SourceFunction() { + + private long count = 0L; + private volatile boolean isRunning = true; + + public void run(SourceContext ctx) { + while (isRunning && count < 1000) { + // 通过collect将输入发送出去 + ctx.collect(count); + count++; + } + } + + public void cancel() { + isRunning = false; + } + +}).print(); +env.execute(); +``` + +### 2.2 ParallelSourceFunction 和 RichParallelSourceFunction + +上面通过 SourceFunction 实现的数据源是不具有并行度的,即不支持在得到的 DataStream 上调用 `setParallelism(n)` 方法,此时会抛出如下的异常: + +```shell +Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source +``` + +如果你想要实现具有并行度的输入流,则需要实现 ParallelSourceFunction 或 RichParallelSourceFunction 接口,其与 SourceFunction 的关系如下图: + +
+ParallelSourceFunction 直接继承自 SourceFunction,具有并行度的功能。RichParallelSourceFunction 则继承自 AbstractRichFunction,同时实现了 ParallelSourceFunction 接口,所以其除了具有并行度的功能外,还提供了额外的与生命周期相关的方法,如 open() ,close() 。
+
+## 三、Streaming Connectors
+
+### 3.1 内置连接器
+
+除了自定义数据源外, Flink 还内置了多种连接器,用于满足大多数的数据收集场景。当前内置连接器的支持情况如下:
+
+- Apache Kafka (支持 source 和 sink)
+- Apache Cassandra (sink)
+- Amazon Kinesis Streams (source/sink)
+- Elasticsearch (sink)
+- Hadoop FileSystem (sink)
+- RabbitMQ (source/sink)
+- Apache NiFi (source/sink)
+- Twitter Streaming API (source)
+- Google PubSub (source/sink)
+
+除了上述的连接器外,你还可以通过 Apache Bahir 的连接器扩展 Flink。Apache Bahir 旨在为分布式数据分析系统 (如 Spark,Flink) 等提供功能上的扩展,当前其支持的与 Flink 相关的连接器如下:
+
+- Apache ActiveMQ (source/sink)
+- Apache Flume (sink)
+- Redis (sink)
+- Akka (sink)
+- Netty (source)
+
+随着 Flink 的不断发展,可以预见到其会支持越来越多类型的连接器,关于连接器的后续发展情况,可以查看其官方文档:[Streaming Connectors]( https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/index.html) 。在所有 DataSource 连接器中,使用最广泛的就是 Kafka,所以这里我们以其为例,来介绍 Connectors 的整合步骤。
+
+### 3.2 整合 Kafka
+
+#### 1. 导入依赖
+
+整合 Kafka 时,一定要注意所使用的 Kafka 的版本,不同版本间所需的 Maven 依赖和开发时所调用的类均不相同,具体如下:
+
+| Maven 依赖                        | Flink 版本 | Consumer and Producer 类的名称                     | Kafka 版本 |
+| :------------------------------- | :--------- | :------------------------------------------------ | :--------- |
+| flink-connector-kafka-0.8_2.11   | 1.0.0 +    | FlinkKafkaConsumer08 <br> FlinkKafkaProducer08     | 0.8.x      |
+| flink-connector-kafka-0.9_2.11   | 1.0.0 +    | FlinkKafkaConsumer09 <br> FlinkKafkaProducer09     | 0.9.x      |
+| flink-connector-kafka-0.10_2.11  | 1.2.0 +    | FlinkKafkaConsumer010 <br> FlinkKafkaProducer010   | 0.10.x     |
+| flink-connector-kafka-0.11_2.11  | 1.4.0 +    | FlinkKafkaConsumer011 <br> FlinkKafkaProducer011   | 0.11.x     |
+| flink-connector-kafka_2.11       | 1.7.0 +    | FlinkKafkaConsumer <br> FlinkKafkaProducer         | >= 1.0.0   |
+
+这里我使用的 Kafka 版本为 kafka_2.12-2.2.0,添加的依赖如下:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-kafka_2.11</artifactId>
+    <version>1.9.0</version>
+</dependency>
+```
+
+#### 2. 代码开发
+
+这里以最简单的场景为例,接收 Kafka 上的数据并打印,代码如下:
+
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+Properties properties = new Properties();
+// 指定Kafka的连接位置
+properties.setProperty("bootstrap.servers", "hadoop001:9092");
+// 指定监听的主题,并定义Kafka字节消息到Flink对象之间的转换规则
+DataStream<String> stream = env
+    .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));
+stream.print();
+env.execute("Flink Streaming");
+```
+
+### 3.3 整合测试
+
+#### 1. 启动 Kafka
+
+Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:
+
+```shell
+# zookeeper启动命令
+bin/zkServer.sh start
+
+# 内置zookeeper启动命令
+bin/zookeeper-server-start.sh config/zookeeper.properties
+```
+
+启动单节点 kafka 用于测试:
+
+```shell
+bin/kafka-server-start.sh config/server.properties
+```
+
+#### 2. 创建 Topic
+
+```shell
+# 创建用于测试的主题
+bin/kafka-topics.sh --create \
+    --bootstrap-server hadoop001:9092 \
+    --replication-factor 1 \
+    --partitions 1 \
+    --topic flink-stream-in-topic
+
+# 查看所有主题
+bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
+```
+
+#### 3. 启动 Producer
+
+这里启动一个 Kafka 生产者,用于发送测试数据:
+
+```shell
+bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic flink-stream-in-topic
+```
+
+#### 4. 测试结果
+
+在 Producer 上输入任意测试数据,之后观察程序控制台的输出:
+
+程序控制台的输出如下: + +
+可以看到已经成功接收并打印出相关的数据。 + + + +## 参考资料 + +1. data-sources:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html#data-sources +2. Streaming Connectors:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/index.html +3. Apache Kafka Connector: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html \ No newline at end of file diff --git a/notes/Flink_Time_Watermark.md b/notes/Flink_Time_Watermark.md new file mode 100644 index 0000000..e69de29 diff --git a/notes/Flink_Windows.md b/notes/Flink_Windows.md new file mode 100644 index 0000000..40deba1 --- /dev/null +++ b/notes/Flink_Windows.md @@ -0,0 +1,110 @@ +# Flink Windows + +## 一、窗口概念 + +在大多数场景下,我们需要统计的数据流都是无界的,因此我们无法等待整个数据流终止后才进行统计。通常情况下,我们只需要对某个时间范围或者数量范围内的数据进行统计分析:如每隔五分钟统计一次过去一小时内所有商品的点击量;或者每发生1000次点击后,都去统计一下每个商品点击率的占比。在 Flink 中,我们使用窗口 (Window) 来实现这类功能。按照统计维度的不同,Flink 中的窗口可以分为 时间窗口 (Time Windows) 和 计数窗口 (Count Windows) 。 + +## 二、Time Windows + +Time Windows 用于以时间为维度来进行数据聚合,具体分为以下四类: + +### 2.1 Tumbling Windows + +滚动窗口 (Tumbling Windows) 是指彼此之间没有重叠的窗口。例如:每隔1小时统计过去1小时内的商品点击量,那么 1 天就只能分为 24 个窗口,每个窗口彼此之间是不存在重叠的,具体如下: + +![flink-tumbling-windows](D:\BigData-Notes\pictures\flink-tumbling-windows.png) + +这里我们以词频统计为例,给出一个具体的用例,代码如下: + +```java +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +// 接收socket上的数据输入 +DataStreamSource streamSource = env.socketTextStream("hadoop001", 9999, "\n", 3); +streamSource.flatMap(new FlatMapFunction>() { + @Override + public void flatMap(String value, Collector> out) throws Exception { + String[] words = value.split("\t"); + for (String word : words) { + out.collect(new Tuple2<>(word, 1L)); + } + } +}).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print(); //每隔3秒统计一次每个单词出现的数量 +env.execute("Flink Streaming"); +``` + +测试结果如下: + +![flink-window-word-count](D:\BigData-Notes\pictures\flink-window-word-count.png) + +### 2.2 Sliding Windows + +滑动窗口用于滚动进行聚合分析,例如:每隔 6 分钟统计一次过去一小时内所有商品的点击量,那么统计窗口彼此之间就是存在重叠的,即 1天可以分为 240 个窗口。图示如下: + +![flink-sliding-windows](D:\BigData-Notes\pictures\flink-sliding-windows.png) + +可以看到 window 1 - 4 这四个窗口彼此之间都存在着时间相等的重叠部分。想要实现滑动窗口,只需要在使用 timeWindow 方法时额外传递第二个参数作为滚动时间即可,具体如下: + +```java +// 每隔3秒统计一次过去1分钟内的数据 +timeWindow(Time.minutes(1),Time.seconds(3)) +``` + +### 2.3 Session Windows + +当用户在进行持续浏览时,可能每时每刻都会有点击数据,例如在活动区间内,用户可能频繁的将某类商品加入和移除购物车,而你只想知道用户本次浏览最终的购物车情况,此时就可以在用户持有的会话结束后再进行统计。想要实现这类统计,可以通过 Session Windows 来进行实现。 + +![flink-session-windows](D:\BigData-Notes\pictures\flink-session-windows.png) + +具体的实现代码如下: + +```java +// 以处理时间为衡量标准,如果10秒内没有任何数据输入,就认为会话已经端口,此时触发统计 +window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) +// 以事件时间为衡量标准 +window(EventTimeSessionWindows.withGap(Time.seconds(10))) +``` + +### 2.4 Global Windows + +最后一个窗口是全局窗口, 全局窗口会将所有 key 相同的元素分配到同一个窗口中,其通常配合触发器 (trigger) 进行使用。如果没有相应触发器,则计算将不会被执行。 + +![flink-non-windowed](D:\BigData-Notes\pictures\flink-non-windowed.png) + +这里继续以上面词频统计的案例为例,示例代码如下: + +```java +// 当单词累计出现的次数每达到10次时,则触发计算,计算整个窗口内该单词出现的总数 +window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print(); +``` + +## 三、Count Windows + +Count Windows 用于以数量为维度来进行数据聚合,同样也分为滚动窗口和滑动窗口,实现方式也和时间窗口完全一致,只是调用的 API 不同,具体如下: + +```java +// 滚动计数窗口,每1000次点击则计算一次 +countWindow(1000) +// 滑动计数窗口,每10次点击发生后,则计算过去1000次点击的情况 +countWindow(1000,10) +``` + +实际上计数窗口内部就是调用的我们上一部分介绍的全局窗口来实现的,其源码如下: + +```java +public WindowedStream countWindow(long size) { + return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size))); +} + + 
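+// 说明:滑动计数窗口同样基于全局窗口实现,CountEvictor 只保留窗口中最近的 size 个元素,
+// CountTrigger 则每接收 slide 个元素就触发一次计算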
+public WindowedStream countWindow(long size, long slide) { + return window(GlobalWindows.create()) + .evictor(CountEvictor.of(size)) + .trigger(CountTrigger.of(slide)); +} +``` + + + +## 参考资料 + +Flink Windows: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html \ No newline at end of file diff --git a/notes/Flink开发环境搭建.md b/notes/Flink开发环境搭建.md index 81a2769..eddffc8 100644 --- a/notes/Flink开发环境搭建.md +++ b/notes/Flink开发环境搭建.md @@ -17,7 +17,6 @@ Flink 分别提供了基于 Java 语言和 Scala 语言的 API ,如果想要使用 Scala 语言来开发 Flink 程序,可以通过在 IDEA 中安装 Scala 插件来提供语法提示,代码高亮等功能。打开 IDEA , 依次点击 `File => settings => plugins` 打开插件安装页面,搜索 Scala 插件并进行安装,安装完成后,重启 IDEA 即可生效。
- ## 二、Flink 项目初始化 ### 2.1 使用官方脚本构建 @@ -68,11 +67,9 @@ mvn archetype:generate \ 如果你使用的是开发工具是 IDEA ,可以直接在项目创建页面选择 Maven Flink Archetype 进行项目初始化:
- 如果你的 IDEA 没有上述 Archetype, 可以通过点击右上角的 `ADD ARCHETYPE` ,来进行添加,依次填入所需信息,这些信息都可以从上述的 `archetype:generate ` 语句中获取。点击 `OK` 保存后,该 Archetype 就会一直存在于你的 IDEA 中,之后每次创建项目时,只需要直接选择该 Archetype 即可:
- 选中 Flink Archetype ,然后点击 `NEXT` 按钮,之后的所有步骤都和正常的 Maven 工程相同。 ## 三、项目结构 @@ -82,7 +79,6 @@ mvn archetype:generate \ 创建完成后的自动生成的项目结构如下:
- 其中 BatchJob 为批处理的样例代码,源码如下: ```scala @@ -146,7 +142,6 @@ object StreamingJob { 需要特别注意的以上依赖的 `scope` 标签全部被标识为 provided ,这意味着这些依赖都不会被打入最终的 JAR 包。因为 Flink 的安装包中已经提供了这些依赖,位于其 lib 目录下,名为 `flink-dist_*.jar` ,它包含了 Flink 的所有核心类和依赖:
- `scope` 标签被标识为 provided 会导致你在 IDEA 中启动项目时会抛出 ClassNotFoundException 异常。基于这个原因,在使用 IDEA 创建项目时还自动生成了以下 profile 配置: ```xml @@ -190,7 +185,6 @@ object StreamingJob { 在 id 为 `add-dependencies-for-IDEA` 的 profile 中,所有的核心依赖都被标识为 compile,此时你可以无需改动任何代码,只需要在 IDEA 的 Maven 面板中勾选该 profile,即可直接在 IDEA 中运行 Flink 项目:
- ## 四、词频统计案例 项目创建完成后,可以先书写一个简单的词频统计的案例来尝试运行 Flink 项目,以下以 Scala 语言为例,分别介绍流处理程序和批处理程序的编程示例: @@ -204,9 +198,13 @@ object WordCountBatch { def main(args: Array[String]): Unit = { val benv = ExecutionEnvironment.getExecutionEnvironment - val text = benv.readTextFile("D:\\wordcount.txt") - val counts = text.flatMap { _.toLowerCase.split(",") filter { _.nonEmpty } }.map { (_, 1) }.groupBy(0).sum(1) - counts.print() + val dataSet = benv.readTextFile("D:\\wordcount.txt") + dataSet.flatMap { _.toLowerCase.split(",")} + .filter (_.nonEmpty) + .map { (_, 1) } + .groupBy(0) + .sum(1) + .print() } } ``` @@ -223,7 +221,6 @@ d,d 本机不需要配置其他任何的 Flink 环境,直接运行 Main 方法即可,结果如下:
- ### 4.2 流处理示例 ```scala @@ -236,20 +233,17 @@ object WordCountStreaming { val senv = StreamExecutionEnvironment.getExecutionEnvironment - val text: DataStream[String] = senv.socketTextStream("192.168.0.229", 9999, '\n') - val windowCounts = text.flatMap { w => w.split(",") }.map { w => WordWithCount(w, 1) }.keyBy("word") - .timeWindow(Time.seconds(5)).sum("count") - - windowCounts.print().setParallelism(1) - + val dataStream: DataStream[String] = senv.socketTextStream("192.168.0.229", 9999, '\n') + dataStream.flatMap { line => line.toLowerCase.split(",") } + .filter(_.nonEmpty) + .map { word => (word, 1) } + .keyBy(0) + .timeWindow(Time.seconds(3)) + .sum(1) + .print() senv.execute("Streaming WordCount") - } - - case class WordWithCount(word: String, count: Long) - } - ``` 这里以监听指定端口号上的内容为例,使用以下命令来开启端口服务: @@ -271,7 +265,6 @@ https://flink.apache.org/downloads.html Flink 大多数版本都提供有 Scala 2.11 和 Scala 2.12 两个版本的安装包可供下载:
- 下载完成后进行解压即可,Scala Shell 位于安装目录的 bin 目录下,直接使用以下命令即可以本地模式启动: ```shell @@ -281,7 +274,6 @@ Flink 大多数版本都提供有 Scala 2.11 和 Scala 2.12 两个版本的安 命令行启动完成后,其已经提供了批处理 (benv 和 btenv)和流处理(senv 和 stenv)的运行环境,可以直接运行 Scala Flink 程序,示例如下:
- 最后说明一个常见的异常:这里我使用的 Flink 版本为 1.9.1,启动时会抛出如下异常。这里因为按照官方的说明,目前所有 Scala 2.12 版本的安装包暂时都不支持 Scala Shell,所以如果想要使用 Scala Shell,只能选择 Scala 2.11 版本的安装包。 ```shell diff --git a/notes/资料分享与工具推荐.md b/notes/资料分享与工具推荐.md index 0bf087a..b7e3b33 100644 --- a/notes/资料分享与工具推荐.md +++ b/notes/资料分享与工具推荐.md @@ -26,6 +26,7 @@ - 有态度的 HBase/Spark/BigData:http://hbasefly.com/ - 深入 Apache Spark 的设计和实现原理 : https://github.com/JerryLead/SparkInternals +- Jark's Blog - Flink 系列文章:http://wuchong.me/categories/Flink/ diff --git a/pictures/flink-RichParallelSourceFunction.png b/pictures/flink-RichParallelSourceFunction.png new file mode 100644 index 0000000..64c0f9e Binary files /dev/null and b/pictures/flink-RichParallelSourceFunction.png differ diff --git a/pictures/flink-kafka-datasource-console.png b/pictures/flink-kafka-datasource-console.png new file mode 100644 index 0000000..91620dc Binary files /dev/null and b/pictures/flink-kafka-datasource-console.png differ diff --git a/pictures/flink-kafka-datasource-producer.png b/pictures/flink-kafka-datasource-producer.png new file mode 100644 index 0000000..1d24c38 Binary files /dev/null and b/pictures/flink-kafka-datasource-producer.png differ diff --git a/pictures/flink-kafka-producer-consumer.png b/pictures/flink-kafka-producer-consumer.png new file mode 100644 index 0000000..98f17ce Binary files /dev/null and b/pictures/flink-kafka-producer-consumer.png differ diff --git a/pictures/flink-mysql-sink.png b/pictures/flink-mysql-sink.png new file mode 100644 index 0000000..7f03dc6 Binary files /dev/null and b/pictures/flink-mysql-sink.png differ diff --git a/pictures/flink-non-windowed.png b/pictures/flink-non-windowed.png new file mode 100644 index 0000000..6ce05be Binary files /dev/null and b/pictures/flink-non-windowed.png differ diff --git a/pictures/flink-session-windows.png b/pictures/flink-session-windows.png new file mode 100644 index 0000000..08f432f Binary files /dev/null and b/pictures/flink-session-windows.png differ diff --git a/pictures/flink-sliding-windows.png b/pictures/flink-sliding-windows.png new file mode 100644 index 0000000..1c92a37 Binary files /dev/null and b/pictures/flink-sliding-windows.png differ diff --git a/pictures/flink-tumbling-windows.png b/pictures/flink-tumbling-windows.png new file mode 100644 index 0000000..94d6f72 Binary files /dev/null and b/pictures/flink-tumbling-windows.png differ diff --git a/pictures/flink-window-word-count.png b/pictures/flink-window-word-count.png new file mode 100644 index 0000000..158f255 Binary files /dev/null and b/pictures/flink-window-word-count.png differ