资讯专栏INFORMATION COLUMN

微信开源mars源码分析1—上层samples分析

caiyongji / 1269人阅读

摘要:微信已经开源了,但是市面上相关的文章较少,即使有也是多在于使用等这些,那么这次我希望能够从这个直接用于底层通讯的部分进行个分析。首先明确下,微信用了的开源协议库,来代替和。核心的部分我们先放下,下一篇再深入分析。

微信已经开源了mars,但是市面上相关的文章较少,即使有也是多在于使用xlog等这些,那么这次我希望能够从stn这个直接用于im底层通讯的部分进行个分析。
为了能分析的全面些,我们从samples开始。
首先明确下,微信用了google的开源协议protobuf库,来代替json和xml。至于为何使用这个,原因还在于效率和传输量上,效率上他能够比json提升将近10倍,而且基于二进制而非文本,传输的大小更加有优势,具体的不再累述,有兴趣的可以自己查查。
我们从samples开始看看通过http是怎么获得列表数据的,直接看/mars-master/samples/android/marsSampleChat/app/src/main/java/com/tencent/mars/sample/ConversationActivity.java,这个是个初始的列表界面,需要看的就是这个:

/**
     * pull conversation list from server
     */
    private void updateConversationTopics() {
        if (taskGetConvList != null) {
            MarsServiceProxy.cancel(taskGetConvList);
        }

        mTextView.setVisibility(View.INVISIBLE);
        progressBar.setVisibility(View.VISIBLE);

        swipeRefreshLayout.setRefreshing(true);

        taskGetConvList = new NanoMarsTaskWrapper(
                new Main.ConversationListRequest(),
                new Main.ConversationListResponse()
        ) {

            private List dataList = new LinkedList<>();

            @Override
            public void onPreEncode(Main.ConversationListRequest req) {
                req.type = conversationFilterType;
                req.accessToken = ""; // TODO:

                Log.d("xxx", "onPreEncode: " + req.toString());
            }

            @Override
            public void onPostDecode(Main.ConversationListResponse response) {
                Log.d("xxx", "onPostDecode: " + response.toString());
            }

            @Override
            public void onTaskEnd(int errType, int errCode) {
                Log.d("xxx", "onTaskEnd: " + errType + " " + errCode);

                runOnUiThread(new Runnable() {

                    @Override
                    public void run() {
                        if (response != null) {
                            for (Main.Conversation conv : response.list) {
                                dataList.add(new Conversation(conv.name, conv.topic, conv.notice));
                                Log.d("xxx", conv.toString());
                            }
                        }

                        if (!dataList.isEmpty()) {
                            progressBar.setVisibility(View.INVISIBLE);
                            conversationListAdapter.list.clear();
                            conversationListAdapter.list.addAll(dataList);
                            conversationListAdapter.notifyDataSetChanged();

                            swipeRefreshLayout.setRefreshing(false);

                        }
                        else {
                            Log.i(TAG, "getconvlist: empty response list");
                            progressBar.setVisibility(View.INVISIBLE);
                            mTextView.setVisibility(View.VISIBLE);
                        }
                    }
                });
            }

        };

        MarsServiceProxy.send(taskGetConvList.setHttpRequest(CONVERSATION_HOST, "/mars/getconvlist"));
    }

new了一个NanoMarsTaskWrapper对象,并Override了几个方法:onPreEncode、onPostDecode、onTaskEnd。分别是编码传输前回调,收到结果解码后回调,任务结束后回调;

设置NanoMarsTaskWrapper对象的http url地址;

通过MarsServiceProxy的send方法,执行发送;

通过这些,我们可以大体了解到,通过一个内置的任务体系,来进行传输的派发调用的;通过服务来驱使整个体系运转,并保证独立性;

其实在目录中已经可以看到了,samples分为2个部分,一个是app,另一个是wrapper,wrapper是jar。
好吧,我们从wrapper入手看下基本结构。
首先是manifest:



    
    
    
    

    
        

        
    

可以看到,独立进程的服务在这里约定了。广播接受者在这里约定了,与服务在同一进程中。
上面app中使用的MarsServiceProxy是个什么东西呢?

public class MarsServiceProxy implements ServiceConnection {
    ......
    private MarsServiceProxy() {
        worker = new Worker();
        worker.start();
    }

    public static void init(Context context, Looper looper, String packageName) {
        if (inst != null) {
            // TODO: Already initialized
            return;
        }

        gContext = context.getApplicationContext();

        gPackageName = (packageName == null ? context.getPackageName() : packageName);
        gClassName = SERVICE_DEFUALT_CLASSNAME;

        inst = new MarsServiceProxy();
    }
    ......
    
}

其实是从ServiceConnection继承下来的服务连接对象,但是他不仅仅是个连接对象。我们看到,他是个单例,在app的SampleApplicaton的onCreate中进行的初始化:

// NOTE: MarsServiceProxy is for client/caller
        // Initialize MarsServiceProxy for local client, can be moved to other place
        MarsServiceProxy.init(this, getMainLooper(), null);

app中调用的是send这个静态方法:

public static void send(MarsTaskWrapper marsTaskWrapper) {
        inst.queue.offer(marsTaskWrapper);
    }

其实这个方法在操作的是队列LinkedBlockingQueue。看到了吧,这个MarsServiceProxy其实是个api代理,内部有缓存的任务队列,实际上send就是向这个线程安全的队列中加入一项任务MarsTaskWrapper。
暂时放一下,我们关注下他的服务功能。在构造的时候,new了一个Worker,并start了。这个worker就是一个线程:

private static class Worker extends Thread {

        @Override
        public void run() {

            while (true) {
                inst.continueProcessTaskWrappers();

                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    //
                }
            }
        }
    }

也就是说,在这个类创建的时候,同时创建了一个工作线程,不断的以间隔50ms循环调用continueProcessTaskWrappers。再看continueProcessTaskWrappers:

private void continueProcessTaskWrappers() {
        try {
            if (service == null) {
                Log.d(TAG, "try to bind remote mars service, packageName: %s, className: %s", gPackageName, gClassName);
                Intent i = new Intent().setClassName(gPackageName, gClassName);
                gContext.startService(i);
                if (!gContext.bindService(i, inst, Service.BIND_AUTO_CREATE)) {
                    Log.e(TAG, "remote mars service bind failed");
                }

                // Waiting for service connected
                return;
            }

            MarsTaskWrapper taskWrapper = queue.take();
            if (taskWrapper == null) {
                // Stop, no more task
                return;
            }

            try {
                Log.d(TAG, "sending task = %s", taskWrapper);
                final String cgiPath = taskWrapper.getProperties().getString(MarsTaskProperty.OPTIONS_CGI_PATH);
                final Integer globalCmdID = GLOBAL_CMD_ID_MAP.get(cgiPath);
                if (globalCmdID != null) {
                    taskWrapper.getProperties().putInt(MarsTaskProperty.OPTIONS_CMD_ID, globalCmdID);
                    Log.i(TAG, "overwrite cmdID with global cmdID Map: %s -> %d", cgiPath, globalCmdID);
                }
                service.send(taskWrapper, taskWrapper.getProperties());

            } catch (Exception e) { // RemoteExceptionHandler
                e.printStackTrace();
            }
        } catch (Exception e) {

        }
    }

1.检查服务是否启动,没有则启动并返回等待下一个50ms再继续;
2.从队列中获取一个任务,并给他分配一个cmdID,然后调用MarsService的send方法执行真正的发送事件。
其实从上面看,这个服务代理就是做了这些事情,更深入的事情其实是交给了具体的服务进程来做的。这里就是个代理api。

好的,我们往下看具体的服务。
首先MarsService是个aidl的定义,不过我们从上面的这个线程循环里就可以看到,启动的服务是根据Intent i = new Intent().setClassName(gPackageName, gClassName);启动的,这个gClassName = SERVICE_DEFUALT_CLASSNAME;就是public static final String SERVICE_DEFUALT_CLASSNAME = "com.tencent.mars.sample.wrapper.service.MarsServiceNative";看到了吧,就是MarsServiceNative。
现在起进入到服务里面。

public class MarsServiceNative extends Service implements MarsService {

    private static final String TAG = "Mars.Sample.MarsServiceNative";

    private MarsServiceStub stub;
    ......
}

这里保存了一个MarsServiceStub,后面的send都是调用他来实现的,现在暂时先放下send,看下onCreate:

@Override
    public void onCreate() {
        super.onCreate();

        final MarsServiceProfile profile = gFactory.createMarsServiceProfile();
        stub = new MarsServiceStub(this, profile);

        // set callback
        AppLogic.setCallBack(stub);
        StnLogic.setCallBack(stub);
        SdtLogic.setCallBack(stub);

        // Initialize the Mars PlatformComm
        Mars.init(getApplicationContext(), new Handler(Looper.getMainLooper()));

        // Initialize the Mars
        StnLogic.setLonglinkSvrAddr(profile.longLinkHost(), profile.longLinkPorts());
        StnLogic.setShortlinkSvrAddr(profile.shortLinkPort());
        StnLogic.setClientVersion(profile.productID());
        Mars.onCreate(true);

        StnLogic.makesureLongLinkConnected();

        //
        Log.d(TAG, "mars service native created");
    }

1.创建配置信息类MarsServiceProfile;
2.new出MarsServiceStub来;
3.设置各种回调;
4.初始化Mars;
5.Mars.onCreate(true);
6.StnLogic.makesureLongLinkConnected();确认长连接。
这里开始用到了Mars了,这个才是核心,并且不在这个工程中。核心的部分我们先放下,下一篇再深入分析。
回到MarsServiceStub,看他的send方法:

@Override
    public void send(final MarsTaskWrapper taskWrapper, Bundle taskProperties) throws RemoteException {
        final StnLogic.Task _task = new StnLogic.Task(StnLogic.Task.EShort, 0, "", null);

        // Set host & cgi path
        final String host = taskProperties.getString(MarsTaskProperty.OPTIONS_HOST);
        final String cgiPath = taskProperties.getString(MarsTaskProperty.OPTIONS_CGI_PATH);
        _task.shortLinkHostList = new ArrayList<>();
        _task.shortLinkHostList.add(host);
        _task.cgi = cgiPath;

        final boolean shortSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, true);
        final boolean longSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, false);
        if (shortSupport && longSupport) {
            _task.channelSelect = StnLogic.Task.EBoth;

        } else if (shortSupport) {
            _task.channelSelect = StnLogic.Task.EShort;

        } else if (longSupport) {
            _task.channelSelect = StnLogic.Task.ELong;

        } else {
            Log.e(TAG, "invalid channel strategy");
            throw new RemoteException("Invalid Channel Strategy");
        }

        // Set cmdID if necessary
        int cmdID = taskProperties.getInt(MarsTaskProperty.OPTIONS_CMD_ID, -1);
        if (cmdID != -1) {
            _task.cmdID = cmdID;
        }

        TASK_ID_TO_WRAPPER.put(_task.taskID, taskWrapper);
        WRAPPER_TO_TASK_ID.put(taskWrapper, _task.taskID);

        // Send
        Log.i(TAG, "now start task with id %d", _task.taskID);
        StnLogic.startTask(_task);
        if (StnLogic.hasTask(_task.taskID)) {
            Log.i(TAG, "stn task started with id %d", _task.taskID);

        } else {
            Log.e(TAG, "stn task start failed with id %d", _task.taskID);
        }
    }

1.new一个StnLogic.Task;
2.设置task的参数,根据入口的Bundle;
3.2个map保存taskID与task的关系;
4.StnLogic.startTask(_task);启动任务执行;
这里的内容又深入到了Mars核心里,可以看到,关键的处理都是在Mars核心部分完成的,这里的内容甭管是服务还是什么都是在做参数的传递及关系的维护等工作。

好吧,我们倒带回来,回到MarsServiceStub,他实现了StnLogic.ICallBack这个interface。定义在mars里:

public interface ICallBack {
        /**
         * SDK要求上层做认证操作(可能新发起一个AUTH CGI)
         * @return
         */
        boolean makesureAuthed();

        /**
         * SDK要求上层做域名解析.上层可以实现传统DNS解析,或者自己实现的域名/IP映射
         * @param host
         * @return
         */
        String[] onNewDns(final String host);

        /**
         * 收到SVR PUSH下来的消息
         * @param cmdid
         * @param data
         */
        void onPush(final int cmdid, final byte[] data);

        /**
         * SDK要求上层对TASK组包
         * @param taskID    任务标识
         * @param userContext
         * @param reqBuffer 组包的BUFFER
         * @param errCode   组包的错误码
         * @return
         */
        boolean req2Buf(final int taskID, Object userContext, ByteArrayOutputStream reqBuffer, int[] errCode, int channelSelect);

        /**
         * SDK要求上层对TASK解包
         * @param taskID        任务标识
         * @param userContext
         * @param respBuffer    要解包的BUFFER
         * @param errCode       解包的错误码
         * @return  int
         */
        int buf2Resp(final int taskID, Object userContext, final byte[] respBuffer, int[] errCode, int channelSelect);

        /**
         * 任务结束回调
         * @param taskID            任务标识
         * @param userContext
         * @param errType           错误类型
         * @param errCode           错误码
         * @return
         */
        int onTaskEnd(final int taskID, Object userContext, final int errType, final int errCode);

        /**
         * 流量统计
         * @param send
         * @param recv
         */
        void trafficData(final int send, final int recv);

        /**
         * 连接状态通知
         * @param status    综合状态,即长连+短连的状态
         * @param longlinkstatus    仅长连的状态
         */
        void reportConnectInfo(int status, int longlinkstatus);

        /**
         * SDK要求上层生成长链接数据校验包,在长链接连接上之后使用,用于验证SVR身份
         * @param identifyReqBuf    校验包数据内容
         * @param hashCodeBuffer    校验包的HASH
         * @param reqRespCmdID      数据校验的CMD ID
         * @return  ECHECK_NOW(需要校验), ECHECK_NEVER(不校验), ECHECK_NEXT(下一次再询问)
         */
        int getLongLinkIdentifyCheckBuffer(ByteArrayOutputStream identifyReqBuf, ByteArrayOutputStream hashCodeBuffer, int[] reqRespCmdID);

        /**
         * SDK要求上层解连接校验回包.
         * @param buffer            SVR回复的连接校验包
         * @param hashCodeBuffer    CLIENT请求的连接校验包的HASH值
         * @return
         */
        boolean onLongLinkIdentifyResp(final byte[] buffer, final byte[] hashCodeBuffer);

        /**
         * 请求做sync
         */
        void requestDoSync();
        String[] requestNetCheckShortLinkHosts();
        /**
         * 是否登录
         * @return true 登录 false 未登录
         */
        boolean isLogoned();

        void reportTaskProfile(String taskString);
    }

可以看到都是回调,通过mars的回调,MarsServiceStub接收到了taskend,并执行了:

@Override
    public int onTaskEnd(int taskID, Object userContext, int errType, int errCode) {
        final MarsTaskWrapper wrapper = TASK_ID_TO_WRAPPER.remove(taskID);
        if (wrapper == null) {
            Log.w(TAG, "stn task onTaskEnd callback may fail, null wrapper, taskID=%d", taskID);
            return 0; // TODO: ???
        }

        try {
            wrapper.onTaskEnd(errType, errCode);

        } catch (RemoteException e) {
            e.printStackTrace();

        } finally {
            WRAPPER_TO_TASK_ID.remove(wrapper); // onTaskEnd will be called only once for each task
        }

        return 0;
    }

从map中移除task,然后执行了task自己的onTaskEnd。这样我们正最初的updateConversationTopics里就可以看到后续的更新ui的代码。

下面我们要回到updateConversationTopics附近,看看NanoMarsTaskWrapper:

public abstract class NanoMarsTaskWrapper extends AbstractTaskWrapper {

private static final String TAG = "Mars.Sample.NanoMarsTaskWrapper";

protected T request;
protected R response;

public NanoMarsTaskWrapper(T req, R resp) {
    super();

    this.request = req;
    this.response = resp;
}

@Override
public byte[] req2buf() {
    try {
        onPreEncode(request);

        final byte[] flatArray = new byte[request.getSerializedSize()];
        final CodedOutputByteBufferNano output = CodedOutputByteBufferNano.newInstance(flatArray);
        request.writeTo(output);

        Log.d(TAG, "encoded request to buffer, [%s]", MemoryDump.dumpHex(flatArray));

        return flatArray;

    } catch (Exception e) {
        e.printStackTrace();
    }

    return new byte[0];
}

@Override
public int buf2resp(byte[] buf) {
    try {
        Log.d(TAG, "decode response buffer, [%s]", MemoryDump.dumpHex(buf));

        response = MessageNano.mergeFrom(response, buf);
        onPostDecode(response);
        return StnLogic.RESP_FAIL_HANDLE_NORMAL;

    } catch (Exception e) {
        Log.e(TAG, "%s", e);
    }

    return StnLogic.RESP_FAIL_HANDLE_TASK_END;
}

public abstract void onPreEncode(T request);

public abstract void onPostDecode(R response);

}

1.从AbstractTaskWrapper继承下来;
2.保存了request和response,都是MessageNano类型的(google的protobuf内的message数据类);
3.实现了2个接口,分别用来作为request转换为buf何buf转换成为response。其实就是对象转成byte[],byte转成对象;
3.在req2buf转换的过程中,调用了request的writeTo方法;
4.在buf2resp中,调用了MessageNano.mergeFrom,实际上最终也是调用了response的mergeFrom,见下:

/**
 * Parse {@code data} as a message of this type and merge it with the
 * message being built.
 */
public static final  T mergeFrom(T msg, final byte[] data)
    throws InvalidProtocolBufferNanoException {
    return mergeFrom(msg, data, 0, data.length);
}

根据上面的4点可以看到这是个实现序列化及反序列化的过程。google的开源protobuf我们不去关注,但是需要了解的是他是通过以proto为后缀名的配置文件来达到编译时即可生成类的相关代码的程度。
那么这个AbstractTaskWrapper的基类的作用又是什么呢?

public abstract class AbstractTaskWrapper extends MarsTaskWrapper.Stub {

private Bundle properties = new Bundle();

public AbstractTaskWrapper() {

    // Reflects task properties
    final TaskProperty taskProperty = this.getClass().getAnnotation(TaskProperty.class);
    if (taskProperty != null) {
        setHttpRequest(taskProperty.host(), taskProperty.path());
        setShortChannelSupport(taskProperty.shortChannelSupport());
        setLongChannelSupport(taskProperty.longChannelSupport());
        setCmdID(taskProperty.cmdID());
    }
}

@Override
public Bundle getProperties() {
    return properties;
}

@Override
public abstract void onTaskEnd(int errType, int errCode);

public AbstractTaskWrapper setHttpRequest(String host, String path) {
    properties.putString(MarsTaskProperty.OPTIONS_HOST, ("".equals(host) ? null : host));
    properties.putString(MarsTaskProperty.OPTIONS_CGI_PATH, path);

    return this;
}

public AbstractTaskWrapper setShortChannelSupport(boolean support) {
    properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, support);
    return this;
}

public AbstractTaskWrapper setLongChannelSupport(boolean support) {
    properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, support);
    return this;
}

public AbstractTaskWrapper setCmdID(int cmdID) {
    properties.putInt(MarsTaskProperty.OPTIONS_CMD_ID, cmdID);
    return this;
}

@Override
public String toString() {
    return "AbsMarsTask: " + BundleFormat.toString(properties);
}

}

很简单,就是提供了一些接口来设置传输协议类型,长短连接、http等。

综合来说,这个demo使用了独立的服务框架来进行传输的保证;使用了任务体系来承载每次传输及响应;大量的回调来监控运转过程中的各项关键点;封装了独立的jar wrapper,便于上层的更改及使用;独立的配置类引入支持http和tcp长短连接的使用;protobuf的引入极大提升序列化及反序列化的效率,并降低传输的数据大小;

这篇暂时就到这里吧,后面我们会深入分析下mars的核心部分。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66978.html

相关文章

  • 微信开源mars源码分析2—上层samples分析(续)

    摘要:本来是想直接深入到的核心层去看的,但是发现其实上面的部分还有好些没有分析到,因此回来继续分析。另外一个,是专用于统计的,我们暂时不去关注。具体的内容我会在后面的核心层分析的时候指出。准备下一篇进行的核心层分析吧。 本来是想直接深入到mars的核心层去看的,但是发现其实上面的samples部分还有好些没有分析到,因此回来继续分析。ConversationActivity这个类中实际上还做...

    MyFaith 评论0 收藏0
  • 微信开源mars源码分析5—底层核心mars分析(续2)

    摘要:执行并根据每个连接的状态决定后续处理,上篇已经讲过,不再累述。上面的三段处理完毕后,应该是数组中不再有连接才对,这里的保险处理是对数组再进行检查。至此跳出,算是整个连接过程完毕了。这里需要逐句分析,首先是。 最近回顾之前的文章,发现最后一篇有些着急了,很多地方没有叙述清楚。这里先做个衔接吧。我们还是以长连接为例,从longlink.cc看起。首先是那个线程函数__Run:/mars-m...

    asce1885 评论0 收藏0
  • 微信终端开源数据库 WCDB - Swift 版本

    摘要:作为微信的终端数据库,从开源至今,共迭代了个版本。微信也转向开发了吗相信这会是大家非常关心的问题。不仅微信,国内外大部分都还没有完全转向,但显然这是个趋势。另一方面,没有微信的上线机制的保护和庞大的用户量的验证,我们需要确保的稳定性。 WCDB 作为微信的终端数据库,从 2017.6 开源至今,共迭代了 5 个版本。我们一直关注开发者们的需求,并不断优化性能,新增如全文搜索等常用的功能...

    CloudDeveloper 评论0 收藏0

发表评论

0条评论

caiyongji

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<