diff --git a/code/Zookeeper/curator/pom.xml b/code/Zookeeper/curator/pom.xml index bfcd36a..e435f6a 100644 --- a/code/Zookeeper/curator/pom.xml +++ b/code/Zookeeper/curator/pom.xml @@ -7,6 +7,42 @@ com.heibaiying curator 1.0 + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + + org.apache.curator + curator-framework + 4.0.0 + + + org.apache.curator + curator-recipes + 4.0.0 + + + org.apache.zookeeper + zookeeper + 3.4.13 + + + + junit + junit + 4.12 + + + \ No newline at end of file diff --git a/code/Zookeeper/curator/src/main/java/com/heibaiying/AclOperation.java b/code/Zookeeper/curator/src/main/java/com/heibaiying/AclOperation.java new file mode 100644 index 0000000..d18a476 --- /dev/null +++ b/code/Zookeeper/curator/src/main/java/com/heibaiying/AclOperation.java @@ -0,0 +1,100 @@ +package com.heibaiying; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Perms; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * @author : heibaiying + * @description : 使用curator操作Zookeeper ACL + */ +public class AclOperation { + + + private CuratorFramework client = null; + private static final String zkServerPath = "192.168.0.226:2181"; + private static final String nodePath = "/hadoop/hdfs"; + + + @Before + public void prepare() { + RetryPolicy retryPolicy = new RetryNTimes(3, 5000); + client = CuratorFrameworkFactory.builder() + .authorization("digest", "heibai:123456".getBytes()) + .connectString(zkServerPath) + .sessionTimeoutMs(10000).retryPolicy(retryPolicy) + .namespace("workspace").build(); + client.start(); + } + + + /** + * 新建节点并赋予权限 + */ + @Test + public void createNodesWithAcl() throws Exception { + List aclList = new ArrayList<>(); + // 对密码进行加密 + String digest1 = DigestAuthenticationProvider.generateDigest("heibai:123456"); + String digest2 = DigestAuthenticationProvider.generateDigest("ying:123456"); + Id user01 = new Id("digest", digest1); + Id user02 = new Id("digest", digest2); + // 指定所有权限 + aclList.add(new ACL(Perms.ALL, user01)); + // 如果想要指定权限的组合,中间需要使用 | ,这里的|代表的是位运算中的 按位或 + aclList.add(new ACL(Perms.DELETE | Perms.CREATE, user02)); + + // 创建节点 + byte[] data = "abc".getBytes(); + client.create().creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .withACL(aclList, true) + .forPath(nodePath, data); + } + + + /** + * 给已有节点设置权限,注意这会删除所有原来节点上已有的权限设置 + */ + @Test + public void SetAcl() throws Exception { + String digest = DigestAuthenticationProvider.generateDigest("admin:admin"); + Id user = new Id("digest", digest); + client.setACL() + .withACL(Collections.singletonList(new ACL(Perms.READ | Perms.DELETE, user))) + .forPath(nodePath); + } + + + /** + * 获取权限 + */ + @Test + public void getAcl() throws Exception { + List aclList = client.getACL().forPath(nodePath); + ACL acl = aclList.get(0); + System.out.println(acl.getId().getId()+"是否有删读权限:" + (acl.getPerms() == (Perms.READ | Perms.DELETE))); + } + + + @After + public void destroy() { + if (client != null) { + client.close(); + } + } + +} diff --git a/code/Zookeeper/curator/src/main/java/com/heibaiying/BasicOperation.java b/code/Zookeeper/curator/src/main/java/com/heibaiying/BasicOperation.java new file mode 100644 index 0000000..7ad8d06 --- /dev/null +++ b/code/Zookeeper/curator/src/main/java/com/heibaiying/BasicOperation.java @@ -0,0 +1,216 @@ +package com.heibaiying; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.*; +import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; +import org.apache.curator.retry.RetryNTimes; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.Stat; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +/** + * @author : heibaiying + * @description : curator客户端API基本使用 + */ +public class BasicOperation { + + + private CuratorFramework client = null; + private static final String zkServerPath = "192.168.0.226:2181"; + private static final String nodePath = "/hadoop/yarn"; + + + @Before + public void prepare() { + // 重试策略 + RetryPolicy retryPolicy = new RetryNTimes(3, 5000); + client = CuratorFrameworkFactory.builder() + .connectString(zkServerPath) + .sessionTimeoutMs(10000).retryPolicy(retryPolicy) + .namespace("workspace").build(); //指定命名空间后,client的所有路径操作都会以/workspace开头 + client.start(); + } + + + /** + * 获取当前zookeeper的状态 + */ + @Test + public void getStatus() { + CuratorFrameworkState state = client.getState(); + System.out.println("服务是否已经启动:" + (state == CuratorFrameworkState.STARTED)); + } + + + /** + * 创建节点(s) + */ + @Test + public void createNodes() throws Exception { + byte[] data = "abc".getBytes(); + client.create().creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) //节点类型 + .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) + .forPath(nodePath, data); + } + + + /** + * 获取节点信息 + */ + @Test + public void getNode() throws Exception { + Stat stat = new Stat(); + byte[] data = client.getData().storingStatIn(stat).forPath(nodePath); + System.out.println("节点数据:" + new String(data)); + System.out.println("节点信息:" + stat.toString()); + } + + /** + * 获取该节点的所有子节点 + */ + @Test + public void getChildrenNodes() throws Exception { + List childNodes = client.getChildren().forPath("/hadoop"); + for (String s : childNodes) { + System.out.println(s); + } + } + + + /** + * 更新节点 + */ + @Test + public void updateNode() throws Exception { + byte[] newData = "defg".getBytes(); + client.setData().withVersion(0) // 传入版本号,如果版本号错误则拒绝更新操作,并抛出BadVersion异常 + .forPath(nodePath, newData); + } + + + /** + * 删除节点 + */ + @Test + public void deleteNodes() throws Exception { + client.delete() + .guaranteed() // 如果删除失败,那么在会继续执行,直到成功 + .deletingChildrenIfNeeded() // 如果有子节点,则递归删除 + .withVersion(0) // 传入版本号,如果版本号错误则拒绝删除操作,并抛出BadVersion异常 + .forPath(nodePath); + } + + + /** + * 判断子节点是否存在 + */ + @Test + public void existNode() throws Exception { + // 如果节点存在则返回其状态信息如果不存在则为null + Stat stat = client.checkExists().forPath(nodePath + "aa/bb/cc"); + System.out.println("节点是否存在:" + !(stat == null)); + } + + + /** + * 使用usingWatcher注册的监听是一次性的,即监听只会触发一次,监听完毕后就销毁 + */ + @Test + public void DisposableWatch() throws Exception { + client.getData().usingWatcher(new CuratorWatcher() { + public void process(WatchedEvent event) { + System.out.println("节点" + event.getPath() + "发生了事件:" + event.getType()); + } + }).forPath(nodePath); + Thread.sleep(1000 * 1000); //休眠以观察测试效果 + } + + + /** + * 注册永久监听 + */ + @Test + public void permanentWatch() throws Exception { + // 使用NodeCache包装节点,对其注册的监听作用于节点,且是永久性的 + NodeCache nodeCache = new NodeCache(client, nodePath); + // 通常设置为true, 代表创建nodeCache时,就去获取对应节点的值并缓存 + nodeCache.start(true); + nodeCache.getListenable().addListener(new NodeCacheListener() { + public void nodeChanged() { + ChildData currentData = nodeCache.getCurrentData(); + if (currentData != null) { + System.out.println("节点路径:" + currentData.getPath() + + "数据:" + new String(currentData.getData())); + } + } + }); + Thread.sleep(1000 * 1000); //休眠以观察测试效果 + } + + + /** + * 针对子节点注册监听 + */ + @Test + public void permanentChildrenNodesWatch() throws Exception { + + // 第三个参数代表除了节点状态外,是否还缓存节点内容 + PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop", true); + /* + * StartMode代表初始化方式: + * NORMAL: 异步初始化 + * BUILD_INITIAL_CACHE: 同步初始化 + * POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发事件 + */ + childrenCache.start(StartMode.POST_INITIALIZED_EVENT); + + List childDataList = childrenCache.getCurrentData(); + System.out.println("当前数据节点的子节点列表:"); + childDataList.forEach(x->System.out.println(x.getPath())); + + childrenCache.getListenable().addListener(new PathChildrenCacheListener() { + + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + switch (event.getType()) { + case INITIALIZED: + System.out.println("childrenCache初始化完成"); + break; + case CHILD_ADDED: + // 需要注意的是: 即使是之前已经存在的子节点,也会触发该监听,因为会把该子节点加入childrenCache缓存中 + System.out.println("增加子节点:" + event.getData().getPath()); + break; + case CHILD_REMOVED: + System.out.println("删除子节点:" + event.getData().getPath()); + break; + case CHILD_UPDATED: + System.out.println("被修改的子节点的路径:" + event.getData().getPath()); + System.out.println("修改后的数据:" + new String(event.getData().getData())); + break; + } + } + }); + + Thread.sleep(1000 * 1000); //休眠以观察测试效果 + } + + + @After + public void destroy() { + if (client != null) { + client.close(); + } + } + +}