zookeeper cuator客户端

This commit is contained in:
luoxiang 2019-05-25 17:43:03 +08:00
parent c138b52713
commit 92ccbfcbee
3 changed files with 352 additions and 0 deletions

View File

@ -7,6 +7,42 @@
<groupId>com.heibaiying</groupId>
<artifactId>curator</artifactId>
<version>1.0</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>
<!--单元测试相关依赖-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,100 @@
package com.heibaiying;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* @author : heibaiying
* @description : 使用curator操作Zookeeper ACL
*/
public class AclOperation {
private CuratorFramework client = null;
private static final String zkServerPath = "192.168.0.226:2181";
private static final String nodePath = "/hadoop/hdfs";
@Before
public void prepare() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.authorization("digest", "heibai:123456".getBytes())
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}
/**
* 新建节点并赋予权限
*/
@Test
public void createNodesWithAcl() throws Exception {
List<ACL> aclList = new ArrayList<>();
// 对密码进行加密
String digest1 = DigestAuthenticationProvider.generateDigest("heibai:123456");
String digest2 = DigestAuthenticationProvider.generateDigest("ying:123456");
Id user01 = new Id("digest", digest1);
Id user02 = new Id("digest", digest2);
// 指定所有权限
aclList.add(new ACL(Perms.ALL, user01));
// 如果想要指定权限的组合中间需要使用 | ,这里的|代表的是位运算中的 按位或
aclList.add(new ACL(Perms.DELETE | Perms.CREATE, user02));
// 创建节点
byte[] data = "abc".getBytes();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(aclList, true)
.forPath(nodePath, data);
}
/**
* 给已有节点设置权限,注意这会删除所有原来节点上已有的权限设置
*/
@Test
public void SetAcl() throws Exception {
String digest = DigestAuthenticationProvider.generateDigest("admin:admin");
Id user = new Id("digest", digest);
client.setACL()
.withACL(Collections.singletonList(new ACL(Perms.READ | Perms.DELETE, user)))
.forPath(nodePath);
}
/**
* 获取权限
*/
@Test
public void getAcl() throws Exception {
List<ACL> aclList = client.getACL().forPath(nodePath);
ACL acl = aclList.get(0);
System.out.println(acl.getId().getId()+"是否有删读权限:" + (acl.getPerms() == (Perms.READ | Perms.DELETE)));
}
@After
public void destroy() {
if (client != null) {
client.close();
}
}
}

View File

@ -0,0 +1,216 @@
package com.heibaiying;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
/**
* @author : heibaiying
* @description : curator客户端API基本使用
*/
public class BasicOperation {
private CuratorFramework client = null;
private static final String zkServerPath = "192.168.0.226:2181";
private static final String nodePath = "/hadoop/yarn";
@Before
public void prepare() {
// 重试策略
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build(); //指定命名空间后client的所有路径操作都会以/workspace开头
client.start();
}
/**
* 获取当前zookeeper的状态
*/
@Test
public void getStatus() {
CuratorFrameworkState state = client.getState();
System.out.println("服务是否已经启动:" + (state == CuratorFrameworkState.STARTED));
}
/**
* 创建节点(s)
*/
@Test
public void createNodes() throws Exception {
byte[] data = "abc".getBytes();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, data);
}
/**
* 获取节点信息
*/
@Test
public void getNode() throws Exception {
Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("节点数据:" + new String(data));
System.out.println("节点信息:" + stat.toString());
}
/**
* 获取该节点的所有子节点
*/
@Test
public void getChildrenNodes() throws Exception {
List<String> childNodes = client.getChildren().forPath("/hadoop");
for (String s : childNodes) {
System.out.println(s);
}
}
/**
* 更新节点
*/
@Test
public void updateNode() throws Exception {
byte[] newData = "defg".getBytes();
client.setData().withVersion(0) // 传入版本号如果版本号错误则拒绝更新操作,并抛出BadVersion异常
.forPath(nodePath, newData);
}
/**
* 删除节点
*/
@Test
public void deleteNodes() throws Exception {
client.delete()
.guaranteed() // 如果删除失败那么在会继续执行直到成功
.deletingChildrenIfNeeded() // 如果有子节点则递归删除
.withVersion(0) // 传入版本号如果版本号错误则拒绝删除操作,并抛出BadVersion异常
.forPath(nodePath);
}
/**
* 判断子节点是否存在
*/
@Test
public void existNode() throws Exception {
// 如果节点存在则返回其状态信息如果不存在则为null
Stat stat = client.checkExists().forPath(nodePath + "aa/bb/cc");
System.out.println("节点是否存在:" + !(stat == null));
}
/**
* 使用usingWatcher注册的监听是一次性的,即监听只会触发一次监听完毕后就销毁
*/
@Test
public void DisposableWatch() throws Exception {
client.getData().usingWatcher(new CuratorWatcher() {
public void process(WatchedEvent event) {
System.out.println("节点" + event.getPath() + "发生了事件:" + event.getType());
}
}).forPath(nodePath);
Thread.sleep(1000 * 1000); //休眠以观察测试效果
}
/**
* 注册永久监听
*/
@Test
public void permanentWatch() throws Exception {
// 使用NodeCache包装节点对其注册的监听作用于节点且是永久性的
NodeCache nodeCache = new NodeCache(client, nodePath);
// 通常设置为true, 代表创建nodeCache时,就去获取对应节点的值并缓存
nodeCache.start(true);
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() {
ChildData currentData = nodeCache.getCurrentData();
if (currentData != null) {
System.out.println("节点路径:" + currentData.getPath() +
"数据:" + new String(currentData.getData()));
}
}
});
Thread.sleep(1000 * 1000); //休眠以观察测试效果
}
/**
* 针对子节点注册监听
*/
@Test
public void permanentChildrenNodesWatch() throws Exception {
// 第三个参数代表除了节点状态外是否还缓存节点内容
PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop", true);
/*
* StartMode代表初始化方式:
* NORMAL: 异步初始化
* BUILD_INITIAL_CACHE: 同步初始化
* POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发事件
*/
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点列表:");
childDataList.forEach(x->System.out.println(x.getPath()));
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case INITIALIZED:
System.out.println("childrenCache初始化完成");
break;
case CHILD_ADDED:
// 需要注意的是: 即使是之前已经存在的子节点也会触发该监听因为会把该子节点加入childrenCache缓存中
System.out.println("增加子节点:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("删除子节点:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("被修改的子节点的路径:" + event.getData().getPath());
System.out.println("修改后的数据:" + new String(event.getData().getData()));
break;
}
}
});
Thread.sleep(1000 * 1000); //休眠以观察测试效果
}
@After
public void destroy() {
if (client != null) {
client.close();
}
}
}