资讯专栏INFORMATION COLUMN

Swoole 源码分析——基础模块之 Channel 队列

txgcwm / 2750人阅读

摘要:前言内存数据结构,类似于的通道,底层基于共享内存互斥锁实现,可实现用户态的高性能内存队列。是当前队列占用的内存大小,用来指定是否使用共享内存是否使用锁是否使用通知。

前言

内存数据结构 Channel,类似于 Gochan 通道,底层基于 共享内存 + Mutex 互斥锁实现,可实现用户态的高性能内存队列。Channel 可用于多进程环境下,底层在读取写入时会自动加锁,应用层不需要担心数据同步问题。

channel 在之前的文章中出现过,当时用于 managerworker 进程之间进行通信的重要数据结构,主要用于 worker 进程通知 manager 进程重启相应 worker 进程。

channel 数据结构

channel 数据结构的属性比较多,head 是队列的头部位置,tail 是队列的尾部位置,size 是申请的队列内存大小,maxlen 是每个队列元素的大小,head_tagtail_tag 用于指定队列的头尾是否循环被重置回头部。bytes 是当前 channel 队列占用的内存大小,flag 用来指定是否使用共享内存、是否使用锁、是否使用 pipe 通知。memchannel 的内存首地址。

typedef struct _swChannel_item
{
    int length;
    char data[0];
} swChannel_item;

typedef struct _swChannel
{
    off_t head;
    off_t tail;
    size_t size;
    char head_tag;
    char tail_tag;
    int num;
    int max_num;
    /**
     * Data length, excluding structure
     */
    size_t bytes;
    int flag;
    int maxlen;
    /**
     * memory point
     */
    void *mem;
    swLock lock;
    swPipe notify_fd;
} swChannel;
channel 队列 swChannel_new 创建队列

创建队列就是根据 flags 来初始化队列的各个属性,值得注意的是 maxlen,当申请内存的时候会多申请这些内存,用来防止内存越界。

swChannel* swChannel_new(size_t size, int maxlen, int flags)
{
    assert(size >= maxlen);
    int ret;
    void *mem;

    //use shared memory
    if (flags & SW_CHAN_SHM)
    {
        mem = sw_shm_malloc(size + sizeof(swChannel) + maxlen);
    }
    else
    {
        mem = sw_malloc(size + sizeof(swChannel) + maxlen);
    }

    if (mem == NULL)
    {
        swWarn("swChannel_create: malloc(%ld) failed.", size);
        return NULL;
    }
    swChannel *object = mem;
    mem += sizeof(swChannel);

    bzero(object, sizeof(swChannel));

    //overflow space
    object->size = size;
    object->mem = mem;
    object->maxlen = maxlen;
    object->flag = flags;

    //use lock
    if (flags & SW_CHAN_LOCK)
    {
        //init lock
        if (swMutex_create(&object->lock, 1) < 0)
        {
            swWarn("mutex init failed.");
            return NULL;
        }
    }
    //use notify
    if (flags & SW_CHAN_NOTIFY)
    {
        ret = swPipeNotify_auto(&object->notify_fd, 1, 1);
        if (ret < 0)
        {
            swWarn("notify_fd init failed.");
            return NULL;
        }
    }
    return object;
}
swChannel_push 入队

入队的时候,首先要先加锁,然后调用 swChannel_in

swChannel_in 逻辑很简单,向队列的尾部推送数据,如果当前 channel 尾部被重置,head 还未被重置,就需要先判断剩余的内存是否够用。

如果当前 channel 尾部未被重置,就可以放心的追加元素,因为 object->size 和真正申请的内存之前还有 maxlen 可以富余,不必考虑内存越界的问题。

int swChannel_push(swChannel *object, void *in, int data_length)
{
    assert(object->flag & SW_CHAN_LOCK);
    object->lock.lock(&object->lock);
    int ret = swChannel_in(object, in, data_length);
    object->lock.unlock(&object->lock);
    return ret;
}

#define swChannel_full(ch) ((ch->head == ch->tail && ch->tail_tag != ch->head_tag) || (ch->bytes + sizeof(int) * ch->num == ch->size))
int swChannel_in(swChannel *object, void *in, int data_length)
{
    assert(data_length <= object->maxlen);
    if (swChannel_full(object))
    {
        return SW_ERR;
    }
    swChannel_item *item;
    int msize = sizeof(item->length) + data_length;

    if (object->tail < object->head)
    {
        //no enough memory space
        if ((object->head - object->tail) < msize)
        {
            return SW_ERR;
        }
        item = object->mem + object->tail;
        object->tail += msize;
    }
    else
    {
        item = object->mem + object->tail;
        object->tail += msize;
        if (object->tail >= object->size)
        {
            object->tail = 0;
            object->tail_tag = 1 - object->tail_tag;
        }
    }
    object->num++;
    object->bytes += data_length;
    item->length = data_length;
    memcpy(item->data, in, data_length);
    return SW_OK;
}
swChannel_push 出队

swChannel_push 出队的逻辑比较简单,获取队列头部位置,然后拷贝首部数据即可。当 head 超过 size 值,即可重置 head

int swChannel_pop(swChannel *object, void *out, int buffer_length)
{
    assert(object->flag & SW_CHAN_LOCK);
    object->lock.lock(&object->lock);
    int n = swChannel_out(object, out, buffer_length);
    object->lock.unlock(&object->lock);
    return n;
}

#define swChannel_empty(ch) (ch->num == 0)

int swChannel_out(swChannel *object, void *out, int buffer_length)
{
    if (swChannel_empty(object))
    {
        return SW_ERR;
    }

    swChannel_item *item = object->mem + object->head;
    assert(buffer_length >= item->length);
    memcpy(out, item->data, item->length);
    object->head += (item->length + sizeof(item->length));
    if (object->head >= object->size)
    {
        object->head = 0;
        object->head_tag = 1 - object->head_tag;
    }
    object->num--;
    object->bytes -= item->length;
    return item->length;
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/30852.html

相关文章

  • Swoole 源码分析——基础模块Queue队列

    摘要:消息队列的接受消息队列的接受是利用函数,其中是消息的类型,该参数会取出指定类型的消息,如果设定的是争抢模式,该值会统一为,否则该值就是消息发送目的的。环形队列的消息入队发送消息首先要确定环形队列的队尾。取模操作可以优化 前言 swoole 的底层队列有两种:进程间通信 IPC 的消息队列 swMsgQueue,与环形队列 swRingQueue。IPC 的消息队列用于 task_wor...

    jollywing 评论0 收藏0
  • Swoole笔记(一)

    摘要:修复添加超过万个以上定时器时发生崩溃的问题增加模块,下高性能序列化库修复监听端口设置无效的问题等。线程来处理网络事件轮询,读取数据。当的三次握手成功了以后,由这个线程将连接成功的消息告诉进程,再由进程转交给进程。此时进程触发事件。 本文示例代码详见:https://github.com/52fhy/swoo...。 简介 Swoole是一个PHP扩展,提供了PHP语言的异步多线程服务器...

    SHERlocked93 评论0 收藏0
  • 高并发

    摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...

    supernavy 评论0 收藏0
  • 高并发

    摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...

    ddongjian0000 评论0 收藏0
  • 高并发

    摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...

    wangdai 评论0 收藏0

发表评论

0条评论

txgcwm

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<