资讯专栏INFORMATION COLUMN

十.Go并发编程--channel使用

supernavy / 1229人阅读

摘要:比如主协程启动个子协程,主协程等待所有子协程退出后再继续后续流程,这种场景下也可轻易实现。这个例子中,父协程仅仅是等待子协程结束,其实父协程也可以向管道中写入数据通知子协程结束,这时子协程需要定期地探测管道中是否有消息出现。

一.设计原理

Go 语言中最常见的、也是经常被人提及的设计模式就是:

"不要通过共享内存来通信,我们应该使用通信来共享内存"

通过共享内存来通信是直接读取内存的数据,而通过通信来共享内存,是通过发送消息的方式来进行同步。

而通过发送消息来同步的这种方式常见的就是 Go 采用的通信顺序进程 CSP(Communication Sequential Process) 模型以及 Erlang 采用的 Actor 模型,这两种方式都是通过通信来共享内存。

如下图所示

大部分的语言采用的都是第一种方式直接去操作内存,然后通过互斥锁,CAS 等操作来保证并发安全。Go 引入了 Channel 和 Goroutine 实现 CSP 模型来解耦这个操作。

  • 优点:

    • 在 Goroutine 当中我们就不用手动去做资源的锁定与释放,同时将生产者和消费者进行了解耦,Channel 其实和消息队列很相似。
  • 缺点:

    • 由于 Channel 底层也是通过这些低级的同步原语实现的,所以性能上会差一些,如果有极高的性能要求时也可以用 sync 包中提供的低级同步原语

先入先出

目前的 Channel 收发操作均遵循了先进先出的设计,具体规则如下:

  • 先从 Channel 读取数据的 Goroutine 会先接收到数据;
  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;

无锁管道

锁(Lock) 是一种常见的并发控制技术,我们一般会将锁分成乐观锁悲观锁,即乐观并发控制和悲观并发控制,无锁(lock-free)队列更准确的描述是使用乐观并发控制的队列。乐观并发控制也叫乐观锁,很多人都会误以为乐观锁是与悲观锁差不多,然而它并不是真正的锁,只是一种并发控制的思想.

乐观并发控制本质上是基于验证的协议,我们使用原子指令 CAS(compare-and-swap 或者 compare-and-set)在多线程中同步数据,无锁队列的实现也依赖这一原子指令。

从某种程度上说,Channel 是一个用于同步和通信的有锁队列,使用互斥锁解决程序中可能存在的线程竞争问题

Go 语言社区也在 2014 年提出了无锁 Channel 的实现方案,该方案将 Channel 分成了以下三种类型:

  1. 同步 Channel — 无缓冲区,发送方会直接将数据交给(Handoff)接收方

  2. 异步channel: 基于环形缓存的传统生产者消费者模型;

  3. chan struct{} 类型的异步 Channel — struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义;

二.数据结构

Go 语言的 Channel 在运行时使用 runtime.hchan 结构体表示。我们在 Go 语言中创建新的 Channel 时,实际上创建的都是如下所示的结构:

type hchan struct {	qcount   uint           // 队列中元素总数量	dataqsiz uint           // 循环队列的长度	buf      unsafe.Pointer // 指向长度为 dataqsiz 的底层数组,只有在有缓冲时这个才有意义	elemsize uint16         // 能够发送和接受的元素大小	closed   uint32         // 是否关闭	elemtype *_type         // 元素的类型	sendx    uint           // 当前已发送的元素在队列当中的索引位置	recvx    uint           // 当前已接收的元素在队列当中的索引位置	recvq    waitq          // 接收 Goroutine 链表	sendq    waitq          // 发送 Goroutine 链表	lock mutex              // 互斥锁}// waitq 是一个双向链表,里面保存了 goroutinetype waitq struct {	first *sudog	last  *sudog}

如下图所示,channel 底层其实是一个循环队列

三.创建管道

Go 语言中所有 Channel 的创建都会使用 make 关键字。创建的表达式使用 make(chan T, cap) 来创建 channel.

如果不向 make 传递表示缓冲区大小的参数,那么就会设置一个默认值 0,也就是当前的 Channel 不存在缓冲区。

四. 发送数据

当想要向 Channel 发送数据时,就需要使用 ch <- i 语句.

在发送数据的逻辑执行之前会先为当前 Channel 加锁,防止多个线程并发修改数据。

如果 Channel 已经关闭,那么向该 Channel 发送数据时会报 “send on closed channel” 错误并中止程序。

4.1 直接发送

如果 Channel 没有被关闭并且已经有处于读等待的 Goroutine,会取出最先陷入等待的 Goroutine 并直接向它发送数据:

直接发送的过程称为两个部分:

  1. 调用 runtime.sendDirect将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上;
  2. 调用 runtime.goready 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;

需要注意的是,发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext 中,程序没有立刻执行该 Goroutine。

4.2 缓冲区

如果创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满,会使用 runtime.chanbuf 计算出下一个可以存储数据的位置,然后通过 runtime.typedmemmove 将发送的数据拷贝到缓冲区中并增加 sendx 索引和 qcount 计数器。

4.3 阻塞发送

当 Channel 没有接收者能够处理数据时,向 Channel 发送数据会被下游阻塞,当然使用 select 关键字可以向 Channel 非阻塞地发送消息。

4.4 小结

可以简单梳理和总结一下使用 ch <- i 表达式向 Channel 发送数据时遇到的几种情况:

  1. 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;
  2. 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上;
  3. 如果不满足上面的两种情况,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;

五. 接收数据

可以使用两种不同的方式去接收 Channel 中的数据:

i <- chi, ok <- ch

5.1 直接接收

会根据缓冲区的大小分别处理不同的情况

  1. 如果 Channel 不存在缓冲区,直接从发送者那里把数据拷贝给接收变量
  2. 如果是有缓冲 channel
    • 将队列中的数据拷贝到接收方的内存地址;
    • 将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方;

5.2 缓冲区

当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 的索引位置中取出数据进行处理:

5.3 阻塞接收

当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞的,然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作:

六. 关闭channel

使用 close(ch) 来关闭 channel 最后会调用 runtime 中的 closechan 方法.

  1. 关闭一个 nil 的 channel 和已关闭了的 channel 都会导致 panic
  2. 关闭 channel 后会释放所有因为 channel 而阻塞的 Goroutine

七. 使用场景

channel一般用于协程之间的通信,channel也可以用于并发控制。比如主协程启动N个子协程,主协程等待所有子协程退出后再继续后续流程,这种场景下channel也可轻易实现。

7.1 使用channel控制子协程

package mainimport (    "time"    "fmt")func Process(ch chan int) {    //Do some work...    time.Sleep(time.Second)    ch <- 1 //管道中写入一个元素表示当前协程已结束}func main() {    channels := make([]chan int, 10) //创建一个10个元素的切片,元素类型为channel    for i:= 0; i < 10; i++ {        channels[i] = make(chan int) //切片中放入一个channel        go Process(channels[i])      //启动协程,传一个管道用于通信    }    for i, ch := range channels {  //遍历切片,等待子协程结束        <-ch        fmt.Println("Routine ", i, " quit!")    }}

输出:

Routine  0  quit!Routine  1  quit!Routine  2  quit!Routine  3  quit!Routine  4  quit!Routine  5  quit!Routine  6  quit!Routine  7  quit!Routine  8  quit!Routine  9  quit!

上面程序通过创建N个channel来管理N个协程,每个协程都有一个channel用于跟父协程通信,父协程创建完所有协程后等待所有协程结束。

这个例子中,父协程仅仅是等待子协程结束,其实父协程也可以向管道中写入数据通知子协程结束,这时子协程需要定期地探测管道中是否有消息出现。

7.2 通过关闭 channel 实现一对多的通知

关闭 channel 时会释放所有阻塞的 Goroutine,所以我们就可以利用这个特性来做一对多的通知,除了一对多之外我们还用了 done 做了多对一的通知,当然多对一这种情况还是建议直接使用 WaitGroup 即可

package mainimport (	"fmt"	"time")func run(stop <-chan struct{}, done chan<- struct{}) {	// 每一秒打印一次	for {		select {		case <-stop:			fmt.Println("stop...")			// 接收到停止后,向 done 管道中发送数据,然后退出函数			done <- struct{}{}			return		// 超时1秒将输出hello		case <-time.After(time.Second):			fmt.Println("hello...")		}	}}func main() {	// 一对多,使用无缓冲通道,当关闭chan后,其他程序中接收到关闭信号后会统一执行操作	stop := make(chan struct{})	// 多对一,当关闭后,关闭一个chan, 写入一个数据到管道中	done := make(chan struct{}, 10)	for i := 0; i < 10; i++ {		go run(stop, done)	}	// 模拟超时时间	time.Sleep(5 * time.Second)	close(stop)	for i := 0; i < 10; i++ {		<-done	}}

输出:

hello...hello...hello......hello..stop...stop...stop...stop...stop...stop...stop...stop...stop...stop...

7.3 使用 channel 做异步编程

利用无缓冲channel,接收早于发送的特点,只有当数据写入后,接收才能完成实现数据一致性

package mainimport (	"fmt")// 这里只能读func read(c <-chan int) {	fmt.Println("read:", <-c)}// 这里只能写func write(c chan<- int) {	c <- 0}func main() {	c := make(chan int)	go write(c)	read(c)}

7.4 超时控制

超时控制还是建议使用 context

func run(stop <-chan struct{}, done chan<- struct{}) {	// 每一秒打印一次 hello	for {		select {		case <-stop:			fmt.Println("stop...")			done <- struct{}{}			return		case <-time.After(time.Second):			fmt.Println("hello")		}	}}

7.5 协程池

根据控制Channel的缓存大小来控制并发执行的Goroutine的最大数目

var limit = make(chan int, 3)func main() {    for _, w := range work {        go func() {            limit <- 1            w()            <-limit        }()    }    select{}}

最后一句select{}是一个空的管道选择语句,该语句会导致main线程阻塞,从而避免程序过早退出。还有for{}<-make(chan int)等诸多方法可以达到类似的效果。因为main线程被阻塞了,如果需要程序正常退出的话可以通过调用os.Exit(0)实现。

八. 参考

  1. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/
  2. https://www.topgoer.cn/docs/gozhuanjia/chapter055.1-channel
  3. https://lailin.xyz/post/go-training-week3-channel.html
  4. https://chai2010.cn/advanced-go-programming-book/ch1-basic/ch1-05-mem.html
♥永远年轻,永远热泪盈眶♥

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/124536.html

相关文章

  • js-csp 可以开始尝试了

    摘要:的用例的用法最早是语言传开来的看一下我从网上扒的代码其中符号是往当中写入数据的操作同时注意一般的位置对于来说是阻塞的由于能够处理异步操作也就是说能做到异步代码用同步写法更多的细节搜索应该就能找到除了也实现了对于的支持也就是 CSP 的用例 CSP 的用法最早是 Go 语言传开来的, 看一下我从网上扒的代码: package main import fmt func ping(pin...

    tracymac7 评论0 收藏0
  • PHP下用Swoole实现Actor并发模型

    摘要:协程与信箱得益于,我们可以基于的协程与快速实现一个信箱模式调度。样例代码比如在一个聊天室中,我们可以定义一个房间模型。 什么是Actor? Actor对于PHPer来说,可能会比较陌生,写过Java的同学会比较熟悉,Java一直都有线程的概念(虽然PHP有Pthread,但不普及),它是一种非共享内存的并发模型,每个Actor内的数据独立存在,Actor之间通过消息传递的形式进行交互调...

    GeekQiaQia 评论0 收藏0
  • PHP 协程:Go + Chan + Defer

    摘要:为语言提供了强大的协程编程模式。提供的协程语法借鉴自,在此向开发组致敬协程可以与很好地互补。并发执行使用创建协程,可以让和两个函数变成并发执行。协程需要拿到请求的结果。 Swoole4为PHP语言提供了强大的CSP协程编程模式。底层提供了3个关键词,可以方便地实现各类功能。 Swoole4提供的PHP协程语法借鉴自Golang,在此向GO开发组致敬 PHP+Swoole协程可以与...

    nidaye 评论0 收藏0
  • Go语言学习】2019-04-24 协程初步讨论与简单扩展

    摘要:它避免了上下文切换的额外耗费,兼顾了多线程的优点,简化了高并发程序的复杂。而可以理解为一种语言的协程。线程轻量级进程,,是程序执行流的最小单元。一个标准的线程由线程,当前指令指针,寄存器集合和堆栈组成。其实就是或者等语言中的多线程开发。 grape 全部视频:https://segmentfault.com/a/11... 原视频地址:https://biglive.xueersi.c...

    SnaiLiu 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<