资讯专栏INFORMATION COLUMN

PHP和zookeeper结合实践

Jinkey / 1022人阅读

摘要:简单介绍是开发和维护开源服务器的服务,它能够实现高度可靠的分布式协调。同步地获取与节点关联的。此值可能在服务器重新连接后发生更改。

Zookeeper 简单介绍
Apache Zookeeper是开发和维护开源服务器的服务,它能够实现高度可靠的分布式协调。
安装Zookeeper(无需安装)
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
tar zxvf zookeeper-3.4.10.tar.gz
安装Zookeeper C扩展支持
cd zookeeper-3.4.8/src/c
./configure --prefix=/usr/
make
make install
安装php的zookeeper扩展
wget https://pecl.php.net/get/zookeeper-0.6.2.tgz
tar zxvf zookeeper-0.6.2.tgz
cd zookeeper-0.6.2
phpize
./configure --with-libzookeeper-dir=/usr/
$ make
$ make install
启动zookeeper server
# {zookeeperserver}代表你zookeeper解压的目录

mkdir -p /project/zookeeper/demo/data

#zoo.conf 是默认启动所加载的配置文件
cp /{zookeeperserver}/conf/zoo_sample.cfg /{zookeeperserver}/conf/zoo.cfg

/{zookeeperserver}/bin/zkServer start
zookeeper client 测试
/{zookeeperserver}/bin/zkCli.sh -server 127.0.0.1:2181

# [zk: 127.0.0.1:2181(CONNECTED) x] 客户端连接信息shell
[zk: 127.0.0.1:2181(CONNECTED) 3] ls /
[zookeeper]
[zk: 127.0.0.1:2181(CONNECTED) 4] create /test qkl001
Created /test
[zk: 127.0.0.1:2181(CONNECTED) 5] create /zgq zgq002
Created /zgq
[zk: 127.0.0.1:2181(CONNECTED) 6] ls / 
[zookeeper, test, zgq]
[zk: 127.0.0.1:2181(CONNECTED) 5] delete /zgq
Created /zgq
[zk: 127.0.0.1:2181(CONNECTED) 7] quit
PHP下实践 zookeeper类信息
Zookeeper — Zookeeper类

  -- Zookeeper::addAuth — 指定应用程序凭据

  -- Zookeeper::connect — 创建与Zookeeper沟通的句柄

  -- Zookeeper::__construct — 创建与Zookeeper沟通的句柄

  -- Zookeeper::create — 同步创建节点

  -- Zookeeper::delete — 同步删除Zookeeper中的一个节点

  -- Zookeeper::exists — 同步检查Zookeeper节点的存在性

  -- Zookeeper::get — 同步获取与节点关联的数据。

  -- Zookeeper::getAcl — 同步地获取与节点关联的ACL。

  -- Zookeeper::getChildren — 同步列出节点的子节点

  -- Zookeeper::getClientId — 返回客户端会话ID,仅在连接当前连接时才有效(即最后观察者状态为ZooOnCeleDelphi状态)

  -- Zookeeper::getRecvTimeout — 返回此会话的超时,如果连接当前连接(只有上次观察者状态为ZooOnCeleTytStand状态)才有效。此值可能在服务器重新连接后发生更改。

  -- Zookeeper::getState — 获取Zookeeper连接的状态

  -- Zookeeper::isRecoverable — 检查当前的Zookeeper连接状态是否可以恢复

  -- Zookeeper::set — 设置与节点关联的数据

  -- Zookeeper::setAcl — 同步设置与节点关联的ACL

  -- Zookeeper::setDebugLevel — 设置库的调试级别

  -- Zookeeper::setDeterministicConnOrder — 启用/禁用仲裁端点顺序随机化

  -- Zookeeper::setLogStream — 设置库用于日志记录的流

  -- Zookeeper::setWatcher — 设置观察函数
客户端操作
zkCli.cmd

# output 表示连接成功
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]

# 以下是操作命令
ls /
create /ztest 1
获取节点信息(Zookeeper::get)
# get.php

$zoo = new Zookeeper("192.168.1.45:2181");
$r = $zoo->get( "/ztest");
var_dump($r);
获取节点信息(Zookeeper::get)并设置watcher
# get.php
class ZookeeperDemo extends Zookeeper
{

    public function watcher($type, $state, $key)
    {

        var_dump($type);
        var_dump($state);
        var_dump($key);

        if ($type == 3) {
            var_dump($this->get("/zgetw"));
            // Watcher gets consumed so we need to set a new one
            $this->get("/zgetw", array($this, "watcher"));
        }
    }

}

$zoo = new ZookeeperDemo("192.168.1.45:2181");
$zoo->get("/zgetw", [$zoo, "watcher"]);

while (true) {
    echo ".";
    sleep(1);
}
客户端操作
set /ztest 2
set /ztest 3
set /ztest 5
获取节点信息(Zookeeper::get) 结果
php get.php

#output 
......int(3)
int(3)
string(6) "/ztest"
string(1) "2"
..int(3)
int(3)
string(6) "/ztest"
string(1) "3"
..int(3)
int(3)
string(6) "/ztest"
string(1) "4"
.int(3)
int(3)
string(6) "/ztest"
string(1) "5"
创建子节点
create /ztest/child1 c1
# output: Created /ztest/child1

create /ztest/child2 c2
# output: Created /ztest/child2

ls /ztest
# output: [child2, child1]
php下获取子节点
# getChild.php
$zookeeper = new Zookeeper("192.168.1.45:2181");
$path = "/ztest";
$r = $zookeeper->getchildren($path);
if (!empty($r)) {
    foreach ($r as $c)
    {
        var_dump($c);
    }
}

# php getChild.php
string(6) "child2"
string(6) "child1"
创建顺序节点
[zk: localhost:2181(CONNECTED) 22] create /stest 1
Created /stest
[zk: localhost:2181(CONNECTED) 23] create -s /stest/seq 1
Created /stest/seq0000000000
[zk: localhost:2181(CONNECTED) 24] create -s /stest/seq 1
Created /stest/seq0000000001
[zk: localhost:2181(CONNECTED) 25] create -s /stest/seq 1
Created /stest/seq0000000002
[zk: localhost:2181(CONNECTED) 26] create -s /stest/seq 1
Created /stest/seq0000000003
[zk: localhost:2181(CONNECTED) 27] create -s /stest/seq 1
Created /stest/seq0000000004
[zk: localhost:2181(CONNECTED) 28] create -s /stest/seq 1
Created /stest/seq0000000005
[zk: localhost:2181(CONNECTED) 29] create -s /stest/seq 1
Created /stest/seq0000000006
[zk: localhost:2181(CONNECTED) 30] create -s /stest/seq 1
Created /stest/seq0000000007
[zk: localhost:2181(CONNECTED) 31] create -s /stest/seq 2
Created /stest/seq0000000008
[zk: localhost:2181(CONNECTED) 32] create -s /stest/seq 2
Created /stest/seq0000000009
[zk: localhost:2181(CONNECTED) 33] create -s /stest/seq 2
Created /stest/seq0000000010
[zk: localhost:2181(CONNECTED) 34] create -s /stest/seq 2
Created /stest/seq0000000011
[zk: localhost:2181(CONNECTED) 35] create -s /stest/seq 2
Created /stest/seq0000000012
[zk: localhost:2181(CONNECTED) 36] create -s /stest/seq 2
Created /stest/seq0000000013
[zk: localhost:2181(CONNECTED) 37] create -s /stest/seq 2
Created /stest/seq0000000014
代码
$zookeeper = new Zookeeper("192.168.1.45:2181");
$aclArray = array(
    array(
        "perms"  => Zookeeper::PERM_ALL,
        "scheme" => "world",
        "id"     => "anyone",
    )
);
$path = "/t3";
//ZOO_EPHEMERAL = 1
//ZOO_SEQUENCE  = 2
//这里这里的flag=NULL,flag=0 表示创建永久节点,=1创建临时节点,=2创建seq 顺序节点
$realPath = $zookeeper->create($path, null, $aclArray, 1);
var_dump($realPath);
worker

一个利用顺序临时节点的leader迁移实现

# worker.php
 Zookeeper::PERM_ALL,
            "scheme" => "world",
            "id" => "anyone"
        ]
    ];

    private $isLeader = false;

    private $znode = "";
    private $prevNode = "";

    public function __construct($host = "", $watcher_cb = null, $recv_timeout = 10000)
    {
        $this->zk = new Zookeeper($host, $watcher_cb, $recv_timeout);
    }

    public function register()
    {
        if (!$this->zk->exists(self::CONTAINER)) {
            $this->zk->create(self::CONTAINER, null, $this->acl);
        }

        # 创建一个临时的顺序节点
        $this->znode = $this->zk->create(self::CONTAINER . "/w-",
                                        null,
                                        $this->acl,
                                        Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE);

        //获取顺序节点名,去除父路径
        $this->znode = str_replace(self::CONTAINER . "/", "", $this->znode);

        printf("我创建了节点: %s
", self::CONTAINER . "/" . $this->znode);

        $this->prevNode = $this->getPrev();

        if (is_null($this->prevNode)) {
            $this->setLeader(true);
        } else {
            $this->zk->get(self::CONTAINER . "/" . $this->prevNode, [$this, "watcher"]);
        }
    }

    public function getPrev()
    {
        $workers = $this->zk->getChildren(self::CONTAINER);
        sort($workers);
        $size = count($workers);
        for ($i = 0; $i < $size; $i++) {
            if ($this->znode == $workers[$i] && $i > 0) {
                return $workers[$i - 1];
            }
        }

        return null;
    }

    public function watcher($type, $state, $key)
    {
        $prevNode = $this->prevNode;
        printf("watchNode-getPrevious:%s
", self::CONTAINER . "/" . $prevNode);

        if (!is_null($prevNode)) {
            if ($type == 2) {
                printf($prevNode . " had deleted
");
            }
            if ($type == 3) {
                printf("我重新监控节点:%s
", self::CONTAINER . "/" . $prevNode);
                $this->zk->get(self::CONTAINER . "/" . $prevNode, [$this, "watcher"]);
            }
        }
    }

    public function isLeader()
    {
        return $this->isLeader;
    }

    public function setLeader($flag)
    {
        $this->isLeader = $flag;
    }

    public function run()
    {
        $this->register();

        while (true) {
            if ($this->isLeader()) {
                $this->leaderJob();
            } else {
                $this->watcherJob();
            }

            sleep(2);
        }
    }

    public function leaderJob()
    {
        echo "现在我是Leader
";
    }

    public function watcherJob()
    {
        echo "我在监控:".(self::CONTAINER . "/" . $this->prevNode)."
";
    }

}

$worker = new Worker("192.168.1.45:2181");
$worker->run();
尝试
# 终端1
php worker.php
# 终端2
php worker.php
# 终端3
php worker.php

# 此处运行不会被节点变化不会被监控到
再次尝试
# zkCli
create /worker_test/w-
/worker_test/0000000020

php worker.php

# output
我创建了节点: /worker_test/w-0000000022
我在监控:/worker_test/w-0000000020
我在监控:/worker_test/w-0000000020
watchNode-getPrevious:/worker_test/w-0000000020
我重新监控节点:/worker_test/w-0000000020
我在监控:/worker_test/w-0000000020
我在监控:/worker_test/w-0000000020
我在监控:/worker_test/w-0000000020
我在监控:/worker_test/w-0000000020
我在监控:/worker_test/w-0000000020
watchNode-getPrevious:/worker_test/w-0000000020
我重新监控节点:/worker_test/w-0000000020
2018-08-28 02:11:46,684:2486(0x7f28ed0a1700):ZOO_WARN@zookeeper_interest@1570: Exceeded deadline by 15ms
我在监控:/worker_test/w-0000000020
我在监控:/worker_test/w-0000000020
watchNode-getPrevious:/worker_test/w-0000000020
w-0000000020 had deleted
我在监控:/worker_test/w-0000000020
这边发现一个类似bug问题
如果我们直接运行worker.php
在worker.php运行创建的临时顺序节点是不会被watcher到的
我们必须先首先创建好相关的节点再启动监控,不知道这里是不是php版本的zookeeper的bug
有了解的小伙伴可以告之下
Watcher通知状态与事件类型一览

ZOO_CREATED_EVENT(value=1):节点创建事件,需要watch一个不存在的节点,当节点被创建时触发,此watch通过zoo_exists()设置
ZOO_DELETED_EVENT(value=2):节点删除事件,此watch通过zoo_exists()或zoo_get()设置
ZOO_CHANGED_EVENT(value=3):节点数据改变事件,此watch通过zoo_exists()或zoo_get()设置
ZOO_CHILD_EVENT(value=4):子节点列表改变事件,此watch通过zoo_get_children()或zoo_get_children2()设置
ZOO_SESSION_EVENT(value=-1):会话事件,客户端与服务端断开或重连时触发
ZOO_NOTWATCHING_EVENT(value=-2):watch移除事件,服务端出于某些原因不再为客户端watch节点时触发

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/29341.html

相关文章

  • QConf搭建配置中心

    摘要:今天来跟大家分享的是奇虎开源的配置中心。容错当进程死掉,网络终端,机器重启等异常情况发生时,我们希望能尽可能的提供可靠的配置获取服务。配置更新及时可以秒级同步到所有客户端机器。本身是没有的恭喜你,你已经构建完自己的配置中心了。 今天来跟大家分享的是奇虎360开源的 QConf 配置中心。 为什么我们需要做这么一件事情? 因为遇到了,当业务分布较广,配置分布较广的时候,就会很容易地出现一...

    JiaXinYi 评论0 收藏0
  • PHP下kafka的实践

    摘要:消息以为类别记录将消息种子分类每一类的消息称之为一个主题。这意味着生产者不等待来自同步完成的确认继续发送下一条批消息。这意味着在已成功收到的数据并得到确认后发送下一条。三种机制,性能依次递减吞吐量降低,数据健壮性则依次递增。 kafka 简介 Kafka 是一种高吞吐量的分布式发布订阅消息系统 kafka角色必知 producer:生产者。 consumer:消费者。 topic: 消...

    Codeing_ls 评论0 收藏0
  • PHP下kafka的常用脚本实践

    摘要:阅读本教程前最好先尝试阅读下的实践自带命令实践尝试实践的知识创建话题生产消息消费消息话题信息获取消费组获取消费组的自带的命令安装目录的目录下代表我们会使用的脚本 阅读本教程前最好先尝试阅读:PHP下kafka的实践 自带命令实践 尝试实践的kafka知识: 创建话题 生产消息 消费消息 话题信息 获取消费组 获取消费组的offset 自带的命令 # kafka安装目录的bin目录下...

    caiyongji 评论0 收藏0

发表评论

0条评论

Jinkey

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<