性能远超chan的无锁队列

作者:leonzgshao,腾讯CSIG后台开发工程师

| 导语结合Java语言的高可用无锁队列框架Disruptor实现的高性能无锁队列,可实现远高于chan的高性能数据传递,解决高并发环境下chan写入数据慢的问题。

1. chan的困境

1.1. chan高并发挺慢

按照我之前的认知,我以为既然go提供了这么一种通信方式,那么它的性能自然一定是有保证的。事实证明,靠想、靠自以为是不靠谱的,chan并没有想象中的那么快,尤其是并发的时候。先做一个最简单的测试,并发向chan中放入数据,看下放入的时间分布:

var (
		t1_10us     = uint64(0) // 1-10微秒
		t10_100us   = uint64(0) // 10-100微秒
		t100_1000us = uint64(0) // 100-1000微秒
		t1_10ms     = uint64(0) // 1-10毫秒
		t10_100ms   = uint64(0) // 10-100毫秒
		t100_ms     = uint64(0) // 大于100毫秒
	)

	var (
		length   = 1024 * 1024
		goSize   = 100
		numPerGo = 10000
		counter  = uint64(0)
		slower   = uint64(0)
		wg       sync.WaitGroup
	)
	ch := make(chan uint64, length)
	// 消费端
	go func() {
		var ts time.Time
		var count int32
		for {
			x := <-ch
			atomic.AddInt32(&count, 1)
			if count == 1 {
				ts = time.Now()
			}
			if x%100000 == 0 {
				fmt.Printf("read %d
", x)
			}
			if count == int32(goSize*numPerGo) {
				tl := time.Since(ts)
				fmt.Printf("read time = %d ms
", tl.Milliseconds())
			}
		}
	}()
	wg.Add(goSize)
	totalS := time.Now()
	for i := 0; i < goSize; i++ {
		go func() {
			for j := 0; j < numPerGo; j++ {
				x := atomic.AddUint64(&counter, 1)
				ts := time.Now()
				ch <- x
				tl := time.Since(ts)
				ms := tl.Microseconds()
				if ms > 1 {
					atomic.AddUint64(&slower, 1)
					if ms < 10 { // t1_10us
						atomic.AddUint64(&t1_10us, 1)
					} else if ms < 100 {
						atomic.AddUint64(&t10_100us, 1)
					} else if ms < 1000 {
						atomic.AddUint64(&t100_1000us, 1)
					} else if ms < 10000 {
						atomic.AddUint64(&t1_10ms, 1)
					} else if ms < 100000 {
						atomic.AddUint64(&t10_100ms, 1)
					} else {
						atomic.AddUint64(&t100_ms, 1)
					}
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
	totalL := time.Since(totalS)
	fmt.Printf("write total time = [%d ms]
", totalL.Milliseconds())
	time.Sleep(time.Second * 3)
	fmt.Printf("slow ratio = %.2f 
", float64(slower)*100.0/float64(counter))
	fmt.Printf("quick ratio = %.2f 
", float64(goSize*numPerGo-int(slower))*100.0/float64(goSize*numPerGo))
	fmt.Printf("[<1us][%d] 
", counter-slower)
	fmt.Printf("[1-10us][%d] 
", t1_10us)
	fmt.Printf("[10-100us][%d] 
", t10_100us)
	fmt.Printf("[100-1000us][%d] 
", t100_1000us)
	fmt.Printf("[1-10ms][%d] 
", t1_10ms)
	fmt.Printf("[10-100ms][%d] 
", t10_100ms)
	fmt.Printf("[>100ms][%d] 
", t100_ms)

上述例子中,启动了100个协程,每个协程循环向chan中放入10000个对象,上面的代码在我的mac中执行结果如下:

write total time = [184 ms]
read time = 196 ms
slow ratio = 14.72 
quick ratio = 85.28 
[<1us][852773] 
[1-10us][101126] 
[10-100us][45671] 
[100-1000us][395] 
[1-10ms][19] 
[10-100ms][16] 
[>100ms][0] 

然而我们仔细的分析一下,可以看到如下两个点:

1)对象在放入chan中时,还是比较耗时的,尤其是会存在不小比例的耗时比较高的,例如上面的16个10-100ms间的操作,这种波动或者说抖动会高并发时会严重影响我们的性能,我们期望这种抖动尽可能降低;

2)上述测试的chan长度是1024x1024,实际上比放入对象的大小还大,也就是说即使chan不满,也会导致一定的慢耗时

1.2. chan结构

在排查原因前,我们先看下chan的结构,众所周知,chan的结构如下runtime.hchan

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

1.3. chan为什么这么慢

其实已经有很多文章介绍chan的原理了,我就不详细描述了,从影响性能的角度来看,简单来说有两点:

对于前者,给我们提供了一个思路,那就是由多个recv协程读取,这样可以提高放入的性能,但是具体设置多少呢?实际上业务并不好判断,因此并不是一种好的选择。

对于后者,我们先来说下这个lock,这个lock不是sync.Mutexsync.Mutex是go给我们开发者提供一个锁,这个锁的实现其实比较复杂,也有很多文章来说明,我这里就不详细说明了,但是这个锁有一个好处,它其实是一个轻量级的锁,这个轻量主要说的是这个锁是由go自身提供的,当出现锁切换时,它并不需要调用操作系统的lock来让渡出cpu资源,而仅仅是通过gopark()的方式,将目前正在执行的g放回到p中,等待其他m来调度它,它的切换相对来说没有那么耗费资源。

而hchan中的lock是一个runtime.mutex。这个锁是一个互斥锁,在linux系统中它的实现是futex,在没有竞争的情况下,会退化成为一个自旋操作,速度非常快,但是当竞争比较大时,它就会在内核中休眠。注意,此处是在内核中休眠,而与runtime.Mutex是不同的,这也是为什么chan会这么慢的原因。

2. 无锁队列

2.1. Disruptor

熟悉java语言的小伙伴应该知道有一个比较出名的高性能无锁队列框架:Disruptor,github地址:https://github.com/LMAX-Exchange/disruptor 当然go语言也有一个对应的库go-disruptor:https://github.com/smarty-prototypes/go-disruptor

但是在实际测试中发现go-disruptor其实性能比较一般,没有Java中的优化那么多,并且go-disruptor并发写入需要使用锁,非常不优化,性能比较低。因此,就萌生了写一个go版本Disruptor想法。

2.2. Lockfree

要做到无锁实现队列模型,那么能依赖的其实就只有atomic(原子操作)。本无锁队列参考了Disruptor的实现,根据go语言的特点,加入了自己的一些思考,代码已经开源,地址:https://github.com/bruceshao/lockfree 欢迎来拍。

lockfree的核心优化点包括如下几个方面,其中很多也是我们在开发高性能程序需要关注的,可以参考:

1)绝对的无锁实现:

lockfree内部几乎所有的操作都是通过原子变量(atomic)来操作,仅仅有一处使用了chan,作为队列长时间为空时,消费g阻塞使用,该chan只有在队列为空的情况下触发,所以不会影响性能。

2)单一的消费g:

消费g即队列的消费者,将消费g设置为单一g,即整个无锁队列只有一个g用于消费,这样就屏蔽掉了读操作竞争带来的性能损耗。

3)写不等待原则:

本身无锁队列的设计初衷就是写入要快,因此对于写入的操作是不会等待的,当无法写入时会持续通过自旋加任务调度的方式处理,一方面尽量加快写入效率,另一方面则是防止占用太多CPU资源。其核心处理代码:

  // 获取下一个可写入序号  
  seq := q.seqer.next()
	pos := int(seq & q.mask)
	for {
		if q.abuf.disabled(pos) {
			q.rbuf.write(pos, v)
			q.abuf.enable(pos)
			// 如果接收方阻塞则释放
			q.abuf.release()
			break
		}
		// 写操作持续等待,该等待仅会调用runtime.Gosched()进行当前g的调度让出
		loop, _ = wait(loop, WriteWaitMax)
	}

4)Pointer替代切片:

available切片用于标记ringbuffer中元素的可用状态。尽管其是一个[]uint8结构,但实际上当高并发对其进行赋值更新时,由于每次操作在其内部都会进行越界判断(通过汇编代码获得该信息),导致其寻址性能并不高。因此通过对切片结构中的Data进行unsafe.Pointer操作,提高了其可用状态调整的性能。

// enable 设置pos位置为可读状态,读线程可读取
func (a *available) enable(pos int) {
	*(*uint8)(unsafe.Pointer(uintptr(a.buf) + uintptr(pos))) = 1
}

// disable 设置pos位置为可写状态,写入线程可写入值
func (a *available) disable(pos int) {
	*(*uint8)(unsafe.Pointer(uintptr(a.buf) + uintptr(pos))) = 0
}

5)一次性内存分配:

使用环状结构Ringbuffer实现对象的传递,RingBuffer中存储对象为包含传递对象结构的结构体,可以进行一次性内存分配,提高处理的性能。

6)缓存行填充:

在计算机硬件上,为了提高效率,cpu存在多级高速缓存(通常是三级)。指令和数据会被事先加载到多级缓存中,这样cpu就不用每次与内存进行交互,从而提高效率。然而实际上不会只加载需要的数据,而是会加载需要数据的上文部分数据,因为根据程序的局部性原理,这些数据后面大概率会用到,这样就避免了再次加载,提高了效率。但是如果这样一次性加载的数据如果被多个cpu核心操作,就会涉及到一个竞争,因此每次加载和更新的数据是有冲突的(从应用程序上来看是没有冲突的),这就形成了所谓的伪共享。解决这个问题的办法就是缓存行填充,操作系统一般一次性加载的缓存行大小是64B,因此可以在其前和后各加入部分字段来解决。数据结构如下:

// cursor 游标,一直持续增长的一个uint64序列
// 该序列用于wg(Write Goroutine)获取对应写入到buffer中元素的位置操作
// 通过使用atomic操作避免锁,提高性能
// 通过使用padding填充的方式,填充前面和后面各使用7个uint64(缓存行填充),避免伪共享问题
type cursor struct {
	p1, p2, p3, p4, p5, p6, p7       uint64
	v                                uint64
	p9, p10, p11, p12, p13, p14, p15 uint64
}

7)与运算加速:

RingBuffer的容量必须设置为2的n次方,这样就可以通过与运算来代替取余运算,从而提高整体的性能。

8)泛型加速:

Go1.18版本后引入了泛型,泛型与interface有很明显的区别,从性能上来看,泛型是在编译阶段确定类型,这样可有效降低在运行时进行类型转换的耗时(经过测试,这部分还是比较耗时的)。

2.3. 核心概念

Lockfree的整体结构关系如下所示:

ringBuffer

具体对象的存放区域,通过数组(定长切片)实现环状数据结构,其中的数据对象是具体的结构体而非指针,这样可以一次性进行内存申请,避免频繁内存申请带来的系统开销。数据结构如下:

type e[T any] struct {
	val T
}

// ringBuffer 具体对象的存放区域,通过数组(定长切片)实现环状数据结构
// 其中e为具体对象,非指针,这样可以一次性进行内存申请
type ringBuffer[T any] struct {
	buf      []e[T]
	sequer   *sequencer
	capacity uint64
}

available

切片实现的map,通过index(或pos)标识每个位置为0或1,当长时间无法读取时会通过blockC进行阻塞,写线程完成时可释放该blockC。 其内部buf实际是[]uint8,但由于[]uint8切片在寻址时会进行游标是否越界的判断,造成性能下降,因此通过使用unsafe.Pointer直接对对应的值进行操作,从而避免越界判断,提升性能。之所以使用uint8数组而不是使用的bitmap,主要是考虑到写并发的行为,防止bit操作导致数据异常(或靠锁解决)。数据结构如下:

// available 切片实现的map,通过index(或pos)标识每个位置为0或1
// 当长时间无法读取时会通过blockC进行阻塞,写线程完成时可释放该blockC
// 其内部buf实际是[]uint8,但由于[]uint8切片在寻址时会进行游标是否越界的判断,造成性能下降,
// 因此通过使用unsafe.Pointer直接对对应的值进行操作,从而避免越界判断,提升性能
// 之所以使用uint8是考虑到写并发的行为,防止bit操作导致数据异常(或靠锁解决)
type available struct {
	buf    unsafe.Pointer
	blockC chan struct{}
	block  uint32
}

sequencer

序号产生器,维护读和写两个状态,写状态具体由内部游标(cursor)维护,读取状态由自身一个uint64变量维护。它的核心方法是next(),用于获取下个可以写入的游标。数据结构如下:

// sequencer 序号产生器,维护读和写两个状态,写状态具体由内部游标(cursor)维护。
// 读取状态由自身维护,变量read即可
type sequencer struct {
	wc       *cursor
	ws       waitStrategy
	rc       uint64 // 读取游标,因为该值仅会被一个g修改,所以不需要使用cursor
	capacity uint64
}

Producer

生产者,核心方法是Write,通过调用Write方法可以将对象写入到队列中。支持多个g并发操作,保证加入时处理的效率。数据结构如下:

// Producer 生产者
// 核心方法是Write,通过调用Write方法可以将对象写入到队列中
type Producer[T any] struct {
	seqer  *sequencer
	rbuf   *ringBuffer[T]
	abuf   *available
	mask   uint64
	status int32
}

consumer

消费者,这个消费者只会有一个g操作,这样处理的好处是可以不涉及并发操作,其内部不会涉及到任何锁,对于实际的并发操作由该g进行分配。数据结构如下:

// consumer 消费者,这个消费者只会有一个g操作,这样处理的好处是可以不涉及并发操作,其内部不会涉及到任何锁
// 对于实际的并发操作由该g进行分配
type consumer[T any] struct {
	rbuf     *ringBuffer[T]
	abuf     *available
	seqer    *sequencer
	hdl      EventHandler[T]
	parallel bool
	mask     uint64 // 用于使用&代替%(取余)运算提高性能
	status   int32  // 运行状态
}

waitStrategy

等待策略,该策略用于获取写入可用的sequence时进行的等待。默认提供了两个实现,SchedWaitStrategy和SleepWaitStrategy,前者使用runtime.Gosched(),后者使用time.Sleep()实现。 推荐使用SchedWaitStrategy,也可以自己实现。

EventHandler

事件处理器接口,整个项目中唯一需要用户实现的接口,该接口描述消费端收到消息时该如何处理,它使用泛型,通过编译阶段确定事件类型,提高性能。

// EventHandler 事件处理器接口
// 整个无锁队列中唯一需要用户实现的接口,该接口描述消费端收到消息时该如何处理
// 使用泛型,通过编译阶段确定事件类型,提高性能
type EventHandler[T any] interface {
	// OnEvent 用户侧实现,事件处理方法
	OnEvent(t T)
}

3. 性能对比

3.1. 写入耗时提升

整体上来看,Disruptor在写入和读取上的性能大概都在channel的7倍以上,数据写入的越多,性能提升越明显。 下面是buffer=1024x1024时,写入数据的耗时对比(可以看到写入时间有明显提升):

3.2. 性能对比

仍然以buffer大小为1024 x 1024为例,将写入时间进行分段,形成了如下的表,其中快速率描述的是写入耗时在微秒内的占比:

数据
(g*循环)

队列

快速率

<1us

1-10us

10-100us

100-1000us

1-10ms

10-100ms

>100ms

50*10000

chan

85.24%

426198

48630

24835

327

6

4

4

50*10000

lockfree

98.06%

490307

8340

1255

94

4

0

0

100*10000

chan

84.39%

843858

104287

51598

217

20

20

0

100*10000

lockfree

98.00%

980004

17513

2343

131

9

0

0

1000*10000

chan

10.07%

1007273

117192

50303

8822466

2714

39

13

1000*10000

lockfree

64.06%

6405519

23298

47347

3519377

3083

1376

0

5000*10000

chan

1.98%

990905

119376

48902

530

48835376

4889

22

5000*10000

lockfree

80.97%

40485785

30654

19052

466781

8987742

9986

0

10000*10000

chan

1.12%

1117019

76828

33322

1504

98746320

24960

47

10000*10000

lockfree

88.33%

88333884

46109

43460

630901

9701375

1244271

0

从上图中可以明显看出,lockfree比chan的性能会高很多:

最后,git地址:https://github.com/bruceshao/lockfree 欢迎小伙伴来拍、沟通和试用。

展开阅读全文

页面更新:2024-03-13

标签:队列   性能   游标   数据结构   切片   对象   状态   结构   操作   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top