摘要:可靠性一旦数据更新成功,将一直保持,直到新的更新。这是一种主动的分布式数据结构,能够在外部情况发生变化时候主动修改数据项状态的数据机构。如果监视节点状态发生变化,则跳转到第步,继续进行后续的操作,直到退出锁竞争。
题外话:从字面上来看,ZooKeeper表示动物园管理员,而Hadoop生态系统中,许多项目的Logo都采用了动物,比如Hadoop采用了大象的形象,所以可以ZooKeeper就是对这些动物进行一些管理工作的。
对于单机环境进程内的协调方法,我们一般通过线程锁来协调对共享数据的访问以保证状态的一致性。 但是分布式环境如何进行协调呢?于是,Google创造了Chubby,而ZooKeeper则是对于Chubby的一个开源实现。
ZooKeeper是一种为分布式应用所设计的高可用、高性能且一致的开源协调服务,它提供了一项基本服务:分布式锁服务。由于ZooKeeper的开源特性,后来我们的开发者在分布式锁的基础上,摸索了出了其他的使用方法:配置维护、组服务、分布式消息队列、分布式通知/协调等。它被设计为易于编程,使用文件系统目录树作为数据模型。
Zookeeper服务自身组成一个集群(2n+1个服务允许n个失效)。Zookeeper服务有两个角色,一个是leader,负责写服务和数据同步,剩下的是follower,提供读服务,leader失效后会在follower中重新选举新的leader。
保证
顺序一致性:按照客户端发送请求的顺序更新数据。
原子性:更新要么成功,要么失败,不会出现部分更新。
单一性 :无论客户端连接哪个server,都会看到同一个视图。
可靠性:一旦数据更新成功,将一直保持,直到新的更新。
及时性:客户端会在一个确定的时间内得到最新的数据。
1、Zookeeper数据模型Znode
Zookeeper表现为一个分层的文件系统目录树结构(不同于文件系统的是,节点可以有自己的数据,而文件系统中的目录节点只有子节点)。
Zookeeper Stat 结构 —— Zookeeper中的每个znode的stat都由下面的字段组成:
czxid - 引起这个znode创建的zxid
mzxid - znode最后更新的zxid
ctime - znode被创建的毫秒数(从1970年开始)
mtime - znode最后修改的毫秒数(从1970年开始)
version - znode数据变化号
cversion - znode子节点变化号
aversion - znode访问控制列表的变化号
ephemeralOwner - 如果是临时节点这个是znode拥有者的session id。如果不是临时节点则是0。
dataLength - znode的数据长度
numChildren - znode子节点数量
关于数据模型的理解,建议参考:http://www.cnblogs.com/wuxl36...
简单API——Zookeeper的设计目标的其中之一就是提供一个简单的程序接口。因此,它只支持这些操作:
create - 在树形结构的位置中创建节点
delete - 删除一个节点
exists - 测试节点在指定位置上是否存在
get data - 从节点上读取数据
set data - 往节点写入数据
get chilren - 检索节点的子节点列表
sync - 等待传输数据
ZooKeeper的应用场景(1)统一命名服务
分布式应用中,通常需要有一套完整的命名规则,既能够产生唯一的名称又便于人识别和记住,通常情况下用树形的名称结构是一个理想的选择,树形的名称结构是一个有层次的目录结构,既对人友好又不会重复。说到这里你可能想到了 JNDI,没错 Zookeeper 的 Name Service 与 JNDI 能够完成的功能是差不多的,它们都是将有层次的目录结构关联到一定资源上,但是 Zookeeper 的 Name Service 更加是广泛意义上的关联,也许你并不需要将名称关联到特定资源上,你可能只需要一个不会重复名称,就像数据库中产生一个唯一的数字主键一样。
Name Service 已经是 Zookeeper 内置的功能,你只要调用 Zookeeper 的 API 就能实现。如调用 create 接口就可以很容易创建一个目录节点。
案例:有一组服务器向客户端提供某种服务(例如:使用LVS技术构建的Web网站集群,就是由N台服务器组成的集群,为用户提供Web服务)。对于这种场景,我们的程序中一定有一份这组服务器的列表,每次客户端请求时候,都是从这份列表里读取这份服务器列表。那么这分列表显然不能存储在一台单节点的服务器上,否则这个节点挂掉了,整个集群都会发生故障,我们希望这份列表时高可用的。高可用的解决方案是:这份列表是分布式存储的,它是由存储这份列表的服务器共同管理的,如果存储列表里的某台服务器坏掉了,其他服务器马上可以替代坏掉的服务器,并且可以把坏掉的服务器从列表里删除掉,让故障服务器退出整个集群的运行,而这一切的操作又不会由故障的服务器来操作,而是集群里正常的服务器来完成。这是一种主动的分布式数据结构,能够在外部情况发生变化时候主动修改数据项状态的数据机构。
(2)分布式锁服务
共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。
具体步骤如下:
加锁: ZooKeeper 将按照如下方式实现加锁的操作:
1 ) ZooKeeper 调用 create ()方法来创建一个路径格式为“ _locknode_/lock- ”的节点,此节点类型为sequence (连续)和 ephemeral (临时)。也就是说,创建的节点为临时节点,并且所有的节点连续编号,即“ lock-i ”的格式。
2 )在创建的锁节点上调用 getChildren ()方法,来获取锁目录下的最小编号节点,并且不设置 watch 。
3 )步骤 2 中获取的节点恰好是步骤 1 中客户端创建的节点,那么此客户端获得此种类型的锁,然后退出操作。
4 )客户端在锁目录上调用 exists ()方法,并且设置 watch 来监视锁目录下比自己小一个的连续临时节点的状态。
5 )如果监视节点状态发生变化,则跳转到第 2 步,继续进行后续的操作,直到退出锁竞争。
解锁:
ZooKeeper 解锁操作非常简单,客户端只需要将加锁操作步骤 1 中创建的临时节点删除即可。
void getLock() throws KeeperException, InterruptedException{ Listlist = zk.getChildren(root, false); String[] nodes = list.toArray(new String[list.size()]); Arrays.sort(nodes); if(myZnode.equals(root+"/"+nodes[0])){ doAction(); } else{ waitForLock(nodes[0]); } } void waitForLock(String lower) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); if(stat != null){ mutex.wait(); } else{ getLock(); } }
(3)配置管理(数据发布与订阅)
在分布式系统里,我们会把一个服务应用分别部署到n台服务器上,这些服务器的配置文件是相同的,如果配置文件的配置选项发生变化,那么我们就得一个个去改这些配置文件,如果我们需要改的服务器比较少,这些操作还不是太麻烦,如果我们分布式的服务器特别多,那么更改配置选项就是一件麻烦而且危险的事情。这时我们可以将配置信息保存在 Zookeeper 的某个目录节点中,然后将所有需要修改的应用机器监控配置信息的状态,一旦配置信息发生变化,每台应用机器就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中。
(4)集群管理
Zookeeper 能够很容易的实现集群管理的功能,如有多台 Server 组成一个服务集群,那么必须要一个“总管”知道当前集群中每台机器的服务状态,一旦有机器不能提供服务,集群中其它集群必须知道,从而做出调整重新分配服务策略。同样当增加集群的服务能力时,就会增加一台或多台 Server,同样也必须让“总管”知道。
Zookeeper 不仅能够帮你维护当前的集群中机器的服务状态,而且能够帮你选出一个“总管”,让这个总管来管理集群,这就是 Zookeeper 的另一个功能 Leader Election。
它们的实现方式都是在 Zookeeper 上创建一个 EPHEMERAL 类型的目录节点,然后每个 Server 在它们创建目录节点的父目录节点上调用 getChildren(String path, boolean watch) 方法并设置 watch 为 true,由于是 EPHEMERAL 目录节点,当创建它的 Server 死去,这个目录节点也随之被删除,所以 Children 将会变化,这时 getChildren上的 Watch 将会被调用,所以其它 Server 就知道已经有某台 Server 死去了。新增 Server 也是同样的原理。
Zookeeper 如何实现 Leader Election,也就是选出一个 Master Server。和前面的一样每台 Server 创建一个 EPHEMERAL 目录节点,不同的是它还是一个 SEQUENTIAL 目录节点,所以它是个 EPHEMERAL_SEQUENTIAL 目录节点。之所以它是 EPHEMERAL_SEQUENTIAL 目录节点,是因为我们可以给每台 Server 编号,我们可以选择当前是最小编号的 Server 为 Master,假如这个最小编号的 Server 死去,由于是 EPHEMERAL 节点,死去的 Server 对应的节点也被删除,所以当前的节点列表中又出现一个最小编号的节点,我们就选择这个节点为当前 Master。这样就实现了动态选择 Master,避免了传统意义上单 Master 容易出现单点故障的问题。
PS:关于Master的选举,可以参考:http://www.cnblogs.com/sundde...。
注意:ZooKeeper所提供的服务主要是通过:数据结构+原语(一些关于该数据结构的一些操作)+watcher机制,三个部分来实现的
(5)、队列管理
Zookeeper 可以处理两种类型的队列:
当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。
A、同步队列 用 Zookeeper 实现的实现思路如下:
创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 / synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start。
void addQueue() throws KeeperException, InterruptedException{ zk.exists(root + "/start",true); zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); synchronized (mutex) { Listlist = zk.getChildren(root, false); if (list.size() < size) { mutex.wait(); } else { zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } } public void process(WatchedEvent event) { if(event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated){ System.out.println("得到通知"); super.process(event); doAction(); } }
B、FIFO队列:
实现的思路也非常简单,就是在特定的目录下创建 SEQUENTIAL 类型的子目录 /queue_i,这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证 FIFO。
boolean produce(int i) throws KeeperException, InterruptedException{ ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; } int consume() throws KeeperException, InterruptedException{ int retvalue = -1; Stat stat = null; while (true) { synchronized (mutex) { ListZooKeeper集群模式搭建list = zk.getChildren(root, true); if (list.size() == 0) { mutex.wait(); } else { Integer min = new Integer(list.get(0).substring(7)); for(String s : list){ Integer tempValue = new Integer(s.substring(7)); if(tempValue < min) min = tempValue; } byte[] b = zk.getData(root + "/element" + min,false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } }
注意:ZooKeeper服务器集群规模不小于3个节点,要求各服务器之间系统时间要保持一致;
1、下载解压安装后,修改环境变量:vim /etc/profile
增加一行:export ZOOKEEPER_HOME=/usr/local/zookeeper
修改PATH:export PATH=.:$HADOOP_HOME/bin:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH
使配置生效:source /etc/profile
2、进入zookeeper的conf目录下,修改文件名:mv zoo_sample.cfg zoo.cfg
编辑zoo.cfg:vim zoo.cfg
修改dataDir=/usr/local/zookeeper/data
新增
server.0=hadoop-master:2888:3888
server.1=hadoop-slave1:2888:3888
server.2=hadoop-slave2:2888:3888
3、创建data文件夹,并创建myid文件:
新建data文件夹:mkdir /usr/local/zookeeper/data
新建myid文件:vim myid,并设置第一台server为0。
4、复制zookeeper目录至其余两台服务器中:
scp /usr/local/zookeeper hadoop-slave1:/usr/local/
scp /usr/local/zookeeper hadoop-slave2:/usr/local/
5、复制环境变量配置文件至其余两台服务器中:
scp /etc/profile hadoop-slave1:/etc
scp /etc/profile hadoop-slave2:/etc
6、在其余两台服务器中修改myid文件:设置为1和2;
7、启动ZooKeeper,分别在三个节点中执行命令:zkServer.sh start
8、检验ZooKeeper集群节点角色状态,分别在三个节点中执行命令:zkServer.sh status (可以看出哪个节点是leader,follower,observer等)
ZooKeeper中包含以下角色:
领导者(leader),负责进行投票的发起和决议,更新系统状态;
学习者(learner),包括跟随者(follower)和观察者(observer),follower用于接受客户端请求并向客户端返回结果,在选主过程中参与投票;observer可以接受客户端连接,将写请求转发给leader,但observer不参加投票过程,只同步leader的状态,observer的目的是为了扩展系统,提高读取速度;
9、ZooKeeper简单测试
搭建好集群环境后,就可以进行简单的读写一致性测试了,这里我们通过进入zookeeper的bin目录下的zkCli.sh来完成下面的操作:
(1)在其中一个节点192.168.80.100上执行一个写操作:create /MyTest test
(2)在其他两个节点上执行读操作:get /MyTest
TIP:可以在一个节点中通过zkCli.sh -server hadoop-slave1:2181来远程登录
(3)在其中一个节点192.168.80.101上执行一个修改操作:set /MyTest new-test ,在其他两个节点上执行读操作查看数据是否一致
[zkshell: 0] help ZooKeeper host:port cmd args get path [watch] ls path [watch] set path data [version] delquota [-n|-b] path quit printwatches on|off createpath data acl stat path [watch] listquota path history setAcl path acl getAcl path sync path redo cmdno addauth scheme auth delete path [version] setquota -n|-b val path编程
APIDOC: https://zookeeper.apache.org/...
org.apache.zookeeper zookeeper 3.4.6
Zookeeper主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,但是 Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理
客户端要连接 Zookeeper 服务器可以通过创建 org.apache.zookeeper.ZooKeeper 的一个实例对象,然后调用这个类提供的接口来和服务器交互。
前面说了 ZooKeeper 主要是用来维护和监控一个目录节点树中存储的数据的状态,所有我们能够操作 ZooKeeper 的也和操作目录节点树大体一样,如创建一个目录节点,给某个目录节点设置数据,获取某个目录节点的所有子目录节点,给某个目录节点设置权限和监控这个目录节点的状态变化。
常见方法:
String create(String path, byte[] data, List
CreateMode 标识有四种形式的目录节点:
PERSISTENT:持久化目录节点,这个目录节点存储的数据不会丢失;
PERSISTENT_SEQUENTIAL:顺序自动编号的目录节点,这种目录节点会根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名;
EPHEMERAL:临时目录节点,一旦创建这个节点的客户端与服务器端口也就是 session 超时,这种节点会被自动删除;
EPHEMERAL_SEQUENTIAL:临时自动编号节点
Stat exists(String path, boolean watch) 判断某个 path 是否存在,并设置是否监控这个目录节点,这里的 watcher 是在创建 ZooKeeper 实例时指定的 watcher
Stat exists(String path,Watcher watcher) 重载方法,这里给某个目录节点设置特定的 watcher,Watcher 在 ZooKeeper 是一个核心功能,Watcher 可以监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知所有设置在这个目录节点上的 Watcher,从而每个客户端都很快知道它所关注的目录节点的状态发生变化,而做出相应的反应
void delete(String path, int version) 删除 path 对应的目录节点,version 为 -1 可以匹配任何版本,也就删除了这个目录节点所有数据
List
Stat setData(String path, byte[] data, int version) 给 path 设置数据,可以指定这个数据的版本号,如果 version 为 -1 怎可以匹配任何版本
byte[] getData(String path, boolean watch, Stat stat) 获取这个 path 对应的目录节点存储的数据,数据的版本等信息可以通过 stat 来指定,同时还可以设置是否监控这个目录节点数据的状态
void addAuthInfo(String scheme, byte[] auth) 客户端将自己的授权信息提交给服务器,服务器将根据这个授权信息验证客户端的访问权限。
Stat setACL(String path,List
Perms 有 ALL、READ、WRITE、CREATE、DELETE、ADMIN 几种,而 id 标识了访问目录节点的身份列表,默认情况下有以下两种:ANYONE_ID_UNSAFE = new Id("world", "anyone") 和 AUTH_IDS = new Id("auth", "") 分别表示任何人都可以访问和创建者拥有访问权限。
List
// 创建一个与服务器的连接 ZooKeeper zk = new ZooKeeper("localhost:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT, new Watcher() { // 监控所有被触发的事件 public void process(WatchedEvent event) { System.out.println("已经触发了" + event.getType() + "事件!"); } }); // 创建一个目录节点 zk.create("/testRootPath", "testRootData".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 创建一个子目录节点 zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); System.out.println(new String(zk.getData("/testRootPath",false,null))); // 取出子目录节点列表 System.out.println(zk.getChildren("/testRootPath",true)); // 修改子目录节点数据 zk.setData("/testRootPath/testChildPathOne","modifyChildDataOne".getBytes(),-1); System.out.println("目录节点状态:["+zk.exists("/testRootPath",true)+"]"); // 创建另外一个子目录节点 zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo",true,null))); // 删除子目录节点 zk.delete("/testRootPath/testChildPathTwo",-1); zk.delete("/testRootPath/testChildPathOne",-1); // 删除父目录节点 zk.delete("/testRootPath",-1); // 关闭连接 zk.close();
官网例子请看:http://zookeeper.majunwei.com...
其他例子:
http://www.uml.org.cn/zjjs/20...
http://www.uml.org.cn/zjjs/20...
http://zookeeper.majunwei.com...
参考:官方文档:https://zookeeper.apache.org/...
小马过河翻译社,Zookeeper文档中文版:http://zookeeper.majunwei.com...
周旭龙,ZooKeeper环境搭建:http://www.cnblogs.com/edison...
Zookeeper 的学习与运用:http://www.oschina.net/questi...
邬兴亮,Zookeeper随笔系列:http://www.cnblogs.com/wuxl36...
ggjucheng,Zookeeper Api(java)入门与应用:http://www.cnblogs.com/ggjuch...
其他推荐:
http://cailin.iteye.com/blog/...
http://www.uml.org.cn/wenzhan...
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/67377.html
摘要:相关概念协议高级消息队列协议是一个标准开放的应用层的消息中间件协议。可以用命令与不同,不是线程安全的。手动提交执行相关逻辑提交注意点将写成单例模式,有助于减少端占用的资源。自身是线程安全的类,只要封装得当就能最恰当的发挥好的作用。 本文使用的Kafka版本0.11 先思考些问题: 我想分析一下用户行为(pageviews),以便我能设计出更好的广告位 我想对用户的搜索关键词进行统计,...
摘要:在的的首次会议庆祝了其新功能版本的发布。可以以这三种方式暴露出来内部外部和负载均衡。比如,可能会通过一个弹性负载均衡器接收流量,或者通过谷歌的负载均衡器接收。这个功能令第三方负载均衡器整合到。负起了接管发现任务和微服务负载均衡器的重任。 随着容器逐渐受到企业的注意,焦点慢慢被转移到了容器编排工具上。复杂的工作负载在生产过程中需要成熟地被调度,编排,弹性扩容和管理工具。有了Docker,...
摘要:同时集成了机器学习类库。基于计算框架,将的分布式计算应用到机器学习领域。提供了一个简单的声明方法指定机器学习任务,并且动态地选择最优的学习算法。宣称其性能是的多倍。 介绍 spark是分布式并行数据处理框架 与mapreduce的区别: mapreduce通常将中间结果放在hdfs上,spark是基于内存并行大数据框架,中间结果放在内存,对于迭代数据spark效率更高,mapred...
摘要:作为面试官,我是如何甄别应聘者的包装程度语言和等其他语言的对比分析和主从复制的原理详解和持久化的原理是什么面试中经常被问到的持久化与恢复实现故障恢复自动化详解哨兵技术查漏补缺最易错过的技术要点大扫盲意外宕机不难解决,但你真的懂数据恢复吗每秒 作为面试官,我是如何甄别应聘者的包装程度Go语言和Java、python等其他语言的对比分析 Redis和MySQL Redis:主从复制的原理详...
阅读 933·2023-04-26 01:47
阅读 1589·2021-11-18 13:19
阅读 1946·2019-08-30 15:44
阅读 596·2019-08-30 15:44
阅读 2261·2019-08-30 15:44
阅读 1207·2019-08-30 14:06
阅读 1397·2019-08-30 12:59
阅读 1877·2019-08-29 12:49