摘要:概述这是关于入门学习的第八篇文章连接池的实现。开始今天的文章,这篇文章实现了连接池,代码是在的实现文章的基础上进行开发的。
概述
这是关于 Swoole 入门学习的第八篇文章:Swoole MySQL 连接池的实现。
第七篇:Swoole RPC 的实现
第六篇:Swoole 整合成一个小框架
第五篇:Swoole 多协议 多端口 的应用
第四篇:Swoole HTTP 的应用
第三篇:Swoole WebSocket 的应用
第二篇:Swoole Task 的应用
第一篇:Swoole Timer 的应用
收到读者的咨询,这情况大家可能也会有,所以就在这说说:
“亮哥,我今年30岁了,有点中年危机,最近有点焦虑,发现工作虽然很忙,但是没感觉能力有所提升,整天都有写不完的业务代码,时间紧有时代码质量也不怎么样,知道还有很多改进空间,但一直没时间改,主要是后面项目压着,马上又要进入开发了,这种情况怎么办?”
首先,我是菜鸡,观点不喜勿喷,那我就说下自己的看法:
上面的描述比较主观,人呀有时候发现不了自己的能力很正常,有时候有能力了并不是马上就能显现的,而是到了某个阶段后突然发现,哇塞,原来自己这么厉害。
当然能力也分很多种,比如专业能力,快速学习能力,进度把控能力,还有自信也是一种能力,不要脸是一种能力,坚持不要脸更是一种能力。
其实能力提升最快的还是靠工作实践,悄悄问问自己加入了很多大牛的微信群,能力提升了吗?看书自学不实践是不是吸收的也不多。
如果非要给一个具体的方案,那就是在团队内多分享吧,因为在分享前你会做充分的准备来避免分享时出丑,即使有时候自己知道,当讲出来的时候就不是那么回事了。
前期分享可以是看稿,后期练习无稿分享。
然后,再多说一点,30了给自己一个目标,不要盲目每天就是学学学,比如目标是技术专家,目标是业务专家,都很好呀,当然目标与自己性格有关也不是一成不变的。
围绕着目标设置一些计划,不要以为每天的学学学,就觉得其他的一切就自然而来,其中还有很多机遇和人脉的因素。
最后,如果实在感觉压得喘不过气,就换个环境吧,别和自己过不去。
开始今天的文章,这篇文章实现了 Swoole MySQL 连接池,代码是在《Swoole RPC 的实现》文章的基础上进行开发的。
先回顾上篇文章的内容:
实现了 HTTP / TCP 请求
实现了 同步 / 异步 请求
分享了 OnRequest.php、OnReceive.php 源码
业务逻辑 Order.php 中返回的是假数据
本篇文章主要的功能点:
业务逻辑 Order.php 中返回 MySQL 数据库中的数据。
Task 启用了协程
支持 主/从 数据库配置
实现数据库连接池
实现数据库 CURD
代码 Order.phpmysql = $pool->get(); $this->table = "order"; } public function add($code = "", $name = "") { //TODO 验证 return $this->mysql->insert($this->table, ["code" => $code, "name" => $name]); } public function edit($id = 0, $name="") { //TODO 验证 return $this->mysql->update($this->table, ["name" => $name], ["id" => $id]); } public function del($id = 0) { //TODO 验证 return $this->mysql->delete($this->table, ["id" => $id]); } public function info($code = "") { //TODO 验证 return $this->mysql->select($this->table, ["code" => $code]); } }Task 启用协程
一、需要新增两项配置:
enable_coroutine = true task_enable_coroutine = true
二、回调参数发生改变
$serv->on("Task", function ($serv, $task_id, $src_worker_id, $data) { ... }); 修改成: $serv->on("Task", function ($serv, $task) { $task->worker_id; //来自哪个`Worker`进程 $task->id; //任务的编号 $task->data; //任务的数据 });数据库 主/从 配置
Mysql.php
数据库连接池MysqlPool.php
pool)) { $this->config = $config; $this->pool = new chan($config["master"]["pool_size"]); for ($i = 0; $i < $config["master"]["pool_size"]; $i++) { go(function() use ($config) { $mysql = new MysqlDB(); $res = $mysql->connect($config); if ($res === false) { throw new RuntimeException("Failed to connect mysql server"); } else { $this->pool->push($mysql); } }); } } } public function get() { if ($this->pool->length() > 0) { $mysql = $this->pool->pop($this->config["master"]["pool_get_timeout"]); if (false === $mysql) { throw new RuntimeException("Pop mysql timeout"); } defer(function () use ($mysql) { //释放 $this->pool->push($mysql); }); return $mysql; } else { throw new RuntimeException("Pool length <= 0"); } } }数据库 CURDMysqlDB.php
_execute($arguments[0]); } } public function connect($config) { //主库 $master = new SwooleCoroutineMySQL(); $res = $master->connect($config["master"]); if ($res === false) { throw new RuntimeException($master->connect_error, $master->errno); } else { $this->master = $master; } //从库 $slave = new SwooleCoroutineMySQL(); $res = $slave->connect($config["slave"]); if ($res === false) { throw new RuntimeException($slave->connect_error, $slave->errno); } else { $this->slave = $slave; } $this->config = $config; return $res; } public function insert($table = "", $data = []) { $fields = ""; $values = ""; $keys = array_keys($data); foreach ($keys as $k) { $fields .= "`".addslashes($k)."`, "; $values .= """.addslashes($data[$k])."", "; } $fields = substr($fields, 0, -2); $values = substr($values, 0, -2); $sql = "INSERT INTO `{$table}` ({$fields}) VALUES ({$values})"; return $this->_execute($sql); } public function update($table = "", $set = [], $where = []) { $arr_set = []; foreach ($set as $k => $v) { $arr_set[] = "`".$k . "` = " . $this->_escape($v); } $set = implode(", ", $arr_set); $where = $this->_where($where); $sql = "UPDATE `{$table}` SET {$set} {$where}"; return $this->_execute($sql); } public function delete($table = "", $where = []) { $where = $this->_where($where); $sql = "DELETE FROM `{$table}` {$where}"; return $this->_execute($sql); } public function select($table = "",$where = []) { $where = $this->_where($where); $sql = "SELECT * FROM `{$table}` {$where}"; return $this->_execute($sql); } private function _where($where = []) { $str_where = ""; foreach ($where as $k => $v) { $str_where .= " AND `{$k}` = ".$this->_escape($v); } return "WHERE 1 ".$str_where; } private function _escape($str) { if (is_string($str)) { $str = """.$str."""; } elseif (is_bool($str)) { $str = ($str === FALSE) ? 0 : 1; } elseif (is_null($str)) { $str = "NULL"; } return $str; } private function _execute($sql) { if (strtolower(substr($sql, 0, 6)) == "select") { $db = $this->_get_usable_db("slave"); } else { $db = $this->_get_usable_db("master"); } $result = $db->query($sql); if ($result === true) { return [ "affected_rows" => $db->affected_rows, "insert_id" => $db->insert_id, ]; } return $result; } private function _get_usable_db($type) { if ($type == "master") { if (!$this->master->connected) { $master = new SwooleCoroutineMySQL(); $res = $master->connect($this->config["master"]); if ($res === false) { throw new RuntimeException($master->connect_error, $master->errno); } else { $this->master = $master; } } return $this->master; } elseif ($type == "slave") { if (!$this->slave->connected) { $slave = new SwooleCoroutineMySQL(); $res = $slave->connect($this->config["slave"]); if ($res === false) { throw new RuntimeException($slave->connect_error, $slave->errno); } else { $this->slave = $slave; } } return $this->slave; } } }OnWorkerStart 中调用try { MysqlPool::getInstance(get_config("mysql")); } catch (Exception $e) { $serv->shutdown(); } catch (Throwable $throwable) { $serv->shutdown(); }客户端发送请求"SW", "token" => "Bb1R3YLipbkTp5p0", "param" => [ "class" => "Order", "method" => "add", "param" => [ "code" => "C".mt_rand(1000,9999), "name" => "订单-".mt_rand(1000,9999), ], ], ]; //编辑 $demo = [ "type" => "SW", "token" => "Bb1R3YLipbkTp5p0", "param" => [ "class" => "Order", "method" => "edit", "param" => [ "id" => "4", "name" => "订单-".mt_rand(1000,9999), ], ], ]; //删除 $demo = [ "type" => "SW", "token" => "Bb1R3YLipbkTp5p0", "param" => [ "class" => "Order", "method" => "del", "param" => [ "id" => "1", ], ], ]; //查询 $demo = [ "type" => "SW", "token" => "Bb1R3YLipbkTp5p0", "param" => [ "class" => "Order", "method" => "info", "param" => [ "code" => "C4649" ], ], ]; $ch = curl_init(); $options = [ CURLOPT_URL => "http://10.211.55.4:9509/", CURLOPT_POST => 1, CURLOPT_POSTFIELDS => json_encode($demo), ]; curl_setopt_array($ch, $options); curl_exec($ch); curl_close($ch);扩展官方协程 MySQL 客户端手册:
https://wiki.swoole.com/wiki/...
大家可以尝试使用官方提供的其他方法。
小结Demo 代码仅供参考,里面有很多不严谨的地方。
根据自己的需要进行修改,比如业务代码相关验证,CURD 方法封装 ...
推荐一个完善的产品,Swoole 开发的 MySQL 数据库连接池(SMProxy):
https://github.com/louislivi/...
上面的 Demo 需要源码的,加我微信。(菜单-> 加我微信-> 扫我)
推荐阅读系统的讲解 - SSO 单点登录
系统的讲解 - PHP WEB 安全防御
系统的讲解 - PHP 缓存技术
系统的讲解 - PHP 接口签名验证
系统的讲解 - PHP 浮点数高精度运算
本文欢迎转发,转发请注明作者和出处,谢谢!
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/31577.html
摘要:基于扩展实现真正的数据库连接池这种方案中,项目占用的连接数仅仅为。一种是连接暂时不再使用,其占用状态解除,可以从使用者手中交回到空闲队列中这种我们称为连接的归队。源码剖析系列目录 作者:bromine链接:https://www.jianshu.com/p/1a7...來源:简书著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。Swoft Github: https:...
摘要:一个基于协议,开发的数据库连接池。也可以通过其自身的管理机制来监视数据库连接的数量使用情况等。超出最大连接数会采用协程挂起,等到有连接关闭再恢复协程继续操作。 SMProxy GITHUB:https://github.com/louislivi/... Swoole MySQL Proxy 一个基于 MySQL 协议,Swoole 开发的MySQL数据库连接池。 原理 将数据库连接作...
阅读 3263·2022-01-04 14:20
阅读 3068·2021-09-22 15:08
阅读 2140·2021-09-03 10:44
阅读 2287·2019-08-30 15:44
阅读 1459·2019-08-29 18:40
阅读 2627·2019-08-29 17:09
阅读 2959·2019-08-26 13:53
阅读 3196·2019-08-26 13:37