From f5d045bfba51ead3702154c29f76f503d12992eb Mon Sep 17 00:00:00 2001
From: luoxiang <2806718453@qq.com>
Date: Sun, 26 May 2019 16:38:00 +0800
Subject: [PATCH] spark-streaming-kafka

---
 code/spark/spark-streaming-kafka/pom.xml      | 39 +++++++++++++
 .../heibaiying/kafka/KafkaDirectStream.scala  | 58 +++++++++++++++++++
 2 files changed, 97 insertions(+)
 create mode 100644 code/spark/spark-streaming-kafka/pom.xml
 create mode 100644 code/spark/spark-streaming-kafka/src/main/scala/com/heibaiying/kafka/KafkaDirectStream.scala

diff --git a/code/spark/spark-streaming-kafka/pom.xml b/code/spark/spark-streaming-kafka/pom.xml
new file mode 100644
index 0000000..2e860ee
--- /dev/null
+++ b/code/spark/spark-streaming-kafka/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.heibaiying</groupId>
+    <artifactId>spark-streaming-kafka</artifactId>
+    <version>1.0</version>
+
+    <properties>
+        <scala.version>2.12</scala.version>
+        <spark.version>2.4.0</spark.version>
+    </properties>
+
+    <dependencies>
+        <!-- Spark Streaming -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <!-- Spark Streaming Kafka 0.10 integration -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
+            <version>2.4.3</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.thoughtworks.paranamer</groupId>
+            <artifactId>paranamer</artifactId>
+            <version>2.8</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/code/spark/spark-streaming-kafka/src/main/scala/com/heibaiying/kafka/KafkaDirectStream.scala b/code/spark/spark-streaming-kafka/src/main/scala/com/heibaiying/kafka/KafkaDirectStream.scala
new file mode 100644
index 0000000..199a944
--- /dev/null
+++ b/code/spark/spark-streaming-kafka/src/main/scala/com/heibaiying/kafka/KafkaDirectStream.scala
@@ -0,0 +1,58 @@
+package com.heibaiying.kafka
+
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+import org.apache.spark.streaming.kafka010._
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+/**
+  * Spark Streaming integration with Kafka
+  */
+object KafkaDirectStream {
+
+  def main(args: Array[String]): Unit = {
+
+    val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
+    val streamingContext = new StreamingContext(sparkConf, Seconds(5))
+
+    val kafkaParams = Map[String, Object](
+      /*
+       * List of broker addresses. It does not need to contain every broker in the
+       * cluster: the consumer discovers the remaining brokers from the ones given
+       * here. Providing at least two brokers is recommended for fault tolerance.
+       */
+      "bootstrap.servers" -> "hadoop001:9092",
+      /* Deserializer for record keys */
+      "key.deserializer" -> classOf[StringDeserializer],
+      /* Deserializer for record values */
+      "value.deserializer" -> classOf[StringDeserializer],
+      /* ID of the consumer group this consumer belongs to */
+      "group.id" -> "spark-streaming-group",
+      /*
+       * What the consumer should do when it reads a partition with no committed
+       * offset, or whose committed offset is invalid:
+       * latest: start from the newest records (those produced after the consumer started)
+       * earliest: start reading the partition from the beginning
+       */
+      "auto.offset.reset" -> "latest",
+      /* Whether offsets are committed automatically */
+      "enable.auto.commit" -> (true: java.lang.Boolean)
+    )
+
+    val topics = Array("spark-streaming-topic")
+    val stream = KafkaUtils.createDirectStream[String, String](
+      streamingContext,
+      PreferConsistent,
+      Subscribe[String, String](topics, kafkaParams)
+    )
+
+    /* Print the input stream */
+    stream.map(record => (record.key, record.value)).print()
+
+    streamingContext.start()
+    streamingContext.awaitTermination()
+  }
+
+}
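
Note: with "enable.auto.commit" set to true, the Kafka consumer commits offsets on
its own schedule, so a record can be marked consumed before the batch containing it
has actually been processed. A common refinement is to disable auto-commit and
commit offsets after each batch through the CanCommitOffsets interface that the
0-10 integration exposes. A minimal sketch against the stream above (assuming
"enable.auto.commit" is switched to false):

    stream.foreachRDD { rdd =>
      // Each RDD produced by createDirectStream carries its Kafka offset ranges.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Process the batch first ...
      rdd.map(record => (record.key, record.value)).foreach(println)

      // ... then hand the offsets back to Kafka. commitAsync enqueues the commit,
      // which is executed on a subsequent batch boundary.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

HasOffsetRanges and CanCommitOffsets live in the already-imported
org.apache.spark.streaming.kafka010 package, so no extra imports are needed.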
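
Testing note: the job subscribes to "spark-streaming-topic" on hadoop001:9092, so a
quick smoke test is to type records into Kafka's console producer against that
broker, e.g. kafka-console-producer.sh --broker-list hadoop001:9092 --topic
spark-streaming-topic (on newer Kafka releases the flag is --bootstrap-server).
Each 5-second batch should then print the (key, value) pairs it received.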