diff --git a/README.md b/README.md
index 03dc266..5b2743b 100644
--- a/README.md
+++ b/README.md
@@ -92,7 +92,7 @@
1. [Spark Streaming简介](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming与流处理.md)
2. DStream常用操作详解
3. [Spark Streaming 整合 Flume](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Flume.md)
-4. Spark Streaming 整合 Kafka
+4. [Spark Streaming 整合 Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Kafka.md)
## 四、Flink
diff --git a/notes/Spark_Streaming整合Kafka.md b/notes/Spark_Streaming整合Kafka.md
new file mode 100644
index 0000000..975a8d9
--- /dev/null
+++ b/notes/Spark_Streaming整合Kafka.md
@@ -0,0 +1,323 @@
+# Spark Streaming 整合 Kafka
+
+
+
+
+## 一、版本说明
+
+Spark针对Kafka的不同版本,提供了两套整合方案:`spark-streaming-kafka-0-8`和`spark-streaming-kafka-0-10`,其主要区别如下:
+
+| | [spark-streaming-kafka-0-8](https://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html) | [spark-streaming-kafka-0-10](https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html) |
+| :--------------------------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- |
+| Kafka版本 | 0.8.2.1 or higher | 0.10.0 or higher |
+| AP状态 | Deprecated
从Spark 2.3.0版本开始,Kafka 0.8支持已被弃用 | Stable |
+| 语言支持 | Scala, Java, Python | Scala, Java |
+| Receiver DStream | Yes | No |
+| Direct DStream | Yes | Yes |
+| SSL / TLS Support | No | Yes |
+| Offset Commit API(偏移量提交) | No | Yes |
+| Dynamic Topic Subscription(动态主题订阅) | No | Yes |
+
+本文使用的Kafka版本为`kafka_2.12-2.2.0`,故采用第二种方式进行整合。
+
+## 二、项目依赖
+
+项目采用Maven进行构建,主要依赖如下:
+
+```xml
+
+
+
+ org.apache.spark
+ spark-streaming_${scala.version}
+ ${spark.version}
+
+
+
+ org.apache.spark
+ spark-streaming-kafka-0-10_${scala.version}
+ 2.4.3
+
+
+
+ com.thoughtworks.paranamer
+ paranamer
+ 2.8
+
+
+```
+
+> 完整源码见本仓库:https://github.com/heibaiying/BigData-Notes/tree/master/code/spark/spark-streaming-kafka
+
+## 三、整合Kafka
+
+通过调用`KafkaUtils`对象的`createDirectStream`方法来创建输入流,完整代码如下:
+
+```scala
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+import org.apache.spark.streaming.kafka010._
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+/**
+ * spark streaming 整合 kafka
+ */
+object KafkaDirectStream {
+
+ def main(args: Array[String]): Unit = {
+
+ val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
+ val streamingContext = new StreamingContext(sparkConf, Seconds(5))
+
+ val kafkaParams = Map[String, Object](
+ /*
+ * 指定broker的地址清单,清单里不需要包含所有的broker地址,生产者会从给定的broker里查找其他broker的信息。
+ * 不过建议至少提供两个broker的信息作为容错。
+ */
+ "bootstrap.servers" -> "hadoop001:9092",
+ /*键的序列化器*/
+ "key.deserializer" -> classOf[StringDeserializer],
+ /*值的序列化器*/
+ "value.deserializer" -> classOf[StringDeserializer],
+ /*消费者所在分组的ID*/
+ "group.id" -> "spark-streaming-group",
+ /*
+ * 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+ * latest: 在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
+ * earliest: 在偏移量无效的情况下,消费者将从起始位置读取分区的记录
+ */
+ "auto.offset.reset" -> "latest",
+ /*是否自动提交*/
+ "enable.auto.commit" -> (true: java.lang.Boolean)
+ )
+
+ /*可以同时订阅多个主题*/
+ val topics = Array("spark-streaming-topic")
+ val stream = KafkaUtils.createDirectStream[String, String](
+ streamingContext,
+ /*位置策略*/
+ PreferConsistent,
+ /*订阅主题*/
+ Subscribe[String, String](topics, kafkaParams)
+ )
+
+ /*打印输入流*/
+ stream.map(record => (record.key, record.value)).print()
+
+ streamingContext.start()
+ streamingContext.awaitTermination()
+ }
+}
+```
+
+### 3.1 ConsumerRecord
+
+这里获得的输入流中每一个Record实际上是`ConsumerRecord `的实例,其包含了Record的所有可用信息,源码如下:
+
+```scala
+public class ConsumerRecord {
+
+ public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
+ public static final int NULL_SIZE = -1;
+ public static final int NULL_CHECKSUM = -1;
+
+ /*主题名称*/
+ private final String topic;
+ /*分区编号*/
+ private final int partition;
+ /*偏移量*/
+ private final long offset;
+ /*时间戳*/
+ private final long timestamp;
+ /*时间戳代表的含义*/
+ private final TimestampType timestampType;
+ /*键序列化器*/
+ private final int serializedKeySize;
+ /*值序列化器*/
+ private final int serializedValueSize;
+ /*值序列化器*/
+ private final Headers headers;
+ /*键*/
+ private final K key;
+ /*值*/
+ private final V value;
+ .....
+}
+```
+
+### 3.2 生产者属性
+
+在示例代码中`kafkaParams`封装了Kafka消费者的属性,这些属性和Spark Streaming无关,是Kafka原生API中就有定义的。其中服务器地址、键序列化器和值序列化器是必选的,其他配置是可选的。其余可选的配置项如下:
+
+#### 1. fetch.min.byte
+
+消费者从服务器获取记录的最小字节数。如果可用的数据量小于设置值,broker会等待有足够的可用数据时才会把它返回给消费者。
+
+#### 2. fetch.max.wait.ms
+
+broker返回给消费者数据的等待时间。
+
+#### 3. max.partition.fetch.bytes
+
+分区返回给消费者的最大字节数。
+
+#### 4. session.timeout.ms
+
+消费者在被认为死亡之前可以与服务器断开连接的时间。
+
+#### 5. auto.offset.reset
+
+该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+
+- latest(默认值) :在偏移量无效的情况下,消费者将从其启动之后生成的最新的记录开始读取数据;
+- earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
+
+#### 6. enable.auto.commit
+
+是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false。
+
+#### 7. client.id
+
+客户端id,服务器用来识别消息的来源。
+
+#### 8. max.poll.records
+
+单次调用call()方法能够返回的记录数量。
+
+#### 9. receive.buffer.bytes 和 send.buffer.byte
+
+这两个参数分别指定TCP socket 接收和发送数据包缓冲区的大小,-1代表使用操作系统的默认值。
+
+
+
+### 3.3 位置策略
+
+Spark Streaming中提供了如下三种位置策略,用于指定Kafka主题分区与Spark执行程序Executors之间的分配关系:
+
++ **PreferConsistent** : 通用方式,它将在所有的Executors上均匀分配分区;
+
++ **PreferBrokers** : 当Spark的Executors与Kafka brokers在同一机器上时可以选择该选项;
++ **PreferFixed** : 可以指定主题分区与特定主机的映射关系,显示地将分区分配到特定的主机,其构造器如下:
+
+```scala
+@Experimental
+def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
+ new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
+
+@Experimental
+def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
+ new PreferFixed(hostMap)
+```
+
+
+
+### 3.4 订阅方式
+
+Spark Streaming提供了两种主题订阅方式,分别为`Subscribe`和`SubscribePattern`。后者可以使用正则匹配订阅主题的名称。其构造器分别如下:
+
+```scala
+/**
+ * @param 需要订阅的主题的集合
+ * @param Kafka消费者参数
+ * @param offsets(可选): 在初始启动时开始的偏移量。如果没有,则将使用保存的偏移量或auto.offset.reset属性的值
+ */
+def Subscribe[K, V](
+ topics: ju.Collection[jl.String],
+ kafkaParams: ju.Map[String, Object],
+ offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { ... }
+
+/**
+ * @param 需要订阅的正则
+ * @param Kafka消费者参数
+ * @param offsets(可选): 在初始启动时开始的偏移量。如果没有,则将使用保存的偏移量或auto.offset.reset属性的值
+ */
+def SubscribePattern[K, V](
+ pattern: ju.regex.Pattern,
+ kafkaParams: collection.Map[String, Object],
+ offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { ... }
+```
+
+在示例代码中,我们实际上并没有指定第三个参数`offsets`,所以程序默认采用的是配置的`auto.offset.reset`属性的值latest,即在偏移量无效的情况下,消费者将从其启动之后生成的最新的记录开始读取数据。
+
+### 3.5 提交偏移量
+
+在示例代码中,我们将`enable.auto.commit`设置为true,代表自动提交。在某些情况下,你可能需要更高的可靠性,如在业务完全处理完成后再提交偏移量,这时候可以使用手动提交。目前Spark Streaming没有提供关于手动提交偏移量的方法,想要进行手动提交,只能调用Kafka原生的API :
+
++ `commitSync`: 用于异步提交;
++ `commitAsync`:用于同步提交。
+
+关于编程方面就介绍到这里,下面介绍如果测试整合结果。
+
+
+
+## 四、启动测试
+
+### 4.1 创建主题
+
+#### 1. 启动Kakfa
+
+Kafka的运行依赖于zookeeper,需要预先启动,可以启动Kafka内置的zookeeper,也可以启动自己安装的:
+
+```shell
+# zookeeper启动命令
+bin/zkServer.sh start
+
+# 内置zookeeper启动命令
+bin/zookeeper-server-start.sh config/zookeeper.properties
+```
+
+启动单节点kafka用于测试:
+
+```shell
+# bin/kafka-server-start.sh config/server.properties
+```
+
+#### 2. 创建topic
+
+```shell
+# 创建用于测试主题
+bin/kafka-topics.sh --create \
+ --bootstrap-server hadoop001:9092 \
+ --replication-factor 1 \
+ --partitions 1 \
+ --topic spark-streaming-topic
+
+# 查看所有主题
+ bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
+```
+
+#### 3. 创建生产者
+
+这里创建一个Kafka生产者,用于发送测试数据:
+
+```shell
+bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic spark-streaming-topic
+```
+
+### 4.2 本地模式测试
+
+这里我直接使用本地模式启动Spark Streaming程序。启动后使用生产者发送数据,从控制台查看结果。
+
+从控制台输出中可以看到数据流已经被成功接收,由于采用`kafka-console-producer.sh`发送的数据默认是没有key的,所以key值为null。同时从输出中也可以看到在程序中指定的groupId和程序自动分配的clientId。
+
+
+
+
+
+
+
+## 参考资料
+
+1. https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
\ No newline at end of file
diff --git a/notes/Zookeeper_ACL权限控制.md b/notes/Zookeeper_ACL权限控制.md
index 866c76a..62c8275 100644
--- a/notes/Zookeeper_ACL权限控制.md
+++ b/notes/Zookeeper_ACL权限控制.md
@@ -1,5 +1,23 @@
# Zookeeper ACL
+
+
+
## 一、前言
为了避免存储在Zookeeper上的数据被其他程序或者人为误修改,Zookeeper提供了ACL(Access Control Lists)进行权限控制。只有拥有对应权限的用户才可以对节点进行增删改查等操作。下文分别介绍使用原生的Shell命令和Apache Curator客户端进行权限设置。
@@ -127,7 +145,7 @@ Authentication is not valid : /hive # 当前主机已经不能访问
"-Dzookeeper.DigestAuthenticationProvider.superDigest=heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s="
```
-
+
修改完成后需要使用`zkServer.sh restart`重启服务,此时再次访问限制IP的节点:
diff --git a/notes/Zookeeper_Java客户端Curator.md b/notes/Zookeeper_Java客户端Curator.md
index cdf9fef..ccd9253 100644
--- a/notes/Zookeeper_Java客户端Curator.md
+++ b/notes/Zookeeper_Java客户端Curator.md
@@ -1,316 +1,334 @@
-# Zookeeper Java 客户端 ——Apache Curator
+# Zookeeper Java 客户端 ——Apache Curator
+
-## 一、基本依赖
-
-Curator是Netflix公司开源的一个Zookeeper客户端,目前由Apache进行维护。与Zookeeper原生客户端相比,Curator的抽象层次更高,功能也更加丰富,是Zookeeper使用范围最广的Java客户端。本片文章主要讲解其基本使用,以下项目采用Maven构建,以单元测试的方法进行讲解,相关依赖如下:
-
-```xml
-
-
- org.apache.curator
- curator-framework
- 4.0.0
-
-
- org.apache.curator
- curator-recipes
- 4.0.0
-
-
- org.apache.zookeeper
- zookeeper
- 3.4.13
-
-
-
- junit
- junit
- 4.12
-
-
-```
-
-> 完整源码见本仓库: https://github.com/heibaiying/BigData-Notes/tree/master/code/Zookeeper/curator
-
-
-
-## 二、客户端相关操作
-
-### 2.1 创建客户端实例
-
-这里使用`@Before`在单元测试执行前创建客户端实例,并使用`@After`在单元测试后关闭客户端连接。
-
-```java
-public class BasicOperation {
-
- private CuratorFramework client = null;
- private static final String zkServerPath = "192.168.0.226:2181";
- private static final String nodePath = "/hadoop/yarn";
-
- @Before
- public void prepare() {
- // 重试策略
- RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
- client = CuratorFrameworkFactory.builder()
- .connectString(zkServerPath)
- .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
- .namespace("workspace").build(); //指定命名空间后,client的所有路径操作都会以/workspace开头
- client.start();
- }
-
- @After
- public void destroy() {
- if (client != null) {
- client.close();
- }
- }
-}
-```
-
-### 2.2 重试策略
-
-在连接Zookeeper服务时候,Curator提供了多种重试策略以满足各种需求,所有重试策略继承自`RetryPolicy`接口,如下图:
-
-
-
-而这些重试策略类又分为两大类别:
-
-+ RetryForever :代表一直重试,直到连接成功;
-+ SleepingRetry : 基于一定间隔时间的重试,这里以其子类`ExponentialBackoffRetry`为例说明,其构造器如下:
-
-```java
-/**
- * @param baseSleepTimeMs 重试之间等待的初始时间
- * @param maxRetries 最大重试次数
- * @param maxSleepMs 每次重试间隔的最长睡眠时间(毫秒)
- */
-ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
-```
-### 2.3 判断服务状态
-
-```scala
-@Test
-public void getStatus() {
- CuratorFrameworkState state = client.getState();
- System.out.println("服务是否已经启动:" + (state == CuratorFrameworkState.STARTED));
-}
-```
-
-
-
-## 三、节点增删改查
-
-### 3.1 创建节点
-
-```java
-@Test
-public void createNodes() throws Exception {
- byte[] data = "abc".getBytes();
- client.create().creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT) //节点类型
- .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
- .forPath(nodePath, data);
-}
-```
-
-创建时可以指定节点类型,这里的节点类型和Zookeeper原生的一致,全部定义在枚举类`CreateMode`中:
-
-```java
-public enum CreateMode {
- // 永久节点
- PERSISTENT (0, false, false),
- //永久有序节点
- PERSISTENT_SEQUENTIAL (2, false, true),
- // 临时节点
- EPHEMERAL (1, true, false),
- // 临时有序节点
- EPHEMERAL_SEQUENTIAL (3, true, true);
- ....
-}
-```
-
-### 2.2 获取节点信息
-
-```scala
-@Test
-public void getNode() throws Exception {
- Stat stat = new Stat();
- byte[] data = client.getData().storingStatIn(stat).forPath(nodePath);
- System.out.println("节点数据:" + new String(data));
- System.out.println("节点信息:" + stat.toString());
-}
-```
-
-如上所示,节点信息被封装在`Stat`类中,其主要属性如下:
-
-```java
-public class Stat implements Record {
- private long czxid;
- private long mzxid;
- private long ctime;
- private long mtime;
- private int version;
- private int cversion;
- private int aversion;
- private long ephemeralOwner;
- private int dataLength;
- private int numChildren;
- private long pzxid;
- ...
-}
-```
-
-每个属性的含义如下:
-
-| **状态属性** | **说明** |
-| -------------- | ------------------------------------------------------------ |
-| czxid | 数据节点创建时的事务ID |
-| ctime | 数据节点创建时的时间 |
-| mzxid | 数据节点最后一次更新时的事务ID |
-| mtime | 数据节点最后一次更新时的时间 |
-| pzxid | 数据节点的子节点最后一次被修改时的事务ID |
-| cversion | 子节点的更改次数 |
-| version | 节点数据的更改次数 |
-| aversion | 节点的ACL的更改次数 |
-| ephemeralOwner | 如果节点是临时节点,则表示创建该节点的会话的SessionID;如果节点是持久节点,则该属性值为0 |
-| dataLength | 数据内容的长度 |
-| numChildren | 数据节点当前的子节点个数 |
-
-### 2.3 获取子节点列表
-
-```java
-@Test
-public void getChildrenNodes() throws Exception {
- List childNodes = client.getChildren().forPath("/hadoop");
- for (String s : childNodes) {
- System.out.println(s);
- }
-}
-```
-
-### 2.4 更新节点
-
-更新时可以传入版本号也可以不传入,如果传入则类似于乐观锁机制,只有在版本号正确的时候才会被更新。
-
-```scala
-@Test
-public void updateNode() throws Exception {
- byte[] newData = "defg".getBytes();
- client.setData().withVersion(0) // 传入版本号,如果版本号错误则拒绝更新操作,并抛出BadVersion异常
- .forPath(nodePath, newData);
-}
-```
-
-### 2.5 删除节点
-
-```java
-@Test
-public void deleteNodes() throws Exception {
- client.delete()
- .guaranteed() // 如果删除失败,那么在会继续执行,直到成功
- .deletingChildrenIfNeeded() // 如果有子节点,则递归删除
- .withVersion(0) // 传入版本号,如果版本号错误则拒绝删除操作,并抛出BadVersion异常
- .forPath(nodePath);
-}
-```
-
-### 2.6 判断节点是否存在
-
-```java
-@Test
-public void existNode() throws Exception {
- // 如果节点存在则返回其状态信息如果不存在则为null
- Stat stat = client.checkExists().forPath(nodePath + "aa/bb/cc");
- System.out.println("节点是否存在:" + !(stat == null));
-}
-```
-
-
-
-## 三、监听事件
-
-### 3.1 创建一次性监听
-
-和Zookeeper原生监听一样,使用`usingWatcher`注册的监听是一次性的,即监听只会触发一次,触发后就销毁。示例如下:
-
-```java
-@Test
-public void DisposableWatch() throws Exception {
- client.getData().usingWatcher(new CuratorWatcher() {
- public void process(WatchedEvent event) {
- System.out.println("节点" + event.getPath() + "发生了事件:" + event.getType());
- }
- }).forPath(nodePath);
- Thread.sleep(1000 * 1000); //休眠以观察测试效果
-}
-```
-
-### 3.2 创建永久监听
-
-Curator还提供了创建永久监听的API,其使用方式如下:
-
-```java
-@Test
-public void permanentWatch() throws Exception {
- // 使用NodeCache包装节点,对其注册的监听作用于节点,且是永久性的
- NodeCache nodeCache = new NodeCache(client, nodePath);
- // 通常设置为true, 代表创建nodeCache时,就去获取对应节点的值并缓存
- nodeCache.start(true);
- nodeCache.getListenable().addListener(new NodeCacheListener() {
- public void nodeChanged() {
- ChildData currentData = nodeCache.getCurrentData();
- if (currentData != null) {
- System.out.println("节点路径:" + currentData.getPath() +
- "数据:" + new String(currentData.getData()));
- }
- }
- });
- Thread.sleep(1000 * 1000); //休眠以观察测试效果
-}
-```
-
-### 3.3 监听子节点
-
-这里以监听`/hadoop`下所有子节点为例,实现方式如下:
-
-```scala
-@Test
-public void permanentChildrenNodesWatch() throws Exception {
-
- // 第三个参数代表除了节点状态外,是否还缓存节点内容
- PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop", true);
- /*
- * StartMode代表初始化方式:
- * NORMAL: 异步初始化
- * BUILD_INITIAL_CACHE: 同步初始化
- * POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发INITIALIZED事件
- */
- childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
-
- List childDataList = childrenCache.getCurrentData();
- System.out.println("当前数据节点的子节点列表:");
- childDataList.forEach(x -> System.out.println(x.getPath()));
-
- childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
- switch (event.getType()) {
- case INITIALIZED:
- System.out.println("childrenCache初始化完成");
- break;
- case CHILD_ADDED:
- // 需要注意的是: 即使是之前已经存在的子节点,也会触发该监听,因为会把该子节点加入childrenCache缓存中
- System.out.println("增加子节点:" + event.getData().getPath());
- break;
- case CHILD_REMOVED:
- System.out.println("删除子节点:" + event.getData().getPath());
- break;
- case CHILD_UPDATED:
- System.out.println("被修改的子节点的路径:" + event.getData().getPath());
- System.out.println("修改后的数据:" + new String(event.getData().getData()));
- break;
- }
- }
- });
- Thread.sleep(1000 * 1000); //休眠以观察测试效果
-}
+## 一、基本依赖
+
+Curator是Netflix公司开源的一个Zookeeper客户端,目前由Apache进行维护。与Zookeeper原生客户端相比,Curator的抽象层次更高,功能也更加丰富,是Zookeeper使用范围最广的Java客户端。本片文章主要讲解其基本使用,以下项目采用Maven构建,以单元测试的方法进行讲解,相关依赖如下:
+
+```xml
+
+
+ org.apache.curator
+ curator-framework
+ 4.0.0
+
+
+ org.apache.curator
+ curator-recipes
+ 4.0.0
+
+
+ org.apache.zookeeper
+ zookeeper
+ 3.4.13
+
+
+
+ junit
+ junit
+ 4.12
+
+
+```
+
+> 完整源码见本仓库: https://github.com/heibaiying/BigData-Notes/tree/master/code/Zookeeper/curator
+
+
+
+## 二、客户端相关操作
+
+### 2.1 创建客户端实例
+
+这里使用`@Before`在单元测试执行前创建客户端实例,并使用`@After`在单元测试后关闭客户端连接。
+
+```java
+public class BasicOperation {
+
+ private CuratorFramework client = null;
+ private static final String zkServerPath = "192.168.0.226:2181";
+ private static final String nodePath = "/hadoop/yarn";
+
+ @Before
+ public void prepare() {
+ // 重试策略
+ RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
+ client = CuratorFrameworkFactory.builder()
+ .connectString(zkServerPath)
+ .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
+ .namespace("workspace").build(); //指定命名空间后,client的所有路径操作都会以/workspace开头
+ client.start();
+ }
+
+ @After
+ public void destroy() {
+ if (client != null) {
+ client.close();
+ }
+ }
+}
+```
+
+### 2.2 重试策略
+
+在连接Zookeeper服务时候,Curator提供了多种重试策略以满足各种需求,所有重试策略继承自`RetryPolicy`接口,如下图:
+
+
+
+而这些重试策略类又分为两大类别:
+
++ RetryForever :代表一直重试,直到连接成功;
++ SleepingRetry : 基于一定间隔时间的重试,这里以其子类`ExponentialBackoffRetry`为例说明,其构造器如下:
+
+```java
+/**
+ * @param baseSleepTimeMs 重试之间等待的初始时间
+ * @param maxRetries 最大重试次数
+ * @param maxSleepMs 每次重试间隔的最长睡眠时间(毫秒)
+ */
+ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
+```
+### 2.3 判断服务状态
+
+```scala
+@Test
+public void getStatus() {
+ CuratorFrameworkState state = client.getState();
+ System.out.println("服务是否已经启动:" + (state == CuratorFrameworkState.STARTED));
+}
+```
+
+
+
+## 三、节点增删改查
+
+### 3.1 创建节点
+
+```java
+@Test
+public void createNodes() throws Exception {
+ byte[] data = "abc".getBytes();
+ client.create().creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT) //节点类型
+ .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+ .forPath(nodePath, data);
+}
+```
+
+创建时可以指定节点类型,这里的节点类型和Zookeeper原生的一致,全部定义在枚举类`CreateMode`中:
+
+```java
+public enum CreateMode {
+ // 永久节点
+ PERSISTENT (0, false, false),
+ //永久有序节点
+ PERSISTENT_SEQUENTIAL (2, false, true),
+ // 临时节点
+ EPHEMERAL (1, true, false),
+ // 临时有序节点
+ EPHEMERAL_SEQUENTIAL (3, true, true);
+ ....
+}
+```
+
+### 2.2 获取节点信息
+
+```scala
+@Test
+public void getNode() throws Exception {
+ Stat stat = new Stat();
+ byte[] data = client.getData().storingStatIn(stat).forPath(nodePath);
+ System.out.println("节点数据:" + new String(data));
+ System.out.println("节点信息:" + stat.toString());
+}
+```
+
+如上所示,节点信息被封装在`Stat`类中,其主要属性如下:
+
+```java
+public class Stat implements Record {
+ private long czxid;
+ private long mzxid;
+ private long ctime;
+ private long mtime;
+ private int version;
+ private int cversion;
+ private int aversion;
+ private long ephemeralOwner;
+ private int dataLength;
+ private int numChildren;
+ private long pzxid;
+ ...
+}
+```
+
+每个属性的含义如下:
+
+| **状态属性** | **说明** |
+| -------------- | ------------------------------------------------------------ |
+| czxid | 数据节点创建时的事务ID |
+| ctime | 数据节点创建时的时间 |
+| mzxid | 数据节点最后一次更新时的事务ID |
+| mtime | 数据节点最后一次更新时的时间 |
+| pzxid | 数据节点的子节点最后一次被修改时的事务ID |
+| cversion | 子节点的更改次数 |
+| version | 节点数据的更改次数 |
+| aversion | 节点的ACL的更改次数 |
+| ephemeralOwner | 如果节点是临时节点,则表示创建该节点的会话的SessionID;如果节点是持久节点,则该属性值为0 |
+| dataLength | 数据内容的长度 |
+| numChildren | 数据节点当前的子节点个数 |
+
+### 2.3 获取子节点列表
+
+```java
+@Test
+public void getChildrenNodes() throws Exception {
+ List childNodes = client.getChildren().forPath("/hadoop");
+ for (String s : childNodes) {
+ System.out.println(s);
+ }
+}
+```
+
+### 2.4 更新节点
+
+更新时可以传入版本号也可以不传入,如果传入则类似于乐观锁机制,只有在版本号正确的时候才会被更新。
+
+```scala
+@Test
+public void updateNode() throws Exception {
+ byte[] newData = "defg".getBytes();
+ client.setData().withVersion(0) // 传入版本号,如果版本号错误则拒绝更新操作,并抛出BadVersion异常
+ .forPath(nodePath, newData);
+}
+```
+
+### 2.5 删除节点
+
+```java
+@Test
+public void deleteNodes() throws Exception {
+ client.delete()
+ .guaranteed() // 如果删除失败,那么在会继续执行,直到成功
+ .deletingChildrenIfNeeded() // 如果有子节点,则递归删除
+ .withVersion(0) // 传入版本号,如果版本号错误则拒绝删除操作,并抛出BadVersion异常
+ .forPath(nodePath);
+}
+```
+
+### 2.6 判断节点是否存在
+
+```java
+@Test
+public void existNode() throws Exception {
+ // 如果节点存在则返回其状态信息如果不存在则为null
+ Stat stat = client.checkExists().forPath(nodePath + "aa/bb/cc");
+ System.out.println("节点是否存在:" + !(stat == null));
+}
+```
+
+
+
+## 三、监听事件
+
+### 3.1 创建一次性监听
+
+和Zookeeper原生监听一样,使用`usingWatcher`注册的监听是一次性的,即监听只会触发一次,触发后就销毁。示例如下:
+
+```java
+@Test
+public void DisposableWatch() throws Exception {
+ client.getData().usingWatcher(new CuratorWatcher() {
+ public void process(WatchedEvent event) {
+ System.out.println("节点" + event.getPath() + "发生了事件:" + event.getType());
+ }
+ }).forPath(nodePath);
+ Thread.sleep(1000 * 1000); //休眠以观察测试效果
+}
+```
+
+### 3.2 创建永久监听
+
+Curator还提供了创建永久监听的API,其使用方式如下:
+
+```java
+@Test
+public void permanentWatch() throws Exception {
+ // 使用NodeCache包装节点,对其注册的监听作用于节点,且是永久性的
+ NodeCache nodeCache = new NodeCache(client, nodePath);
+ // 通常设置为true, 代表创建nodeCache时,就去获取对应节点的值并缓存
+ nodeCache.start(true);
+ nodeCache.getListenable().addListener(new NodeCacheListener() {
+ public void nodeChanged() {
+ ChildData currentData = nodeCache.getCurrentData();
+ if (currentData != null) {
+ System.out.println("节点路径:" + currentData.getPath() +
+ "数据:" + new String(currentData.getData()));
+ }
+ }
+ });
+ Thread.sleep(1000 * 1000); //休眠以观察测试效果
+}
+```
+
+### 3.3 监听子节点
+
+这里以监听`/hadoop`下所有子节点为例,实现方式如下:
+
+```scala
+@Test
+public void permanentChildrenNodesWatch() throws Exception {
+
+ // 第三个参数代表除了节点状态外,是否还缓存节点内容
+ PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop", true);
+ /*
+ * StartMode代表初始化方式:
+ * NORMAL: 异步初始化
+ * BUILD_INITIAL_CACHE: 同步初始化
+ * POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发INITIALIZED事件
+ */
+ childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
+
+ List childDataList = childrenCache.getCurrentData();
+ System.out.println("当前数据节点的子节点列表:");
+ childDataList.forEach(x -> System.out.println(x.getPath()));
+
+ childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
+ switch (event.getType()) {
+ case INITIALIZED:
+ System.out.println("childrenCache初始化完成");
+ break;
+ case CHILD_ADDED:
+ // 需要注意的是: 即使是之前已经存在的子节点,也会触发该监听,因为会把该子节点加入childrenCache缓存中
+ System.out.println("增加子节点:" + event.getData().getPath());
+ break;
+ case CHILD_REMOVED:
+ System.out.println("删除子节点:" + event.getData().getPath());
+ break;
+ case CHILD_UPDATED:
+ System.out.println("被修改的子节点的路径:" + event.getData().getPath());
+ System.out.println("修改后的数据:" + new String(event.getData().getData()));
+ break;
+ }
+ }
+ });
+ Thread.sleep(1000 * 1000); //休眠以观察测试效果
+}
```
\ No newline at end of file
diff --git a/notes/Zookeeper常用Shell命令.md b/notes/Zookeeper常用Shell命令.md
index 48e7644..077dbab 100644
--- a/notes/Zookeeper常用Shell命令.md
+++ b/notes/Zookeeper常用Shell命令.md
@@ -1,5 +1,22 @@
# Zookeeper常用Shell命令
+
+
+
## 一、节点增删改查
### 1.1 启动服务和连接服务
diff --git a/notes/Zookeeper简介及核心概念.md b/notes/Zookeeper简介及核心概念.md
index 2c2c457..57ad26f 100644
--- a/notes/Zookeeper简介及核心概念.md
+++ b/notes/Zookeeper简介及核心概念.md
@@ -1,5 +1,27 @@
# Zookeeper简介及核心概念
+
+
+
## 一、Zookeeper简介
Zookeeper是一个开源的分布式协调服务,由雅虎创建,是Google Chubby的开源实现。Zookeeper可以用于实现分布式系统中常见的发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master选举、分布式锁和分布式队列等功能。Zookeeeper可以保证如下的分布式一致性特性:
@@ -20,13 +42,13 @@ Zookeeper致力于为那些高吞吐的大型分布式系统提供一个高性
Zookeeper通过树形结构来存储数据,它由一系列被称为ZNode的数据节点组成,类似于常见的文件系统。不过和常见的文件系统不同,Zookeeper将数据全量存储在内存中,以此来实现高吞吐,减少访问延迟。
-
+
### 2.2 目标二:构建集群
可以由一组Zookeeper服务构成Zookeeper服务集群,集群中每台机器都会单独在内存中维护自身的状态,并且每台机器之间都保持着通讯,只要集群中有半数机器能够正常工作,那么整个集群就可以正常提供服务。
-
+
### 2.3 目标三:顺序访问
@@ -105,7 +127,7 @@ ZAB协议是Zookeeper专门设计的一种支持崩溃恢复的原子广播协
Zookeeper使用一个单一的主进程来接收并处理客户端的所有事务请求,并采用原子广播协议将数据的状态变更以事务Proposal的形式广播到所有的副本进程上去。如下图:
-
+
具体流程如下:
@@ -125,7 +147,7 @@ ZAB协议的消息广播过程使用的是原子广播协议。在整个消息
Leader服务会为每一个Follower服务器分配一个单独的队列,然后将事务Proposal依次放入队列中,并根据FIFO(先进先出)的策略进行消息发送。Follower服务在接收到Proposal后,会将其以事务日志的形式写入本地磁盘中,并在写入成功后反馈给Leader一个Ack响应。当Leader接收到超过半数Follower的Ack响应后,就会广播一个Commit消息给所有的Follower以通知其进行事务提交,之后Leader自身也会完成对事务的提交。而每一个Follower则在接收到Commit消息后,完成事务的提交。
-
+
diff --git a/pictures/spark-straming-kafka-console.png b/pictures/spark-straming-kafka-console.png
new file mode 100644
index 0000000..a277a96
Binary files /dev/null and b/pictures/spark-straming-kafka-console.png differ