资讯专栏INFORMATION COLUMN

PHP+RabbitMQ实现消息队列(代码全篇)

weakish / 3241人阅读

摘要:前言先安装对应的这里用的是不同的扩展实现方式会有细微的差异扩展地址具体以官网为准介绍配置信息基类生产者类消费者类消费者可有多个配置交换机路由生产者路由只控制发送成功不接受消费者是否收到频道

前言

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.
php扩展地址: http://pecl.php.net/package/amqp
具体以官网为准  http://www.rabbitmq.com/getstarted.html 

介绍

config.php 配置信息
BaseMQ.php MQ基类
ProductMQ.php 生产者类
ConsumerMQ.php 消费者类
Consumer2MQ.php 消费者2(可有多个)

config.php

     [
            "host" => "127.0.0.1",
            "port" => "5672",
            "login" => "guest",
            "password" => "guest",
            "vhost"=>"/",
        ],
        //交换机
        "exchange"=>"word",
        //路由
        "routes" => [],
    ];

BaseMQ.php

    conf     = $conf["host"] ;
            $this->exchange = $conf["exchange"] ;
            $this->AMQPConnection = new AMQPConnection($this->conf);
            if (!$this->AMQPConnection->connect())
                throw new AMQPConnectionException("Cannot connect to the broker!
");
        }
    
        /**
         * close link
         */
        public function close()
        {
            $this->AMQPConnection->disconnect();
        }
    
        /** Channel
         * @return AMQPChannel
         * @throws AMQPConnectionException
         */
        public function channel()
        {
            if(!$this->AMQPChannel) {
                $this->AMQPChannel =  new AMQPChannel($this->AMQPConnection);
            }
            return $this->AMQPChannel;
        }
    
        /** Exchange
         * @return AMQPExchange
         * @throws AMQPConnectionException
         * @throws AMQPExchangeException
         */
        public function exchange()
        {
            if(!$this->AMQPExchange) {
                $this->AMQPExchange = new AMQPExchange($this->channel());
                $this->AMQPExchange->setName($this->exchange);
            }
            return $this->AMQPExchange ;
        }
    
        /** queue
         * @return AMQPQueue
         * @throws AMQPConnectionException
         * @throws AMQPQueueException
         */
        public function queue()
        {
            if(!$this->AMQPQueue) {
                $this->AMQPQueue = new AMQPQueue($this->channel());
            }
            return $this->AMQPQueue ;
        }
    
        /** Envelope
         * @return AMQPEnvelope
         */
        public function envelope()
        {
            if(!$this->AMQPEnvelope) {
                $this->AMQPEnvelope = new AMQPEnvelope();
            }
            return $this->AMQPEnvelope;
        }
    }

ProductMQ.php

    channel();
            //创建交换机对象
            $ex = $this->exchange();
            //消息内容
            $message = "product message ".rand(1,99999);
            //开始事务
            $channel->startTransaction();
            $sendEd = true ;
            foreach ($this->routes as $route) {
                $sendEd = $ex->publish($message, $route) ;
                echo "Send Message:".$sendEd."
";
            }
            if(!$sendEd) {
                $channel->rollbackTransaction();
            }
            $channel->commitTransaction(); //提交事务
            $this->close();
            die ;
        }
    }
    try{
        (new ProductMQ())->run();
    }catch (Exception $exception){
        var_dump($exception->getMessage()) ;
    }

ConsumerMQ.php

    exchange();
            $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
            $ex->setFlags(AMQP_DURABLE); //持久化
            //echo "Exchange Status:".$ex->declare()."
";
    
            //创建队列
            $q = $this->queue();
            //var_dump($q->declare());exit();
            $q->setName($this->q_name);
            $q->setFlags(AMQP_DURABLE); //持久化
            //echo "Message Total:".$q->declareQueue()."
";
    
            //绑定交换机与队列,并指定路由键
            echo "Queue Bind: ".$q->bind($this->exchange, $this->route)."
";
    
            //阻塞模式接收消息
            echo "Message:
";
            while(True){
                $q->consume(function ($envelope,$queue){
                    $msg = $envelope->getBody();
                    echo $msg."
"; //处理消息
                    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
                });
                //$q->consume("processMessage", AMQP_AUTOACK); //自动ACK应答
            }
            $this->close();
        }
    }
    try{
        (new ConsumerMQ)->run();
    }catch (Exception $exception){
        var_dump($exception->getMessage()) ;
    }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/31023.html

相关文章

  • RabbitMQ使用

    摘要:的定义是使用语言开发的开源消息队列系统,完整的实现了高级抽象层消息通信协议。交换机接受发送的消息,并根据绑定规则转发到对应的队列。默认是无名交换使用空字符串标识。消息队列是内部对象,用于存储未被消费的消息。 RabbitMQ的定义 RabbitMQ是使用erlang语言开发的开源消息队列系统,完整的实现了AMPQ(高级抽象层消息通信协议)。 Mac下RabbitMQ安装 使用Hom...

    codeKK 评论0 收藏0
  • RabbitMQ发布订阅实战-实现延时重试队列

    摘要:本文将会讲解如何使用实现延时重试和失败消息队列,实现可靠的消息消费,消费失败后,自动延时将消息重新投递,当达到一定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。 RabbitMQ是一款使用Erlang开发的开源消息队列。本文假设读者对RabbitMQ是什么已经有了基本的了解,如果你还不知道它是什么以及可以用来做什么,建议先从官网的 RabbitMQ Tutorials 入门...

    Heier 评论0 收藏0
  • RabbitMQ发布订阅实战-实现延时重试队列

    摘要:本文将会讲解如何使用实现延时重试和失败消息队列,实现可靠的消息消费,消费失败后,自动延时将消息重新投递,当达到一定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。 RabbitMQ是一款使用Erlang开发的开源消息队列。本文假设读者对RabbitMQ是什么已经有了基本的了解,如果你还不知道它是什么以及可以用来做什么,建议先从官网的 RabbitMQ Tutorials 入门...

    vslam 评论0 收藏0
  • 转: RabbitMQPHP(一)

    摘要:需要特别明确的概念交换机的持久化,并不等于消息的持久化。消息的处理,是有两种方式,一次性。在上述示例中,使用的,意味着接收全部的消息。注意与是两个不同的队列。后端处理,可以针对每一个启动一个或多个,以提高消息处理的实时性。 RabbitMQ与PHP(一) 项目中使用RabbitMQ作为队列处理用户消息通知,消息由前端PHP代码产生,处理消息使用Python,这就导致代码一致性问题,调...

    wpw 评论0 收藏0
  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中间的框是一个队列的消息缓冲区,保持代表的消费。本教程介绍,这是一个开放的通用的协议消息。我们将在本教程中使用,解决依赖管理。发送者将连接到,发送一条消息,然后退出。注意,这与发送发布的队列匹配。 介绍 RabbitMQ是一个消息代理器:它接受和转发消息。你可以把它当作一个邮局:当你把邮件放在信箱里时,你可以肯定邮差先生最终会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ就...

    silencezwm 评论0 收藏0

发表评论

0条评论

weakish

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<