spark-streaming-kafka

luoxiang 2019-05-26 16:38:00 +08:00
parent 36f957e78e
commit f5d045bfba
2 changed files with 97 additions and 0 deletions

pom.xml

@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.heibaiying</groupId>
    <artifactId>spark-streaming-kafka</artifactId>
    <version>1.0</version>

    <properties>
        <scala.version>2.12</scala.version>
        <spark.version>2.4.0</spark.version>
    </properties>

    <dependencies>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark Streaming integration with Kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
            <version>2.4.3</version>
        </dependency>
        <!-- If the exception "Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 10582"
             occurs, it is caused by the paranamer version; add the dependency below to fix it -->
        <dependency>
            <groupId>com.thoughtworks.paranamer</groupId>
            <artifactId>paranamer</artifactId>
            <version>2.8</version>
        </dependency>
    </dependencies>
</project>

KafkaDirectStream.scala

@@ -0,0 +1,58 @@
package com.heibaiying.kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming integration with Kafka
  */
object KafkaDirectStream {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, Object](
      /*
       * The list of broker addresses. It does not need to contain every broker in the cluster;
       * the client discovers the remaining brokers from the ones provided.
       * Providing at least two brokers is recommended for fault tolerance.
       */
      "bootstrap.servers" -> "hadoop001:9092",
      /* Key deserializer */
      "key.deserializer" -> classOf[StringDeserializer],
      /* Value deserializer */
      "value.deserializer" -> classOf[StringDeserializer],
      /* ID of the consumer group this consumer belongs to */
      "group.id" -> "spark-streaming-group",
      /*
       * Controls what the consumer does when it reads a partition with no committed offset,
       * or when the committed offset is invalid:
       * latest: start from the newest records (those produced after the consumer started)
       * earliest: start reading the partition from the beginning
       */
      "auto.offset.reset" -> "latest",
      /* Whether offsets are committed automatically */
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    val topics = Array("spark-streaming-topic")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    /* Print the input stream */
    stream.map(record => (record.key, record.value)).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
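
With "enable.auto.commit" set to true, the Kafka consumer commits offsets on its own schedule, independent of whether the batch was actually processed. If offsets should only be committed after processing succeeds, spark-streaming-kafka-0-10 exposes the HasOffsetRanges and CanCommitOffsets APIs for manual commits. The following is a minimal sketch, not part of the original commit: it assumes the same stream and imports as above, that "enable.auto.commit" is switched to false in kafkaParams, and the per-record println is only a placeholder for real processing.

    /* Manual offset commits: requires "enable.auto.commit" -> (false: java.lang.Boolean) in kafkaParams */
    stream.foreachRDD { rdd =>
      // Offset ranges covered by this batch, one entry per Kafka partition
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Process the batch (placeholder: print each record's key and value)
      rdd.foreach(record => println(record.key + " -> " + record.value))
      // Commit the offsets asynchronously once processing has finished
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

Committing after processing in this way gives at-least-once semantics: if the application fails between processing and the commit, the batch is re-read on restart.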