streaming 整合 kafka

This commit is contained in:
luoxiang 2019-06-04 22:18:03 +08:00
parent 03f9607720
commit dbc183e69f

View File

@ -17,15 +17,15 @@
Spark针对Kafka的不同版本提供了两套整合方案`spark-streaming-kafka-0-8``spark-streaming-kafka-0-10`,其主要区别如下:
| | [spark-streaming-kafka-0-8](https://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html) | [spark-streaming-kafka-0-10](https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html) |
| :--------------------------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- |
| Kafka版本 | 0.8.2.1 or higher | 0.10.0 or higher |
| AP状态 | Deprecated<br/>从Spark 2.3.0版本开始Kafka 0.8支持已被弃用 | Stable |
| 语言支持 | Scala, Java, Python | Scala, Java |
| Receiver DStream | Yes | No |
| Direct DStream | Yes | Yes |
| SSL / TLS Support | No | Yes |
| Offset Commit API(偏移量提交) | No | Yes |
| | [spark-streaming-kafka-0-8](https://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html) | [spark-streaming-kafka-0-10](https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html) |
| :-------------------------------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- |
| Kafka版本 | 0.8.2.1 or higher | 0.10.0 or higher |
| AP状态 | Deprecated<br/>从Spark 2.3.0版本开始Kafka 0.8支持已被弃用 | Stable(稳定版) |
| 语言支持 | Scala, Java, Python | Scala, Java |
| Receiver DStream | Yes | No |
| Direct DStream | Yes | Yes |
| SSL / TLS Support | No | Yes |
| Offset Commit API(偏移量提交) | No | Yes |
| Dynamic Topic Subscription<br/>(动态主题订阅) | No | Yes |
本文使用的Kafka版本为`kafka_2.12-2.2.0`,故采用第二种方式进行整合。
@ -35,6 +35,10 @@ Spark针对Kafka的不同版本提供了两套整合方案`spark-streaming
项目采用Maven进行构建主要依赖如下
```xml
<properties>
<scala.version>2.12</scala.version>
</properties>
<dependencies>
<!-- Spark Streaming-->
<dependency>
@ -48,17 +52,10 @@ Spark针对Kafka的不同版本提供了两套整合方案`spark-streaming
<artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
<version>2.4.3</version>
</dependency>
<!--本地测试时如果出现异常:Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 10582
则是因为paranamer版本问题可以添加下面的依赖包进行解决-->
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
<version>2.8</version>
</dependency>
</dependencies>
```
> 完整源码见本仓库https://github.com/heibaiying/BigData-Notes/tree/master/code/spark/spark-streaming-kafka
> 完整源码见本仓库:[spark-streaming-kafka](https://github.com/heibaiying/BigData-Notes/tree/master/code/spark/spark-streaming-kafka)
## 三、整合Kafka
@ -195,7 +192,7 @@ broker返回给消费者数据的等待时间。
#### 8. max.poll.records
单次调用poll()方法能够返回的记录数量。
单次调用`poll()`方法能够返回的记录数量。
#### 9. receive.buffer.bytes 和 send.buffer.byte
@ -207,9 +204,9 @@ broker返回给消费者数据的等待时间。
Spark Streaming中提供了如下三种位置策略用于指定Kafka主题分区与Spark执行程序Executors之间的分配关系
+ **PreferConsistent** : 通用方式,它将在所有的Executors上均匀分配分区
+ **PreferConsistent** : 它将在所有的Executors上均匀分配分区
+ **PreferBrokers** : 当Spark的Executors与Kafka brokers在同一机器上时可以选择该选项
+ **PreferBrokers** : 当Spark的Executor与Kafka Broker在同一机器上时可以选择该选项它优先将该Broker上的首领分区分配给该机器上的Executor
+ **PreferFixed** : 可以指定主题分区与特定主机的映射关系,显示地将分区分配到特定的主机,其构造器如下:
```scala
@ -254,12 +251,12 @@ def SubscribePattern[K, V](
### 3.5 提交偏移量
在示例代码中,我们将`enable.auto.commit`设置为true代表自动提交。在某些情况下你可能需要更高的可靠性如在业务完全处理完成后再提交偏移量这时候可以使用手动提交。目前Spark Streaming没有提供关于手动提交偏移量的方法想要进行手动提交只能调用Kafka原生的API :
在示例代码中,我们将`enable.auto.commit`设置为true代表自动提交。在某些情况下你可能需要更高的可靠性如在业务完全处理完成后再提交偏移量这时候可以使用手动提交。想要进行手动提交,需要调用Kafka原生的API :
+ `commitSync`: 用于异步提交;
+ `commitAsync`:用于同步提交。
关于编程方面就介绍到这里,下面介绍如果测试整合结果。
具体提交方式可以参见:[Kafka消费者详解]([https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka消费者详解.md)
@ -311,7 +308,7 @@ bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic spark-streami
这里我直接使用本地模式启动Spark Streaming程序。启动后使用生产者发送数据从控制台查看结果。
从控制台输出中可以看到数据流已经被成功接收,由于采用`kafka-console-producer.sh`发送的数据默认是没有key的所以key值为null。同时从输出中也可以看到在程序中指定的groupId和程序自动分配的clientId。
从控制台输出中可以看到数据流已经被成功接收,由于采用`kafka-console-producer.sh`发送的数据默认是没有key的所以key值为null。同时从输出中也可以看到在程序中指定的`groupId`和程序自动分配的`clientId`
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-straming-kafka-console.png"/> </div>