格式转换

This commit is contained in:
luoxiang 2019-05-26 19:03:12 +08:00
parent f5d045bfba
commit 4dd53cfbe1
7 changed files with 718 additions and 320 deletions

View File

@ -92,7 +92,7 @@
1. [Spark Streaming简介](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming与流处理.md)
2. DStream常用操作详解
3. [Spark Streaming 整合 Flume](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Flume.md)
4. Spark Streaming 整合 Kafka
4. [Spark Streaming 整合 Kafka](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Spark_Streaming整合Kafka.md)
## 四、Flink

View File

@ -0,0 +1,323 @@
# Spark Streaming 整合 Kafka
<nav>
<a href="#一版本说明">一、版本说明</a><br/>
<a href="#二项目依赖">二、项目依赖</a><br/>
<a href="#三整合Kafka">三、整合Kafka</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#31-ConsumerRecord">3.1 ConsumerRecord</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#32-生产者属性">3.2 生产者属性</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#33-位置策略">3.3 位置策略</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#34-订阅方式">3.4 订阅方式</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#35-提交偏移量">3.5 提交偏移量</a><br/>
<a href="#四启动测试">四、启动测试</a><br/>
</nav>
## 一、版本说明
Spark针对Kafka的不同版本提供了两套整合方案`spark-streaming-kafka-0-8``spark-streaming-kafka-0-10`,其主要区别如下:
| | [spark-streaming-kafka-0-8](https://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html) | [spark-streaming-kafka-0-10](https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html) |
| :--------------------------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- |
| Kafka版本 | 0.8.2.1 or higher | 0.10.0 or higher |
| AP状态 | Deprecated<br/>从Spark 2.3.0版本开始Kafka 0.8支持已被弃用 | Stable |
| 语言支持 | Scala, Java, Python | Scala, Java |
| Receiver DStream | Yes | No |
| Direct DStream | Yes | Yes |
| SSL / TLS Support | No | Yes |
| Offset Commit API(偏移量提交) | No | Yes |
| Dynamic Topic Subscription(动态主题订阅) | No | Yes |
本文使用的Kafka版本为`kafka_2.12-2.2.0`,故采用第二种方式进行整合。
## 二、项目依赖
项目采用Maven进行构建主要依赖如下
```xml
<dependencies>
<!-- Spark Streaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming整合Kafka依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
<version>2.4.3</version>
</dependency>
<!--本地测试时如果出现异常:Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 10582 则是因为paranamer版本问题可以添加下面的依赖包进行解决-->
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
<version>2.8</version>
</dependency>
</dependencies>
```
> 完整源码见本仓库https://github.com/heibaiying/BigData-Notes/tree/master/code/spark/spark-streaming-kafka
## 三、整合Kafka
通过调用`KafkaUtils`对象的`createDirectStream`方法来创建输入流,完整代码如下:
```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* spark streaming 整合 kafka
*/
object KafkaDirectStream {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
val streamingContext = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, Object](
/*
* 指定broker的地址清单清单里不需要包含所有的broker地址生产者会从给定的broker里查找其他broker的信息。
* 不过建议至少提供两个broker的信息作为容错。
*/
"bootstrap.servers" -> "hadoop001:9092",
/*键的序列化器*/
"key.deserializer" -> classOf[StringDeserializer],
/*值的序列化器*/
"value.deserializer" -> classOf[StringDeserializer],
/*消费者所在分组的ID*/
"group.id" -> "spark-streaming-group",
/*
* 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
* latest: 在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
* earliest: 在偏移量无效的情况下,消费者将从起始位置读取分区的记录
*/
"auto.offset.reset" -> "latest",
/*是否自动提交*/
"enable.auto.commit" -> (true: java.lang.Boolean)
)
/*可以同时订阅多个主题*/
val topics = Array("spark-streaming-topic")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
/*位置策略*/
PreferConsistent,
/*订阅主题*/
Subscribe[String, String](topics, kafkaParams)
)
/*打印输入流*/
stream.map(record => (record.key, record.value)).print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
```
### 3.1 ConsumerRecord
这里获得的输入流中每一个Record实际上是`ConsumerRecord<K, V> `的实例其包含了Record的所有可用信息源码如下
```scala
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
/*主题名称*/
private final String topic;
/*分区编号*/
private final int partition;
/*偏移量*/
private final long offset;
/*时间戳*/
private final long timestamp;
/*时间戳代表的含义*/
private final TimestampType timestampType;
/*键序列化器*/
private final int serializedKeySize;
/*值序列化器*/
private final int serializedValueSize;
/*值序列化器*/
private final Headers headers;
/*键*/
private final K key;
/*值*/
private final V value;
.....
}
```
### 3.2 生产者属性
在示例代码中`kafkaParams`封装了Kafka消费者的属性这些属性和Spark Streaming无关是Kafka原生API中就有定义的。其中服务器地址、键序列化器和值序列化器是必选的其他配置是可选的。其余可选的配置项如下
#### 1. fetch.min.byte
消费者从服务器获取记录的最小字节数。如果可用的数据量小于设置值broker会等待有足够的可用数据时才会把它返回给消费者。
#### 2. fetch.max.wait.ms
broker返回给消费者数据的等待时间。
#### 3. max.partition.fetch.bytes
分区返回给消费者的最大字节数。
#### 4. session.timeout.ms
消费者在被认为死亡之前可以与服务器断开连接的时间。
#### 5. auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
- latest(默认值) :在偏移量无效的情况下,消费者将从其启动之后生成的最新的记录开始读取数据;
- earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
#### 6. enable.auto.commit
是否自动提交偏移量默认值是true,为了避免出现重复数据和数据丢失可以把它设置为false。
#### 7. client.id
客户端id服务器用来识别消息的来源。
#### 8. max.poll.records
单次调用call()方法能够返回的记录数量。
#### 9. receive.buffer.bytes 和 send.buffer.byte
这两个参数分别指定TCP socket 接收和发送数据包缓冲区的大小,-1代表使用操作系统的默认值。
### 3.3 位置策略
Spark Streaming中提供了如下三种位置策略用于指定Kafka主题分区与Spark执行程序Executors之间的分配关系
+ **PreferConsistent** : 通用方式它将在所有的Executors上均匀分配分区
+ **PreferBrokers** : 当Spark的Executors与Kafka brokers在同一机器上时可以选择该选项
+ **PreferFixed** : 可以指定主题分区与特定主机的映射关系,显示地将分区分配到特定的主机,其构造器如下:
```scala
@Experimental
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
@Experimental
def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
new PreferFixed(hostMap)
```
### 3.4 订阅方式
Spark Streaming提供了两种主题订阅方式分别为`Subscribe``SubscribePattern`。后者可以使用正则匹配订阅主题的名称。其构造器分别如下:
```scala
/**
* @param 需要订阅的主题的集合
* @param Kafka消费者参数
* @param offsets(可选): 在初始启动时开始的偏移量。如果没有则将使用保存的偏移量或auto.offset.reset属性的值
*/
def Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { ... }
/**
* @param 需要订阅的正则
* @param Kafka消费者参数
* @param offsets(可选): 在初始启动时开始的偏移量。如果没有则将使用保存的偏移量或auto.offset.reset属性的值
*/
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { ... }
```
在示例代码中,我们实际上并没有指定第三个参数`offsets`,所以程序默认采用的是配置的`auto.offset.reset`属性的值latest即在偏移量无效的情况下消费者将从其启动之后生成的最新的记录开始读取数据。
### 3.5 提交偏移量
在示例代码中,我们将`enable.auto.commit`设置为true代表自动提交。在某些情况下你可能需要更高的可靠性如在业务完全处理完成后再提交偏移量这时候可以使用手动提交。目前Spark Streaming没有提供关于手动提交偏移量的方法想要进行手动提交只能调用Kafka原生的API :
+ `commitSync`: 用于异步提交;
+ `commitAsync`:用于同步提交。
关于编程方面就介绍到这里,下面介绍如果测试整合结果。
## 四、启动测试
### 4.1 创建主题
#### 1. 启动Kakfa
Kafka的运行依赖于zookeeper需要预先启动可以启动Kafka内置的zookeeper也可以启动自己安装的
```shell
# zookeeper启动命令
bin/zkServer.sh start
# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties
```
启动单节点kafka用于测试
```shell
# bin/kafka-server-start.sh config/server.properties
```
#### 2. 创建topic
```shell
# 创建用于测试主题
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 \
--partitions 1 \
--topic spark-streaming-topic
# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
```
#### 3. 创建生产者
这里创建一个Kafka生产者用于发送测试数据
```shell
bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic spark-streaming-topic
```
### 4.2 本地模式测试
这里我直接使用本地模式启动Spark Streaming程序。启动后使用生产者发送数据从控制台查看结果。
从控制台输出中可以看到数据流已经被成功接收,由于采用`kafka-console-producer.sh`发送的数据默认是没有key的所以key值为null。同时从输出中也可以看到在程序中指定的groupId和程序自动分配的clientId。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/spark-straming-kafka-console.png"/> </div>
## 参考资料
1. https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

View File

@ -1,5 +1,23 @@
# Zookeeper ACL
<nav>
<a href="#一前言">一、前言</a><br/>
<a href="#二使用Shell进行权限管理">二、使用Shell进行权限管理</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#21-设置与查看权限">2.1 设置与查看权限</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-权限组成">2.2 权限组成</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#23-添加认证信息">2.3 添加认证信息</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#24-权限设置示例">2.4 权限设置示例</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#1-world模式">1. world模式</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#2-auth模式">2. auth模式</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#3-digest模式">3. digest模式</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#4-ip模式">4. ip模式</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#5-super模式">5. super模式</a><br/>
<a href="#三使用Java客户端进行权限管理">三、使用Java客户端进行权限管理</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#31-主要依赖">3.1 主要依赖</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#32-权限管理API">3.2 权限管理API</a><br/>
</nav>
## 一、前言
为了避免存储在Zookeeper上的数据被其他程序或者人为误修改Zookeeper提供了ACL(Access Control Lists)进行权限控制。只有拥有对应权限的用户才可以对节点进行增删改查等操作。下文分别介绍使用原生的Shell命令和Apache Curator客户端进行权限设置。
@ -127,7 +145,7 @@ Authentication is not valid : /hive # 当前主机已经不能访问
"-Dzookeeper.DigestAuthenticationProvider.superDigest=heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s="
```
![zookeeper-super](D:\BigData-Notes\pictures\zookeeper-super.png)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/zookeeper-super.png"/> </div>
修改完成后需要使用`zkServer.sh restart`重启服务此时再次访问限制IP的节点

View File

@ -1,316 +1,334 @@
# Zookeeper Java 客户端 ——Apache Curator
# Zookeeper Java 客户端 ——Apache Curator
<nav>
<a href="#一基本依赖">一、基本依赖</a><br/>
<a href="#二客户端相关操作">二、客户端相关操作</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#21-创建客户端实例">2.1 创建客户端实例</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-重试策略">2.2 重试策略</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#23-判断服务状态">2.3 判断服务状态</a><br/>
<a href="#三节点增删改查">三、节点增删改查</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#31-创建节点">3.1 创建节点</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-获取节点信息">2.2 获取节点信息</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#23-获取子节点列表">2.3 获取子节点列表</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#24-更新节点">2.4 更新节点</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#25-删除节点">2.5 删除节点</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#26-判断节点是否存在">2.6 判断节点是否存在</a><br/>
<a href="#三监听事件">三、监听事件</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#31-创建一次性监听">3.1 创建一次性监听</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#32-创建永久监听">3.2 创建永久监听</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#33-监听子节点">3.3 监听子节点</a><br/>
</nav>
## 一、基本依赖
Curator是Netflix公司开源的一个Zookeeper客户端目前由Apache进行维护。与Zookeeper原生客户端相比Curator的抽象层次更高功能也更加丰富是Zookeeper使用范围最广的Java客户端。本片文章主要讲解其基本使用以下项目采用Maven构建以单元测试的方法进行讲解相关依赖如下
```xml
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>
<!--单元测试相关依赖-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
```
> 完整源码见本仓库: https://github.com/heibaiying/BigData-Notes/tree/master/code/Zookeeper/curator
## 二、客户端相关操作
### 2.1 创建客户端实例
这里使用`@Before`在单元测试执行前创建客户端实例,并使用`@After`在单元测试后关闭客户端连接。
```java
public class BasicOperation {
private CuratorFramework client = null;
private static final String zkServerPath = "192.168.0.226:2181";
private static final String nodePath = "/hadoop/yarn";
@Before
public void prepare() {
// 重试策略
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build(); //指定命名空间后client的所有路径操作都会以/workspace开头
client.start();
}
@After
public void destroy() {
if (client != null) {
client.close();
}
}
}
```
### 2.2 重试策略
在连接Zookeeper服务时候Curator提供了多种重试策略以满足各种需求所有重试策略继承自`RetryPolicy`接口,如下图:
![curator-retry-policy](D:\BigData-Notes\pictures\curator-retry-policy.png)
而这些重试策略类又分为两大类别:
+ RetryForever :代表一直重试,直到连接成功;
+ SleepingRetry 基于一定间隔时间的重试,这里以其子类`ExponentialBackoffRetry`为例说明,其构造器如下:
```java
/**
* @param baseSleepTimeMs 重试之间等待的初始时间
* @param maxRetries 最大重试次数
* @param maxSleepMs 每次重试间隔的最长睡眠时间(毫秒)
*/
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
```
### 2.3 判断服务状态
```scala
@Test
public void getStatus() {
CuratorFrameworkState state = client.getState();
System.out.println("服务是否已经启动:" + (state == CuratorFrameworkState.STARTED));
}
```
## 三、节点增删改查
### 3.1 创建节点
```java
@Test
public void createNodes() throws Exception {
byte[] data = "abc".getBytes();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, data);
}
```
创建时可以指定节点类型这里的节点类型和Zookeeper原生的一致全部定义在枚举类`CreateMode`中:
```java
public enum CreateMode {
// 永久节点
PERSISTENT (0, false, false),
//永久有序节点
PERSISTENT_SEQUENTIAL (2, false, true),
// 临时节点
EPHEMERAL (1, true, false),
// 临时有序节点
EPHEMERAL_SEQUENTIAL (3, true, true);
....
}
```
### 2.2 获取节点信息
```scala
@Test
public void getNode() throws Exception {
Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("节点数据:" + new String(data));
System.out.println("节点信息:" + stat.toString());
}
```
如上所示,节点信息被封装在`Stat`类中,其主要属性如下:
```java
public class Stat implements Record {
private long czxid;
private long mzxid;
private long ctime;
private long mtime;
private int version;
private int cversion;
private int aversion;
private long ephemeralOwner;
private int dataLength;
private int numChildren;
private long pzxid;
...
}
```
每个属性的含义如下:
| **状态属性** | **说明** |
| -------------- | ------------------------------------------------------------ |
| czxid | 数据节点创建时的事务ID |
| ctime | 数据节点创建时的时间 |
| mzxid | 数据节点最后一次更新时的事务ID |
| mtime | 数据节点最后一次更新时的时间 |
| pzxid | 数据节点的子节点最后一次被修改时的事务ID |
| cversion | 子节点的更改次数 |
| version | 节点数据的更改次数 |
| aversion | 节点的ACL的更改次数 |
| ephemeralOwner | 如果节点是临时节点则表示创建该节点的会话的SessionID如果节点是持久节点则该属性值为0 |
| dataLength | 数据内容的长度 |
| numChildren | 数据节点当前的子节点个数 |
### 2.3 获取子节点列表
```java
@Test
public void getChildrenNodes() throws Exception {
List<String> childNodes = client.getChildren().forPath("/hadoop");
for (String s : childNodes) {
System.out.println(s);
}
}
```
### 2.4 更新节点
更新时可以传入版本号也可以不传入,如果传入则类似于乐观锁机制,只有在版本号正确的时候才会被更新。
```scala
@Test
public void updateNode() throws Exception {
byte[] newData = "defg".getBytes();
client.setData().withVersion(0) // 传入版本号,如果版本号错误则拒绝更新操作,并抛出BadVersion异常
.forPath(nodePath, newData);
}
```
### 2.5 删除节点
```java
@Test
public void deleteNodes() throws Exception {
client.delete()
.guaranteed() // 如果删除失败,那么在会继续执行,直到成功
.deletingChildrenIfNeeded() // 如果有子节点,则递归删除
.withVersion(0) // 传入版本号,如果版本号错误则拒绝删除操作,并抛出BadVersion异常
.forPath(nodePath);
}
```
### 2.6 判断节点是否存在
```java
@Test
public void existNode() throws Exception {
// 如果节点存在则返回其状态信息如果不存在则为null
Stat stat = client.checkExists().forPath(nodePath + "aa/bb/cc");
System.out.println("节点是否存在:" + !(stat == null));
}
```
## 三、监听事件
### 3.1 创建一次性监听
和Zookeeper原生监听一样使用`usingWatcher`注册的监听是一次性的,即监听只会触发一次,触发后就销毁。示例如下:
```java
@Test
public void DisposableWatch() throws Exception {
client.getData().usingWatcher(new CuratorWatcher() {
public void process(WatchedEvent event) {
System.out.println("节点" + event.getPath() + "发生了事件:" + event.getType());
}
}).forPath(nodePath);
Thread.sleep(1000 * 1000); //休眠以观察测试效果
}
```
### 3.2 创建永久监听
Curator还提供了创建永久监听的API其使用方式如下
```java
@Test
public void permanentWatch() throws Exception {
// 使用NodeCache包装节点对其注册的监听作用于节点且是永久性的
NodeCache nodeCache = new NodeCache(client, nodePath);
// 通常设置为true, 代表创建nodeCache时,就去获取对应节点的值并缓存
nodeCache.start(true);
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() {
ChildData currentData = nodeCache.getCurrentData();
if (currentData != null) {
System.out.println("节点路径:" + currentData.getPath() +
"数据:" + new String(currentData.getData()));
}
}
});
Thread.sleep(1000 * 1000); //休眠以观察测试效果
}
```
### 3.3 监听子节点
这里以监听`/hadoop`下所有子节点为例,实现方式如下:
```scala
@Test
public void permanentChildrenNodesWatch() throws Exception {
// 第三个参数代表除了节点状态外,是否还缓存节点内容
PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop", true);
/*
* StartMode代表初始化方式:
* NORMAL: 异步初始化
* BUILD_INITIAL_CACHE: 同步初始化
* POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发INITIALIZED事件
*/
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点列表:");
childDataList.forEach(x -> System.out.println(x.getPath()));
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
switch (event.getType()) {
case INITIALIZED:
System.out.println("childrenCache初始化完成");
break;
case CHILD_ADDED:
// 需要注意的是: 即使是之前已经存在的子节点也会触发该监听因为会把该子节点加入childrenCache缓存中
System.out.println("增加子节点:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("删除子节点:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("被修改的子节点的路径:" + event.getData().getPath());
System.out.println("修改后的数据:" + new String(event.getData().getData()));
break;
}
}
});
Thread.sleep(1000 * 1000); //休眠以观察测试效果
}
## 一、基本依赖
Curator是Netflix公司开源的一个Zookeeper客户端目前由Apache进行维护。与Zookeeper原生客户端相比Curator的抽象层次更高功能也更加丰富是Zookeeper使用范围最广的Java客户端。本片文章主要讲解其基本使用以下项目采用Maven构建以单元测试的方法进行讲解相关依赖如下
```xml
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>
<!--单元测试相关依赖-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
```
> 完整源码见本仓库: https://github.com/heibaiying/BigData-Notes/tree/master/code/Zookeeper/curator
## 二、客户端相关操作
### 2.1 创建客户端实例
这里使用`@Before`在单元测试执行前创建客户端实例,并使用`@After`在单元测试后关闭客户端连接。
```java
public class BasicOperation {
private CuratorFramework client = null;
private static final String zkServerPath = "192.168.0.226:2181";
private static final String nodePath = "/hadoop/yarn";
@Before
public void prepare() {
// 重试策略
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build(); //指定命名空间后client的所有路径操作都会以/workspace开头
client.start();
}
@After
public void destroy() {
if (client != null) {
client.close();
}
}
}
```
### 2.2 重试策略
在连接Zookeeper服务时候Curator提供了多种重试策略以满足各种需求所有重试策略继承自`RetryPolicy`接口,如下图:
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/curator-retry-policy.png"/> </div>
而这些重试策略类又分为两大类别:
+ RetryForever :代表一直重试,直到连接成功;
+ SleepingRetry 基于一定间隔时间的重试,这里以其子类`ExponentialBackoffRetry`为例说明,其构造器如下:
```java
/**
* @param baseSleepTimeMs 重试之间等待的初始时间
* @param maxRetries 最大重试次数
* @param maxSleepMs 每次重试间隔的最长睡眠时间(毫秒)
*/
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
```
### 2.3 判断服务状态
```scala
@Test
public void getStatus() {
CuratorFrameworkState state = client.getState();
System.out.println("服务是否已经启动:" + (state == CuratorFrameworkState.STARTED));
}
```
## 三、节点增删改查
### 3.1 创建节点
```java
@Test
public void createNodes() throws Exception {
byte[] data = "abc".getBytes();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, data);
}
```
创建时可以指定节点类型这里的节点类型和Zookeeper原生的一致全部定义在枚举类`CreateMode`中:
```java
public enum CreateMode {
// 永久节点
PERSISTENT (0, false, false),
//永久有序节点
PERSISTENT_SEQUENTIAL (2, false, true),
// 临时节点
EPHEMERAL (1, true, false),
// 临时有序节点
EPHEMERAL_SEQUENTIAL (3, true, true);
....
}
```
### 2.2 获取节点信息
```scala
@Test
public void getNode() throws Exception {
Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("节点数据:" + new String(data));
System.out.println("节点信息:" + stat.toString());
}
```
如上所示,节点信息被封装在`Stat`类中,其主要属性如下:
```java
public class Stat implements Record {
private long czxid;
private long mzxid;
private long ctime;
private long mtime;
private int version;
private int cversion;
private int aversion;
private long ephemeralOwner;
private int dataLength;
private int numChildren;
private long pzxid;
...
}
```
每个属性的含义如下:
| **状态属性** | **说明** |
| -------------- | ------------------------------------------------------------ |
| czxid | 数据节点创建时的事务ID |
| ctime | 数据节点创建时的时间 |
| mzxid | 数据节点最后一次更新时的事务ID |
| mtime | 数据节点最后一次更新时的时间 |
| pzxid | 数据节点的子节点最后一次被修改时的事务ID |
| cversion | 子节点的更改次数 |
| version | 节点数据的更改次数 |
| aversion | 节点的ACL的更改次数 |
| ephemeralOwner | 如果节点是临时节点则表示创建该节点的会话的SessionID如果节点是持久节点则该属性值为0 |
| dataLength | 数据内容的长度 |
| numChildren | 数据节点当前的子节点个数 |
### 2.3 获取子节点列表
```java
@Test
public void getChildrenNodes() throws Exception {
List<String> childNodes = client.getChildren().forPath("/hadoop");
for (String s : childNodes) {
System.out.println(s);
}
}
```
### 2.4 更新节点
更新时可以传入版本号也可以不传入,如果传入则类似于乐观锁机制,只有在版本号正确的时候才会被更新。
```scala
@Test
public void updateNode() throws Exception {
byte[] newData = "defg".getBytes();
client.setData().withVersion(0) // 传入版本号,如果版本号错误则拒绝更新操作,并抛出BadVersion异常
.forPath(nodePath, newData);
}
```
### 2.5 删除节点
```java
@Test
public void deleteNodes() throws Exception {
client.delete()
.guaranteed() // 如果删除失败,那么在会继续执行,直到成功
.deletingChildrenIfNeeded() // 如果有子节点,则递归删除
.withVersion(0) // 传入版本号,如果版本号错误则拒绝删除操作,并抛出BadVersion异常
.forPath(nodePath);
}
```
### 2.6 判断节点是否存在
```java
@Test
public void existNode() throws Exception {
// 如果节点存在则返回其状态信息如果不存在则为null
Stat stat = client.checkExists().forPath(nodePath + "aa/bb/cc");
System.out.println("节点是否存在:" + !(stat == null));
}
```
## 三、监听事件
### 3.1 创建一次性监听
和Zookeeper原生监听一样使用`usingWatcher`注册的监听是一次性的,即监听只会触发一次,触发后就销毁。示例如下:
```java
@Test
public void DisposableWatch() throws Exception {
client.getData().usingWatcher(new CuratorWatcher() {
public void process(WatchedEvent event) {
System.out.println("节点" + event.getPath() + "发生了事件:" + event.getType());
}
}).forPath(nodePath);
Thread.sleep(1000 * 1000); //休眠以观察测试效果
}
```
### 3.2 创建永久监听
Curator还提供了创建永久监听的API其使用方式如下
```java
@Test
public void permanentWatch() throws Exception {
// 使用NodeCache包装节点对其注册的监听作用于节点且是永久性的
NodeCache nodeCache = new NodeCache(client, nodePath);
// 通常设置为true, 代表创建nodeCache时,就去获取对应节点的值并缓存
nodeCache.start(true);
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() {
ChildData currentData = nodeCache.getCurrentData();
if (currentData != null) {
System.out.println("节点路径:" + currentData.getPath() +
"数据:" + new String(currentData.getData()));
}
}
});
Thread.sleep(1000 * 1000); //休眠以观察测试效果
}
```
### 3.3 监听子节点
这里以监听`/hadoop`下所有子节点为例,实现方式如下:
```scala
@Test
public void permanentChildrenNodesWatch() throws Exception {
// 第三个参数代表除了节点状态外,是否还缓存节点内容
PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop", true);
/*
* StartMode代表初始化方式:
* NORMAL: 异步初始化
* BUILD_INITIAL_CACHE: 同步初始化
* POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发INITIALIZED事件
*/
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点列表:");
childDataList.forEach(x -> System.out.println(x.getPath()));
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
switch (event.getType()) {
case INITIALIZED:
System.out.println("childrenCache初始化完成");
break;
case CHILD_ADDED:
// 需要注意的是: 即使是之前已经存在的子节点也会触发该监听因为会把该子节点加入childrenCache缓存中
System.out.println("增加子节点:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("删除子节点:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("被修改的子节点的路径:" + event.getData().getPath());
System.out.println("修改后的数据:" + new String(event.getData().getData()));
break;
}
}
});
Thread.sleep(1000 * 1000); //休眠以观察测试效果
}
```

View File

@ -1,5 +1,22 @@
# Zookeeper常用Shell命令
<nav>
<a href="#一节点增删改查">一、节点增删改查</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#11-启动服务和连接服务">1.1 启动服务和连接服务</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#12-help命令">1.2 help命令</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#13-查看节点列表">1.3 查看节点列表</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#14-新增节点">1.4 新增节点</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#15-查看节点">1.5 查看节点</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#16-更新节点">1.6 更新节点</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#17-删除节点">1.7 删除节点</a><br/>
<a href="#二监听器">二、监听器</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#21-get-path-[watch]">2.1 get path [watch]</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-stat-path-[watch]">2.2 stat path [watch]</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#23-lsls2-path--[watch]">2.3 ls\ls2 path [watch]</a><br/>
<a href="#三-zookeeper-四字命令">三、 zookeeper 四字命令</a><br/>
</nav>
## 一、节点增删改查
### 1.1 启动服务和连接服务

View File

@ -1,5 +1,27 @@
# Zookeeper简介及核心概念
<nav>
<a href="#一Zookeeper简介">一、Zookeeper简介</a><br/>
<a href="#二Zookeeper设计目标">二、Zookeeper设计目标</a><br/>
<a href="#三核心概念">三、核心概念</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#31-集群角色">3.1 集群角色</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#32-会话">3.2 会话</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#33-数据节点">3.3 数据节点</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#34-节点信息">3.4 节点信息</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#35-Wather">3.5 Wather</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#36-ACL">3.6 ACL</a><br/>
<a href="#四ZAB协议">四、ZAB协议</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#41-ZAB协议与数据一致性">4.1 ZAB协议与数据一致性</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#42--ZAB协议的内容">4.2 ZAB协议的内容</a><br/>
<a href="#五Zookeeper的典型应用场景">五、Zookeeper的典型应用场景</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#51数据的发布订阅">5.1数据的发布/订阅</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#52-命名服务">5.2 命名服务</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#53-Master选举">5.3 Master选举</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#54-分布式锁">5.4 分布式锁</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#55-集群管理">5.5 集群管理</a><br/>
</nav>
## 一、Zookeeper简介
Zookeeper是一个开源的分布式协调服务由雅虎创建是Google Chubby的开源实现。Zookeeper可以用于实现分布式系统中常见的发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master选举、分布式锁和分布式队列等功能。Zookeeeper可以保证如下的分布式一致性特性
@ -20,13 +42,13 @@ Zookeeper致力于为那些高吞吐的大型分布式系统提供一个高性
Zookeeper通过树形结构来存储数据它由一系列被称为ZNode的数据节点组成类似于常见的文件系统。不过和常见的文件系统不同Zookeeper将数据全量存储在内存中以此来实现高吞吐减少访问延迟。
![zookeeper-zknamespace](D:\BigData-Notes\pictures\zookeeper-zknamespace.jpg)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/zookeeper-zknamespace.jpg"/> </div>
### 2.2 目标二:构建集群
可以由一组Zookeeper服务构成Zookeeper服务集群集群中每台机器都会单独在内存中维护自身的状态并且每台机器之间都保持着通讯只要集群中有半数机器能够正常工作那么整个集群就可以正常提供服务。
![zookeeper-zkservice](D:\BigData-Notes\pictures\zookeeper-zkservice.jpg)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/zookeeper-zkservice.jpg"/> </div>
### 2.3 目标三:顺序访问
@ -105,7 +127,7 @@ ZAB协议是Zookeeper专门设计的一种支持崩溃恢复的原子广播协
Zookeeper使用一个单一的主进程来接收并处理客户端的所有事务请求并采用原子广播协议将数据的状态变更以事务Proposal的形式广播到所有的副本进程上去。如下图
![zookeeper-zkcomponents](D:\BigData-Notes\pictures\zookeeper-zkcomponents.jpg)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/zookeeper-zkcomponents.jpg"/> </div>
具体流程如下:
@ -125,7 +147,7 @@ ZAB协议的消息广播过程使用的是原子广播协议。在整个消息
Leader服务会为每一个Follower服务器分配一个单独的队列然后将事务Proposal依次放入队列中并根据FIFO(先进先出)的策略进行消息发送。Follower服务在接收到Proposal后会将其以事务日志的形式写入本地磁盘中并在写入成功后反馈给Leader一个Ack响应。当Leader接收到超过半数Follower的Ack响应后就会广播一个Commit消息给所有的Follower以通知其进行事务提交之后Leader自身也会完成对事务的提交。而每一个Follower则在接收到Commit消息后完成事务的提交。
![zookeeper-brocast](D:\BigData-Notes\pictures\zookeeper-brocast.jpg)
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/zookeeper-brocast.jpg"/> </div>

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB