flink datasource
This commit is contained in:
		| @@ -0,0 +1,23 @@ | ||||
| package com.heibaiying; | ||||
|  | ||||
| import com.heibaiying.bean.Employee; | ||||
| import com.heibaiying.sink.FlinkToMySQL; | ||||
| import org.apache.flink.streaming.api.datastream.DataStreamSource; | ||||
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | ||||
|  | ||||
| import java.sql.Date; | ||||
|  | ||||
| public class CustomSinkJob { | ||||
|  | ||||
|     public static void main(String[] args) throws Exception { | ||||
|  | ||||
|         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | ||||
|         Date date = new Date(System.currentTimeMillis()); | ||||
|         DataStreamSource<Employee> streamSource = env.fromElements( | ||||
|                 new Employee("hei", 10, date), | ||||
|                 new Employee("bai", 20, date), | ||||
|                 new Employee("ying", 30, date)); | ||||
|         streamSource.addSink(new FlinkToMySQL()); | ||||
|         env.execute(); | ||||
|     } | ||||
| } | ||||
| @@ -0,0 +1,43 @@ | ||||
| package com.heibaiying; | ||||
|  | ||||
| import org.apache.flink.api.common.functions.MapFunction; | ||||
| import org.apache.flink.api.common.serialization.SimpleStringSchema; | ||||
| import org.apache.flink.streaming.api.datastream.DataStream; | ||||
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | ||||
| import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; | ||||
| import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; | ||||
| import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; | ||||
| import org.apache.kafka.clients.producer.ProducerRecord; | ||||
|  | ||||
| import javax.annotation.Nullable; | ||||
| import java.util.Properties; | ||||
|  | ||||
| public class KafkaStreamingJob { | ||||
|  | ||||
|     public static void main(String[] args) throws Exception { | ||||
|  | ||||
|         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | ||||
|  | ||||
|         // 1.指定Kafka的相关配置属性 | ||||
|         Properties properties = new Properties(); | ||||
|         properties.setProperty("bootstrap.servers", "192.168.200.229:9092"); | ||||
|  | ||||
|         // 2.接收Kafka上的数据 | ||||
|         DataStream<String> stream = env | ||||
|                 .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties)); | ||||
|  | ||||
|         // 3.定义计算结果到 Kafka ProducerRecord 的转换 | ||||
|         KafkaSerializationSchema<String> kafkaSerializationSchema = new KafkaSerializationSchema<String>() { | ||||
|             @Override | ||||
|             public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) { | ||||
|                 return new ProducerRecord<>("flink-stream-out-topic", element.getBytes()); | ||||
|             } | ||||
|         }; | ||||
|         // 4. 定义Flink Kafka生产者 | ||||
|         FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic", | ||||
|                 kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5); | ||||
|         // 5. 将接收到输入元素*2后写出到Kafka | ||||
|         stream.map((MapFunction<String, String>) value -> value + value).addSink(kafkaProducer); | ||||
|         env.execute("Flink Streaming"); | ||||
|     } | ||||
| } | ||||
| @@ -0,0 +1,42 @@ | ||||
| package com.heibaiying.bean; | ||||
|  | ||||
| import java.sql.Date; | ||||
|  | ||||
/**
 * Mutable JavaBean describing an employee (name, age, birthday).
 *
 * <p>Flink treats this as a POJO type only if it exposes a <em>public</em>
 * no-argument constructor; the original package-private constructor broke
 * that contract, so it is now public.
 */
public class Employee {

    private String name;
    private int age;
    private Date birthday;

    /** Public no-arg constructor required for Flink POJO (de)serialization. */
    public Employee() {
    }

    /**
     * @param name     employee name
     * @param age      employee age in years
     * @param birthday birth date as a {@link java.sql.Date} (JDBC-compatible)
     */
    public Employee(String name, int age, Date birthday) {
        this.name = name;
        this.age = age;
        this.birthday = birthday;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Date getBirthday() {
        return birthday;
    }

    public void setBirthday(Date birthday) {
        this.birthday = birthday;
    }

    @Override
    public String toString() {
        return "Employee{name='" + name + "', age=" + age + ", birthday=" + birthday + "}";
    }
}
| @@ -0,0 +1,43 @@ | ||||
| package com.heibaiying.sink; | ||||
|  | ||||
| import com.heibaiying.bean.Employee; | ||||
| import org.apache.flink.configuration.Configuration; | ||||
| import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; | ||||
|  | ||||
| import java.sql.Connection; | ||||
| import java.sql.DriverManager; | ||||
| import java.sql.PreparedStatement; | ||||
|  | ||||
| public class FlinkToMySQL extends RichSinkFunction<Employee> { | ||||
|  | ||||
|     private PreparedStatement stmt; | ||||
|     private Connection conn; | ||||
|  | ||||
|     @Override | ||||
|     public void open(Configuration parameters) throws Exception { | ||||
|         Class.forName("com.mysql.cj.jdbc.Driver"); | ||||
|         conn = DriverManager.getConnection("jdbc:mysql://192.168.200.229:3306/employees?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false", "root", "123456"); | ||||
|         String sql = "insert into emp(name, age, birthday) values(?, ?, ?)"; | ||||
|         stmt = conn.prepareStatement(sql); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void invoke(Employee value, Context context) throws Exception { | ||||
|         stmt.setString(1, value.getName()); | ||||
|         stmt.setInt(2, value.getAge()); | ||||
|         stmt.setDate(3, value.getBirthday()); | ||||
|         stmt.executeUpdate(); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void close() throws Exception { | ||||
|         super.close(); | ||||
|         if (stmt != null) { | ||||
|             stmt.close(); | ||||
|         } | ||||
|         if (conn != null) { | ||||
|             conn.close(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
| } | ||||
| @@ -0,0 +1,23 @@ | ||||
| ################################################################################ | ||||
| #  Licensed to the Apache Software Foundation (ASF) under one | ||||
| #  or more contributor license agreements.  See the NOTICE file | ||||
| #  distributed with this work for additional information | ||||
| #  regarding copyright ownership.  The ASF licenses this file | ||||
| #  to you under the Apache License, Version 2.0 (the | ||||
| #  "License"); you may not use this file except in compliance | ||||
| #  with the License.  You may obtain a copy of the License at | ||||
| # | ||||
| #      http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| #  Unless required by applicable law or agreed to in writing, software | ||||
| #  distributed under the License is distributed on an "AS IS" BASIS, | ||||
| #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| #  See the License for the specific language governing permissions and | ||||
#  limitations under the License.
| ################################################################################ | ||||
|  | ||||
| log4j.rootLogger=INFO, console | ||||
|  | ||||
| log4j.appender.console=org.apache.log4j.ConsoleAppender | ||||
| log4j.appender.console.layout=org.apache.log4j.PatternLayout | ||||
| log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n | ||||
		Reference in New Issue
	
	Block a user