资讯专栏INFORMATION COLUMN

Zookeeper 客户端 Api 的基本使用

fizz / 1016人阅读

零 版本

JDK 版本 : OpenJDK 11.0.1

IDE : idea 2018.3

Zookeeper Server 版本 : 3.5.4-beta

Zookeeper Client 版本 : 3.5.4-beta

Curator 版本 : 4.2.0

一 Zookeeper Client

Zookeeper Client 是 Zookeeper 的经典原生客户端。使用之前需要在 Maven 中导入依赖:


    org.apache.zookeeper
    zookeeper
    3.5.4-beta

代码:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class ClientTest {

    public static void main(String[] args) {

        /**
         * 创建一个 Zookeeper 的实例
         * 此处为一个集群,Zookeeper 的 ip 之间用逗号隔开
         *
         * 参数解释:
         * param 1 - Zookeeper 的实例 ip ,此处是一个集群,所以配置了多个 ip,用逗号隔开
         * param 2 - session 过期时间,单位秒 (1000)
         * param 3 - 监视者,用于获取监控事件 (MyWatch)
         */
        ZooKeeper zooKeeper = null;
        try {
            Watcher createZkWatch = new MyWatch();
            zooKeeper = new ZooKeeper("localhost:2101,localhost:2102,localhost:2103",
                    1000,createZkWatch);
        } catch (IOException e) {
            e.printStackTrace();
        }

        /**
         * 值得注意的是,Zookeeper 对象去连接中间件实例是异步的
         * 所以此处需要做一个死循环等待它连接完毕
         * 更加优雅的做法是使用 CownDownLatch 去做,但是 while 比较简单
         */
        while(zooKeeper.getState() == ZooKeeper.States.CONNECTING){
            //返回 zookeeper 的状态
            System.out.println(zooKeeper.getState());

            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //如果连接不出错的话此处状态应该为 CONNECTED
        if(zooKeeper.getState() != ZooKeeper.States.CONNECTED)
            return;


        /**
         * 创建 ZooKeeper 节点
         * 参数解释:
         * param 1 - znode 名称 (/zoo)
         * param 2 - 节点数据 (my first data)
         * param 3 - 设置权限 (OPEN_ACL_UNSAFE)
         * param 4 - znode 类型 (PERSISTENT)
         *
         *
         * znode 类型有四种:
         * PERSISTENT - 持久化目录节点,客户端与zookeeper断开连接后,该节点依旧存在
         * PERSISTENT_SEQUENTIAL - 持久化,并带有序列号
         * EPHEMERAL - 临时目录节点,客户端与zookeeper断开连接后,该节点被删除
         * EPHEMERAL_SEQUENTIAL - 临时,并带有序列号
         */
        try {
            String s = zooKeeper.create("/zoo", "my first data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("创建节点:" + s);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /**
         * 创建一个二级节点,参数同上
         * 需要注意的是,必须要有一级节点才能有二级节点,不然会报错
         */
        try {
            String s = zooKeeper.create("/zoo/zoo_1", "my first data_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("创建二级节点:" + s);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /**
         * 查询 ZooKeeper 节点的数据
         * 参数解释:
         * param 1 - znode 名称 (/zoo)
         * param 2 - 监视者,用于获取监控事件 (MyWatch)
         * param 3 - Zookeeper 实例信息和数据信息 (stat)
         *
         * 注意如果后续需要修改该节点的值,可以在此处记录节点版本 version (非必要操作)
         */
        Integer zooVersion = null;
        try {
            MyWatch getDataWatch = new MyWatch();
            Stat stat = new Stat();
            byte[] data = zooKeeper.getData("/zoo",getDataWatch,stat);
            System.out.println("查询节点数据:" + new String(data));

            //从 stat 中可以获取很多 Zookeeper 实例的信息
            System.out.println("查询节点数据 czxid:" + stat.getCzxid()); //zxid
            zooVersion = stat.getVersion(); //此处获取 /zoo 节点的版本号
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /**
         * 修改 ZooKeeper 节点的数据
         * 参数解释:
         * param 1 - znode 名称 (/zoo)
         * param 2 - 节点新数据 (my first data change)
         * param 3 - 该节点的版本
         *
         * 在成功修改了节点的数据之后,版本号会自动加一
         * 如果此时不知道节点的版本,也可以输入 -1,会默认取最新的节点版本去修改
         */
        try {
            Stat stat = zooKeeper.setData("/zoo", "my first data change".getBytes(), zooVersion); // zooVersion = -1
            System.out.println("修改节点数据 czxid:" + stat.getCzxid());
            System.out.println("修改节点数据 version:" + stat.getVersion());
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /**
         * 查看 ZooKeeper 节点是否存在
         * 参数解释:
         * param 1 - znode 名称 (/zoo)
         * param 2 - 监视者,用于获取监控事件 (MyWatch)
         *
         * 如果不存在,返回的 stat 为 null
         */
        try {
            Stat stat = zooKeeper.exists("/zoo_not_exist", new MyWatch());
            System.out.println("查看节点是否存在 stat:" + stat);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        /**
         * 删除 ZooKeeper 节点
         * 参数解释:
         * param 1 - znode 名称 (/zoo)
         * param 2 - 该节点的版本
         *
         * 版本号如果不清楚的话可以填入 -1,和上述同理
         * 值得注意的是,如果一个节点下属存在子节点,那么它不能被删除
         */
        try {
            zooKeeper.delete("/zoo", -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }


    private static class MyWatch implements Watcher{

        public void process(WatchedEvent watchedEvent) {
            System.out.println(watchedEvent);
        }
    }
}
二 Curator

Curator 是 Netfix 开发的 Zookeeper Client,使用起来更方便,功能更加强大,目前应用更加广泛。使用之前需要在 Maven 中导入依赖:


    org.apache.curator
    curator-recipes
    4.2.0


    org.apache.curator
    curator-framework
    4.2.0

代码:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.List;

public class CuratorTest {

    public static void main(String[] args) {

        /**
         * 创建客户端
         *
         * RetryPolicy 接口是重试策略
         */
        /**
         * 指定客户端的重连策略
         *
         * RetryOneTime(int ms)
         * 休眠一定毫秒数之后重新连接一次
         *
         * RetryForever(int ms)
         * 和第一种策略的差别是会不断尝试重连
         *
         * RetryNTimes(int times,int ms)
         * 和第一种策略的差别是,第一个参数指定重连次数,第二个参数指定休眠间隔
         *
         * RetryUntilElapsed(int max_sum_ms,int ms)
         * 第一个参数指定最大休眠时间,第二个参数指定休眠间隔,如果休眠时间超出了就不会继续重连
         *
         * ExponentialBackoffRetry(int ms,int,int max_ms)
         * 第一个参数代表最初的重连休眠时间,第二个参数代表最大重连次数,第三个参数代表最大重连休眠时间
         * 该策略下重连的休眠时间会随着重连次数的增加而增加,从最初休眠时间一直增加到最大休眠时间
         * 最大重连次数必须小于等于 29,超过的情况下会被自动修改成 29
         *
         * [其它策略不一一列举]
         */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(100,3,1000);

        /**
         * 采用 buider 模式创建客户端
         */
        CuratorFramework client = CuratorFrameworkFactory.builder()
                                        //Zookeeper 的地址
                                        .connectString("localhost:2101,localhost:2102,localhost:2103")
                                        //session 的过期时间(毫秒)
                                        .sessionTimeoutMs(5000)
                                        //连接的超时时间(毫秒)
                                        .connectionTimeoutMs(5000)
                                        //拒绝策略
                                        .retryPolicy(retryPolicy)
                                        //设置该客户端能够操作的目录权限,不设置的话默认可以操作全部
                                        //比如此处设置为 zoo,即为该客户端对象操作的节点前面默认会添加 /zoo
                                        .namespace("zoo")
                                        //完成创建
                                        .build();
        //启动客户端
        client.start();


        /**
         * 创建节点
         */
        try {
            String createReturn = client.create()
                                    //节点类型
                                    //PERSISTENT - 持久化目录节点,客户端与zookeeper断开连接后,该节点依旧存在
                                    //PERSISTENT_SEQUENTIAL - 持久化,并带有序列号
                                    //EPHEMERAL - 临时目录节点,客户端与zookeeper断开连接后,该节点被删除
                                    //EPHEMERAL_SEQUENTIAL - 临时,并带有序列号
                                    .withMode(CreateMode.PERSISTENT)
                                    //由于 namespace 设置为 zoo,所以此处相当于创建 /zoo/zoo_1 节点
                                    .forPath("/zoo_1", "my first data zoo_1".getBytes());
            System.out.println("创建节点:" + createReturn);
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 查询节点
         */
        try {
            Stat stat = client.checkExists()
                    //查询 /zoo/zoo_1 节点
                    .forPath("/zoo_1");
            //如果不存在,stat 为 null
            System.out.println("查询节点:" + stat);
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 删除节点
         */
        try {
            client.delete()
                    //如果该节点下有子节点,会抛出异常且删除失败
                    .forPath("/zoo_1");
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 查询节点的值
         */
        try {
            Stat stat = new Stat();
            byte[] value = client.getData()
                                    //获取节点的 stat
                                    .storingStatIn(stat)
                                    //查询 /zoo/zoo_1 节点
                                    .forPath("/zoo_1");
            System.out.println("查询节点的值:" + new String(value));
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 更新节点的值
         */
        try {
            Stat stat = client.setData()
                                //设置版本值,此选项非必填
                                .withVersion(10086)
                                .forPath("/zoo_1", "zoo_1 new data".getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }

        /**
         * 获取节点的子节点
         */
        try {
            //获取所有子节点的节点名称
            List nodes = client.getChildren()
                                    .forPath("/zoo_1");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
三 使用 Curator 实现分布式锁

Zookeeper 中的分布式锁实现原理很简单,就是多个线程一起去创建同一个节点,谁创建成功锁就归谁;使用完之后删除该节点,其它节点再进行一次争抢。Curator 中有一个写好的重入锁 InterProcessMutex,简单封装即可使用:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Zookeeper 分布式锁实现
 */
public class ZkLock implements Lock{

    private InterProcessMutex lock;

    /**
     * 让使用者方便运用的构造方法
     */
    public ZkLock(String zkAddrs){
        this(zkAddrs,
            "/lock_node",
            "lock_base",
            2000,
            new ExponentialBackoffRetry(1000, 10));
    }

    /**
     * 核心构造方法,根据传入的参数去构造 lock 对象
     * @param zkAddrs  Zookeeper 的服务地址
     * @param lockNode 各个线程要去争抢创建的 Znode,也就是客户端有使用权限的 namespace
     * @param baseNode lockNode 的上级 Znode
     * @param sessionOutTimeMs 过期时间
     * @param policy 重连策略
     */
    public ZkLock(String zkAddrs,String lockNode,String baseNode,int sessionOutTimeMs,RetryPolicy policy){

        //有效性验证
        if(Objects.isNull(zkAddrs)
                || zkAddrs.trim().equals("")
                || Objects.isNull(lockNode)
                || lockNode.trim().equals("")
                || Objects.isNull(policy))
            throw new RuntimeException();


        //通过工厂创建连接
        CuratorFrameworkFactory.Builder cfBuilder = CuratorFrameworkFactory.builder()
                                                                .connectString(zkAddrs)
                                                                .sessionTimeoutMs(sessionOutTimeMs)
                                                                .retryPolicy(policy);
        if(baseNode != null && !baseNode.trim().equals(""))
            cfBuilder.namespace(baseNode);
        CuratorFramework cf = cfBuilder.build();

        //开启连接
        cf.start();

        //InterProcessMutex 是 Crator 里自带的一个已经实现好的重入锁
        //只要对其进行简单封装即可使用
        lock = new InterProcessMutex(cf,lockNode);
    }

    /**
     * 上锁方法,死循环调用 tryLock() 去上锁
     */
    @Override
    public void lock() {
        while (!tryLock())
            Thread.yield();
    }

    /**
     * 尝试获取锁,如果没能获取到会超时后报错
     */
    @Override
    public boolean tryLock() {
        try {
            lock.acquire();
        } catch (Exception e) {
            return Boolean.FALSE;
        }
        return Boolean.TRUE;
    }

    /**
     * 尝试获取锁,如果指定时间内获取不到就返回 false
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        try {
            return lock.acquire(time,unit);
        } catch (Exception e) {
            return Boolean.FALSE;
        }
    }

    /**
     * 释放锁,如果报错就会递归去释放
     */
    @Override
    public void unlock() {
        try {
            lock.release();
        } catch (Exception e) {
            unlock();
        }
    }

    //忽略
    @Override
    public Condition newCondition() {
        throw new RuntimeException();
    }

    //忽略
    @Override
    public void lockInterruptibly() throws InterruptedException {
        lock();
    }


    //测试
    public static void main(String[] args) throws Exception {

        //创建一个要被操作的对象
        AtomicInteger count = new AtomicInteger(30);

        //创建一个线程池
        Executor executor = Executors.newFixedThreadPool(10);

        //创建所对象
        Lock lock = new ZkLock("localhost:2101,localhost:2102,localhost:2103");

        //for 循环,把任务丢进线程池里
        for(int i = 0; i < 30; i++){

            executor.execute(()->{
                try {
                    //加锁
                    lock.lock();

                    //此处开启业务逻辑
                    //demo 中简单模拟,将 count 对象减一
                    int a = count.decrementAndGet();
                    System.out.println(a);

                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        //释放锁
                        lock.unlock();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74642.html

相关文章

  • 大数据入门指南(GitHub开源项目)

    摘要:项目地址前言大数据技术栈思维导图大数据常用软件安装指南一分布式文件存储系统分布式计算框架集群资源管理器单机伪集群环境搭建集群环境搭建常用命令的使用基于搭建高可用集群二简介及核心概念环境下的安装部署和命令行的基本使用常用操作分区表和分桶表视图 项目GitHub地址:https://github.com/heibaiying... 前 言 大数据技术栈思维导图 大数据常用软件安装指...

    guyan0319 评论0 收藏0
  • 2018年第16周-ZooKeeper基本概念(配搭建过程和Master-Workers例子)

    摘要:有可能是宕机或负荷严重的情况导致的。为分布式系统提供了协调功能和控制冲突。 背景 随着计算机的硬件和操作系统两者相辅相成地发展,从早期的ENIAC计算机到现在的x86的计算机,从以前的单一控制终端(Single Operator, Single Console, SOSC)的操作系统到现在百花争鸣的操作系统(如MacOS、Windows、Linux等),现代的操作系统发展还有一个最重要...

    wemall 评论0 收藏0
  • Zookeeper学习系列【一】 教会你Zookeeper一些基础概念

    摘要:具有不可分割性即原语的执行必须是连续的,在执行过程中不允许被中断。提供服务主要就是通过数据结构原语集机制达到的。子节点的版本号数据节点版本号版本号创建该节点的会话的。后位则为递增序列。 前言 最近加入了部门的技术兴趣小组,被分配了Zookeeper的研究任务。在研究过程当中,发现Zookeeper由于其开源的特性和其卓越的性能特点,在业界使用广泛,有很多的应用场景,而这些不同的应用场景...

    DevWiki 评论0 收藏0
  • ZooKeeper 概念与基础

    摘要:由于分布式系统和应用可以提供更强的计算能力,还能更好地容灾和扩展,所以逐渐受到青睐。基础由若干条指令组成,用于完成特定功能的过程称为原语。 信息飞速膨胀,很多应用无法依赖单个服务器处理庞大的数据量。由于分布式系统和应用可以提供更强的计算能力,还能更好地容灾和扩展,所以逐渐受到青睐。 在开发分布式应用时,通常需要花费大量时间和精力来处理异构系统中的协作通信问题。 什么是 ZooKeepe...

    endless_road 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<