php实现mqtt发布/发送 消息到主题
此次使用的是网上开源mqtt案例:其中使用的是 stream_socket_xxxx 系列函数
/* phpMQTT */ class Mqtt { private $socket; /* holds the socket */ private $msgid = 1; /* counter for message id */ public $keepalive = 10; /* default keepalive timmer */ public $timesinceping; /* host unix time, used to detect disconects */ public $topics = array(); /* used to store currently subscribed topics */ public $debug = false; /* should output debug messages */ public $address; /* broker address */ public $port; /* broker port */ public $clientid; /* client id sent to brocker */ public $will; /* stores the will of the client */ private $username; /* stores username */ private $password; /* stores password */ public $cafile; function __construct($address, $port, $clientid, $cafile = NULL){ $this->broker($address, $port, $clientid, $cafile); } /* sets the broker details */ function broker($address, $port, $clientid, $cafile = NULL){ $this->address = $address; $this->port = $port; $this->clientid = $clientid; $this->cafile = $cafile; } function connect_auto($clean = true, $will = NULL, $username = NULL, $password = NULL){ while($this->connect($clean, $will, $username, $password)==false){ sleep(10); } return true; } /* connects to the broker inputs: $clean: should the client send a clean session flag */ function connect($clean = true, $will = NULL, $username = NULL, $password = NULL){ if($will) $this->will = $will; if($username) $this->username = $username; if($password) $this->password = $password; if ($this->cafile) { $socketContext = stream_context_create(["ssl" => [ "verify_peer_name" => true, "cafile" => $this->cafile ]]); $this->socket = stream_socket_client("tls://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT, $socketContext); } else { $this->socket = stream_socket_client("tcp://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT); } if (!$this->socket ) { if($this->debug) error_log("stream_socket_create() $errno, $errstr "); return false; } stream_set_timeout($this->socket, 5); stream_set_blocking($this->socket, 0); $i = 0; $buffer = ""; $buffer .= chr(0x00); $i++; $buffer .= chr(0x06); $i++; $buffer .= chr(0x4d); $i++; $buffer .= chr(0x51); $i++; $buffer .= chr(0x49); $i++; $buffer .= chr(0x73); $i++; $buffer .= chr(0x64); $i++; $buffer .= chr(0x70); $i++; $buffer .= chr(0x03); $i++; //No Will $var = 0; if($clean) $var+=2; //Add will info to header if($this->will != NULL){ $var += 4; // Set will flag $var += ($this->will["qos"] << 3); //Set will qos if($this->will["retain"]) $var += 32; //Set will retain } if($this->username != NULL) $var += 128; //Add username to header if($this->password != NULL) $var += 64; //Add password to header $buffer .= chr($var); $i++; //Keep alive $buffer .= chr($this->keepalive >> 8); $i++; $buffer .= chr($this->keepalive & 0xff); $i++; $buffer .= $this->strwritestring($this->clientid,$i); //Adding will to payload if($this->will != NULL){ $buffer .= $this->strwritestring($this->will["topic"],$i); $buffer .= $this->strwritestring($this->will["content"],$i); } if($this->username) $buffer .= $this->strwritestring($this->username,$i); if($this->password) $buffer .= $this->strwritestring($this->password,$i); $head = " "; $head{0} = chr(0x10); $head{1} = chr($i); fwrite($this->socket, $head, 2); fwrite($this->socket, $buffer); $string = $this->read(4); if(ord($string{0})>>4 == 2 && $string{3} == chr(0)){ if($this->debug) echo "Connected to Broker "; }else{ error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x) ", ord($string{0}),ord($string{3}))); return false; } $this->timesinceping = time(); return true; } /* read: reads in so many bytes */ function read($int = 8192, $nb = false){ // print_r(socket_get_status($this->socket)); $string=""; $togo = $int; if($nb){ return fread($this->socket, $togo); } while (!feof($this->socket) && $togo>0) { $fread = fread($this->socket, $togo); $string .= $fread; $togo = $int - strlen($string); } return $string; } /* subscribe: subscribes to topics */ function subscribe($topics, $qos = 0){ $i = 0; $buffer = ""; $id = $this->msgid; $buffer .= chr($id >> 8); $i++; $buffer .= chr($id % 256); $i++; foreach($topics as $key => $topic){ $buffer .= $this->strwritestring($key,$i); $buffer .= chr($topic["qos"]); $i++; $this->topics[$key] = $topic; } $cmd = 0x80; //$qos $cmd += ($qos << 1); $head = chr($cmd); $head .= chr($i); fwrite($this->socket, $head, 2); fwrite($this->socket, $buffer, $i); $string = $this->read(2); $bytes = ord(substr($string,1,1)); $string = $this->read($bytes); } /* ping: sends a keep alive ping */ function ping(){ $head = " "; $head = chr(0xc0); $head .= chr(0x00); fwrite($this->socket, $head, 2); if($this->debug) echo "ping sent "; } /* disconnect: sends a proper disconect cmd */ function disconnect(){ $head = " "; $head{0} = chr(0xe0); $head{1} = chr(0x00); fwrite($this->socket, $head, 2); } /* close: sends a proper disconect, then closes the socket */ function close(){ $this->disconnect(); stream_socket_shutdown($this->socket, STREAM_SHUT_WR); } /* publish: publishes $content on a $topic */ function publish($topic, $content, $qos = 0, $retain = 0){ $i = 0; $buffer = ""; $buffer .= $this->strwritestring($topic,$i); //$buffer .= $this->strwritestring($content,$i); if($qos){ $id = $this->msgid++; $buffer .= chr($id >> 8); $i++; $buffer .= chr($id % 256); $i++; } $buffer .= $content; $i+=strlen($content); $head = " "; $cmd = 0x30; if($qos) $cmd += $qos << 1; if($retain) $cmd += 1; $head{0} = chr($cmd); $head .= $this->setmsglength($i); fwrite($this->socket, $head, strlen($head)); fwrite($this->socket, $buffer, $i); } /* message: processes a recieved topic */ function message($msg){ $tlen = (ord($msg{0})<<8) + ord($msg{1}); $topic = substr($msg,2,$tlen); $msg = substr($msg,($tlen+2)); $found = 0; foreach($this->topics as $key=>$top){ if( preg_match("/^".str_replace("#",".*", str_replace("+","[^/]*", str_replace("/","/", str_replace("$","$", $key))))."$/",$topic) ){ if(is_callable($top["function"])){ call_user_func($top["function"],$topic,$msg); $found = 1; } } } if($this->debug && !$found) echo "msg recieved but no match in subscriptions "; } /* proc: the processing loop for an "allways on" client set true when you are doing other stuff in the loop good for watching something else at the same time */ function proc( $loop = true){ if(1){ $sockets = array($this->socket); $w = $e = NULL; $cmd = 0; //$byte = fgetc($this->socket); if(feof($this->socket)){ if($this->debug) echo "eof receive going to reconnect for good measure "; fclose($this->socket); $this->connect_auto(false); if(count($this->topics)) $this->subscribe($this->topics); } $byte = $this->read(1, true); if(!strlen($byte)){ if($loop){ usleep(100000); } }else{ $cmd = (int)(ord($byte)/16); if($this->debug) echo "Recevid: $cmd "; $multiplier = 1; $value = 0; do{ $digit = ord($this->read(1)); $value += ($digit & 127) * $multiplier; $multiplier *= 128; }while (($digit & 128) != 0); if($this->debug) echo "Fetching: $value "; if($value) $string = $this->read($value); if($cmd){ switch($cmd){ case 3: $this->message($string); break; } $this->timesinceping = time(); } } if($this->timesinceping < (time() - $this->keepalive )){ if($this->debug) echo "not found something so ping "; $this->ping(); } if($this->timesinceping<(time()-($this->keepalive*2))){ if($this->debug) echo "not seen a package in a while, disconnecting "; fclose($this->socket); $this->connect_auto(false); if(count($this->topics)) $this->subscribe($this->topics); } } return 1; } /* getmsglength: */ function getmsglength(&$msg, &$i){ $multiplier = 1; $value = 0 ; do{ $digit = ord($msg{$i}); $value += ($digit & 127) * $multiplier; $multiplier *= 128; $i++; }while (($digit & 128) != 0); return $value; } /* setmsglength: */ function setmsglength($len){ $string = ""; do{ $digit = $len % 128; $len = $len >> 7; // if there are more digits to encode, set the top bit of this digit if ( $len > 0 ) $digit = ($digit | 0x80); $string .= chr($digit); }while ( $len > 0 ); return $string; } /* strwritestring: writes a string to a buffer */ function strwritestring($str, &$i){ $ret = " "; $len = strlen($str); $msb = $len >> 8; $lsb = $len % 256; $ret = chr($msb); $ret .= chr($lsb); $ret .= $str; $i += ($len+2); return $ret; } function printstr($string){ $strlen = strlen($string); for($j=0;$j<$strlen;$j++){ $num = ord($string{$j}); if($num > 31) $chr = $string{$j}; else $chr = " "; printf("%4d: %08b : 0x%02x : %s ",$j,$num,$num,$chr); } } }实现部分 发送到主题
// 发送给订阅号信息,创建socket,无sam队列 $server = ""; // 服务代理地址(mqtt服务端地址) $port = 1883; // 通信端口 $username = ""; // 用户名(如果需要) $password = ""; // 密码(如果需要 $client_id = "clientx9293670xxctr"; // 设置你的连接客户端id $mqtt = new Mqtt($server, $port, $client_id); //实例化MQTT类 if ($mqtt->connect(true, NULL, $username, $password)) { //如果创建链接成功 $mqtt->publish("xxx3809293670ctr", "setr=3xxxxxxxxx", 0); // 发送到 xxx3809293670ctr 的主题 一个信息 内容为 setr=3xxxxxxxxx Qos 为 0 $mqtt->close(); //发送后关闭链接 } else { echo "Time out! "; }订阅主题
/*// 订阅信息,接收一个信息后退出 $server = ""; // 服务代理地址(mqtt服务端地址) $port = 1883; // 通信端口 $username = ""; // 用户名(如果需要) $password = ""; // 密码(如果需要 $client_id = "clientx9293670xxctr"; // 设置你的连接客户端id $mqtt = new Mqtt($server, $port, $client_id); if(!$mqtt->connect(true, NULL, $username, $password)) { //链接不成功再重复执行监听连接 exit(1); } $topics["SN69143809293670state"] = array("qos" => 0, "function" => "procmsg"); // 订阅主题为 SN69143809293670state qos为0 $mqtt->subscribe($topics, 0); while($mqtt->proc()){ } //死循环监听 $mqtt->close(); function procmsg($topic, $msg){ //信息回调函数 打印信息 echo "Msg Recieved: " . date("r") . " "; echo "Topic: {$topic} "; echo " $msg "; $xxx = json_decode($msg); var_dump($xxxxxx->aa); die; }
我最终写出的mqtt api 使用的是node;为什么?
摘要:,消息队列遥测传输是开发的一个即时通讯协议,有可能成为物联网的重要组成部分。会发生消息丢失或重复。只有一次,确保消息到达一次。此外,国内很多企业都广泛使用作为手机客户端与服务器端推送消息的协议。 前几天写了一下MQTT协议实现推送数据传输,所以我会不定期的更新一下关注MQTT的知识。 MQTT: MQTT(Message Queuing Telemetry Transport,消息队列...
摘要:协议版本介绍互联网的基础网络协议是协议消息队列遥测传输是基于协议栈而构建的已成为通信的标准为什么选择有多好多好多么牛逼我就不说了说的再多不如一个一个试试完了做比对剩下的那个就是要选择的实在不想这样搞技术就跟着一线走发布和订阅模型协议在网络中 mqtt 协议版本: 3.1.1 MQTT 介绍 互联网的基础网络协议是 TCP/IP协议. MQTT(消息队列遥测传输)是基于 TCP/IP 协...
摘要:时间就是金钱,效率就是生命本教程助力开发者使用协议快速产品化。摘要借助具备及联网功能的,快速部署到客户产品上,助力开发,缩短开发周期,快速实现产品商业化。 时间就是金钱,效率就是生命 本教程助力开发者使用MQTT协议快速产品化。 摘要 借助具备MQTT及联网功能的DTU,快速部署到客户产品...
阅读 1585·2021-11-16 11:45
阅读 2502·2021-09-29 09:48
阅读 3172·2021-09-07 10:26
阅读 1827·2021-08-16 10:50
阅读 1847·2019-08-30 15:44
阅读 2678·2019-08-28 18:03
阅读 1882·2019-08-27 10:54
阅读 1798·2019-08-26 14:01