资讯专栏INFORMATION COLUMN

JAVA通过Gearman实现MySQL到Redis的数据同步(异步复制)

doodlewind / 1849人阅读

摘要:但是这需要对文件以及有非常深入的理解,同时由于存在多种形式,分析实现同步的工作量是非常大的。因此这里选择了一种开发成本更加低廉的方式,借用已经比较成熟的,将数据首先放入中,然后通过一个自己编写的,将数据同步到。此类连接远程的。

MySQL到Redis数据复制方案

无论MySQL还是Redis,自身都带有数据同步的机制,像比较常用的 MySQL的Master/Slave模式 ,就是由Slave端分析Master的binlog来实现的,这样的数据复制其实还是一个异步过程,只不过当服务器都在同一内网时,异步的延迟几乎可以忽略。

那么理论上我们也可以用同样方式,分析MySQL的binlog文件并将数据插入Redis。但是这需要对binlog文件以及MySQL有非常深入的理解,同时由于 binlog存在Statement/Row/Mixedlevel多种形式 ,分析binlog实现同步的工作量是非常大的。

因此这里选择了一种开发成本更加低廉的方式,借用已经比较成熟的MySQL UDF,将MySQL数据首先放入Gearman中,然后通过一个自己编写的PHP Gearman Worker,将数据同步到Redis。比分析binlog的方式增加了不少流程,但是实现成本更低,更容易操作。

Gearman的安装与使用

Gearman 是一个支持分布式的任务分发框架。设计简洁,获得了非常广泛的支持。一个典型的Gearman应用包括以下这些部分:

Gearman Job Server:Gearman核心程序,需要编译安装并以守护进程形式运行在后台
Gearman Client:可以理解为任务的收件员,比如我要在后台执行一个发送邮件的任务,可以在程序中调用一个Gearman Client并传入邮件的信息,然后就可以将执行结果立即展示给用户,而任务本身会慢慢在后台运行。
Gearman Worker:任务的真正执行者,一般需要自己编写具体逻辑并通过守护进程方式运行,Gearman Worker接收到Gearman Client传递的任务内容后,会按顺序处理。
以前曾经介绍过类似的 后台任务处理项目Resque 。两者的设计其实非常接近,简单可以类比为:

Gearman Job Server:对应Resque的Redis部分
Gearman Client:对应Resque的Queue操作
Gearman Worker:对应Resque的Worker和Job
这里之所以选择Gearman而不是Resque是因为Gearman提供了比较好用的MySQL UDF,工作量更小。

1、安装依赖

 yum install -y boost-devel gperf libevent-devel libuuid-devel
 yum install mysql-devel -y

2、下载gearman

 wget https://launchpad.net/gearmand/1.2/1.1.12/+download/gearmand-1.1.12.tar.gz

3、编译安装,指定mysqlclient的链接路径

  tar -zxvf gearmand-1.1.12.tar.gz 
  cd gearmand-1.1.12
   ./configure  
    make && make install
     

4、启动gearmand服务端 (启动之时,在/var/log/下创建gearmand.log日志文件。-l 指定日志文件 -d后台运行 -L 0.0.0.0 绑定到IPV4

gearmand -L 0.0.0.0 -l /var/log/gearmand.log -d

5、查看是否启动成功

ps -ef | grep gearman

6、查看是否安装成功,查看gearman版本信息

gearmand -V

7、MySQL UDF + Trigger同步数据到Gearman (https://github.com/mysqludf)
安装lib_mysqludf_json(lib_mysqludf_json可以把MySQL表的数据以json数据格式输出)

wget https://github.com/mysqludf/lib_mysqludf_json/archive/master.zip
unzip master.zip
cd lib_mysqludf_json-master/
rm -rf lib_mysqludf_json.so

8、编译 mysql_config 这是mysql的配置文件,可以 find /usr -name mysql_config 搜索下在什么位置

gcc $(/usr/local/mysql/bin/mysql_config  --cflags) -shared -fPIC -o lib_mysqludf_json.so lib_mysqludf_json.c

9、拷贝lib_mysqludf_json.so到MySQL的plugin目录
(可以登陆MySQL,输入命令"show variables like "%plugin%""查看plugin位置)

cp lib_mysqludf_json.so /usr/local/mysql/lib/plugin/

演示lib_mysqludf_json功能
登录mysql

mysql -uroot -h127.0.0.1 -p

注册UDF函数

CREATE FUNCTION json_object RETURNS STRING SONAME "lib_mysqludf_json.so";
CREATE FUNCTION json_array RETURNS STRING SONAME "lib_mysqludf_json.so";
CREATE FUNCTION json_members RETURNS STRING SONAME "lib_mysqludf_json.so";
CREATE FUNCTION json_values RETURNS STRING SONAME "lib_mysqludf_json.so";
//json_array|json_members|json_values函数注册方式与json_object一样.
select json_object(id,file_save_type,base_dir) as sys_file_save_config from sys_file_save_config;
ERROR 1123 (HY000): Can"t initialize function "json_object"; Invalid json member name - name cannot be empty
以上错误这样解决,给每个成员名称使用别名即可:
select json_object(id as id ,file_save_type as fileSaveType,app_id as appID) as sys_file_save_config from sys_file_save_config;

10、安装gearman-mysql-udf (https://launchpad.net/gearman...

 wget https://launchpad.net/gearman-mysql-udf/trunk/0.6/+download/gearman-mysql-udf-0.6.tar.gz
 tar zxvf gearman-mysql-udf-0.6.tar.gz 
 cd gearman-mysql-udf-0.6

11、安装libgearman-devel

   yum install libgearman-devel -y

如果没有yum源,添加epel.repo yum源

      [epel]
        name=Extra Packages for Enterprise Linux 6 - $basearch
        #baseurl=http://download.fedoraproject.org/pub/epel/6/$basearch
        mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-6&arch=$basearch
        failovermethod=priority
        enabled=1
        gpgcheck=1
        gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6
        
        [epel-debuginfo]
        name=Extra Packages for Enterprise Linux 6 - $basearch - Debug
        #baseurl=http://download.fedoraproject.org/pub/epel/6/$basearch/debug
        mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-debug-6&arch=$basearch
        failovermethod=priority
        enabled=0
        gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6
        gpgcheck=1
        
        [epel-source]
        name=Extra Packages for Enterprise Linux 6 - $basearch - Source
        #baseurl=http://download.fedoraproject.org/pub/epel/6/SRPMS
        mirrorlist=https://mirrors.fedoraproject.org/metalink?repo=epel-source-6&arch=$basearch
        failovermethod=priority
        enabled=0
        gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-6
        gpgcheck=1

12、编译安装
(可以登陆MySQL,输入命令"show variables like "%plugin%""查看plugin位置, mysql_config的配置文件,以及插件库所在路径,编译之后会在此路径生成.so文件)

./configure --with-mysql=/usr/local/mysql/bin/mysql_config --libdir=/usr/local/mysql/lib/plugin/
make && make install

演示gearman-mysql-udf功能

mysql -uroot -p
CREATE FUNCTION gman_do_background RETURNS STRING SONAME "libgearman_mysql_udf.so";
CREATE FUNCTION gman_servers_set RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_do RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_do_high RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_do_low RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_do_high_background RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_do_low_background RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
CREATE FUNCTION gman_sum RETURNS STRING SONAME "libgearman_mysql_udf.so"; 
//函数gman_do|gman_do_high|gman_do_low|gman_do_high_background|gman_do_low_background|gman_sum注册方式类似,请参考gearman-mysql-udf-0.6/README 
//指定gearman job server地址 
SELECT gman_servers_set("127.0.0.1:4730"); 

如果出现异常信息:
ERROR 1126 (HY000): Can"t open shared library "libgearman_mysql_udf.so" (errno: 11 libgearman.so.8: cannot open shared object file: No such file or directory)
表示系统找不到 libgearman.so 文件,一般so都在/usr/local/lib目录下,修改配置文件/etc/ld.so.conf,将/usr/local/lib目录加入进去即可:

$ cat /etc/ld.so.conf
    include ld.so.conf.d/*.conf
    /usr/local/lib
$ /sbin/ldconfig -v | grep gearman*

13、MySQL Trigger调用Gearman UDF实现同步
创建触发器

DELIMITER $$
CREATE TRIGGER test_data_to_redis AFTER UPDATE ON test FOR EACH ROW BEGIN
    SET@ret=gman_do_background("syncToRedis", json_object(NEW.id AS `id`, NEW.phone AS`phone`));
END$$;

    DELIMITER $$
CREATE TRIGGER test_data_to_redis2 AFTER INSERT ON test
  FOR EACH ROW BEGIN
    SET @ret=gman_do_background("syncToRedis2", json_object(NEW.id AS `id`, NEW.phone AS`phone`)); 
  END$$
DELIMITER ;

DELIMITER $$
CREATE TRIGGER test_data_to_redis3 BEFORE DELETE ON test
  FOR EACH ROW BEGIN
    SET @ret=gman_do_background("syncToRedis3", json_object(OLD.id AS `id`, OLD.phone AS`phone`)); 
  END$$
DELIMITER ;

说明以及问题:此类采用了gearman官网的java-gearman-service(地址:https://launchpad.net/gearman...),目前release版本是0.6.6。java-gearman-servic.jar包中,即包括gearman server,还包括client和work客户端API。
问题:config类为spring注入的配置文件类,在worker.addFunction中,如果通过config类的属性,并且属性是从配置文件来的就会有问题。不知道为啥,写死就是OK的。此类连接远程的gearman job server。

jar包需要添加到本地jar仓库:

mvn install:install-file -Dfile=C:softwarejava-gearman-service-0.6.6.jar -DgroupId=org.gearman.jgs -DartifactId=java-gearman-service -Dversion=0.6.6 -Dpackaging=jar

import java.util.concurrent.TimeUnit;

import org.gearman.Gearman;
import org.gearman.GearmanFunction;
import org.gearman.GearmanFunctionCallback;
import org.gearman.GearmanServer;
import org.gearman.GearmanWorker;

/**
 * *ECHO_HOST = "192.168.125.131"为安装了Gearman并开启geramand服务的主机地址
 *int ECHO_PORT = 4730默认端口为4730
 *
 * @author Administrator
 *
 */
public class EchoWorker implements GearmanFunction {

// function name
public static final String ECHO_FUNCTION_NAME = "syncToRedis";

// job server地址
public static final String ECHO_HOST = "192.168.1.245";

// job server监听的端口
public static final int ECHO_PORT = 4730;

public static void main(String[] args) {
    // 创建一个Gearman实例
    Gearman gearman = Gearman.createGearman();
    /*
     * 创建一个jobserver
     * 
     * Parameter 1: job server的IP地址 Parameter 2: job server监听的端口
     * 
     * job server收到client的job,并将其分发给注册worker
     * 
     */
    GearmanServer server = gearman.createGearmanServer(EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);
    // 创建一个Gearman的worker
    GearmanWorker worker = gearman.createGearmanWorker(); // 正题来了,创建work节点。
    worker.setReconnectPeriod(2, TimeUnit.SECONDS); // 设置超时重连时间
    worker.setMaximumConcurrency(5); // 最大并发数
    // 告诉工人如何执行工作(主要实现了GearmanFunction接口)
    worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());
    // worker连接服务器
    worker.addServer(server);
}

@Override
public byte[] work(String function, byte[] data, GearmanFunctionCallback callback) throws Exception {
    // work方法实现了GearmanFunction接口中的work方法,本实例中进行了字符串的反写
    if (data != null) {
        String str = new String(data);
        System.out.println(str);
        StringBuffer sb = new StringBuffer(str);
        return sb.reverse().toString().getBytes();
    } else {
        return "未接收到data".getBytes();
    }
}
}


import org.gearman.Gearman;  
import org.gearman.GearmanClient;  
import org.gearman.GearmanJobEvent;  
import org.gearman.GearmanJobReturn;  
import org.gearman.GearmanServer;  
  
public class EchoClient {  
    public static void main(String... args) throws InterruptedException {  
        //创建一个Gearman实例  
        Gearman gearman = Gearman.createGearman();  
        //创建一个Gearman client               
        GearmanClient client = gearman.createGearmanClient();  
        /*  
         * 创建一个jobserver  
         *   
         * Parameter 1: job server的IP地址  
         * Parameter 2: job server监听的端口  
         *   
         *job server收到client的job,并将其分发给注册worker  
         *  
         */  
        GearmanServer server = gearman.createGearmanServer(  
                        EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);  
         // 告诉客户端,提交工作时它可以连接到该服务器  
        client.addServer(server);  
        /*  
         * 向job server提交工作  
         *   
         * Parameter 1: gearman function名字  
         * Parameter 2: 传送给job server和worker的数据  
         *   
         * GearmanJobReturn返回job发热结果  
         */  
        GearmanJobReturn jobReturn = client.submitJob(  
                        EchoWorker.ECHO_FUNCTION_NAME, ("Hello World!").getBytes());  
        //遍历作业事件,直到我们打到最后文件               
        while (!jobReturn.isEOF()) {  

                //下一个作业事件  
                GearmanJobEvent event = jobReturn.poll();  

                switch (event.getEventType()) {  

                case GEARMAN_JOB_SUCCESS:     //job执行成功  
                        System.out.println(new String(event.getData()));  
                        break;  
                case GEARMAN_SUBMIT_FAIL:     //job提交失败  
                      
                case GEARMAN_JOB_FAIL:        //job执行失败  
                        System.err.println(event.getEventType() + ": "  
                                        + new String(event.getData()));  
                default:  
                }  
        }  
        //关闭  
        gearman.shutdown();  
}  

}
官网:http://gearman.org/download/
php方案:https://www.tuicool.com/artic...

跟多精彩文章

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70771.html

相关文章

  • Gearman安装和使用

    摘要:启动和如下信息则表示成功查看版本安装扩展从下载最新扩展需下载最新源码包,并解压缩安装安装成功后信息然后,配置文件增加内容重启后,出现如下信息则表示安装扩展成功。 首发于 樊浩柏科学院 Gearman 是一个分布式任务分发系统,通过程序调用(API,跨语言)分布式地把工作委派给更适合做某项工作的机器,且这些机器可以以并发的、负载均衡的形式来共同完成某项工作。当计算密集型场景时,适合在后...

    U2FsdGVkX1x 评论0 收藏0
  • 服务器迁移踩坑经验分享

    摘要:去年年底因为使用了云存储和其他方面的原因,计划的将服务器缩减一个机柜出来。云服务的回源服务器的配置中间漏了一台,后期给补上了。监控迁移完毕之后,除了常规的业务代码,还需要注意图片资源的回源是否正常服务器压力是否正常检查日志是否出现错误。 去年年底因为使用了云存储和其他方面的原因,计划的将服务器缩减一个机柜出来。这样今年每月机房的费用可以减少1万左右。前前后后抽空在弄这个任务,现做个笔记...

    Developer 评论0 收藏0
  • 通用图床服务架构解析(百万级回源/天)

    摘要:转发通过,将请求通过负载均衡,均衡给后端处理的服务。图床同时也支持同步上传回调通知的方式,将图片上传结果反馈给业务方。 Hulk 图床是支持 360 公司绝大部分业务的图片服务,支持多种图片处理功能,如:裁剪、压缩、滤镜、pHash 计算、人脸识别、格式转换、gif 首帧提取……等等,支持的业务线包括:搜索、图搜、新闻、信息流、广告……等等,每天 CDN 回源图床后端 150+ 亿 P...

    XFLY 评论0 收藏0
  • 通用图床服务架构解析(百万级回源/天)

    摘要:转发通过,将请求通过负载均衡,均衡给后端处理的服务。图床同时也支持同步上传回调通知的方式,将图片上传结果反馈给业务方。 Hulk 图床是支持 360 公司绝大部分业务的图片服务,支持多种图片处理功能,如:裁剪、压缩、滤镜、pHash 计算、人脸识别、格式转换、gif 首帧提取……等等,支持的业务线包括:搜索、图搜、新闻、信息流、广告……等等,每天 CDN 回源图床后端 150+ 亿 P...

    Karrdy 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<