Home

并发编程实践 —— 实现一个线程安全的队列

posted at 2019-07-14
Ruby的系统库中实现了队列(Queue)类。队列是一种先进先出(FIFO,First In First Out)的数据结构。

生产者与消费者(Consumers/Producers)的模型经常被拿来作为并发编程的示例,生产与消费的过程可以用队列来模拟。

Ruby的Queue的常用API有:

  • push 和它对应的别名(alias)<<enq
  • pop以及对应的别名 deq, shift

Queue与数组非常相像

例子:
> q = Queue.new
> q << 1
> q << 2
> q.pop
# 1
> q.pop
# 2


一个简单的队列实现

我们首先想好我们要实现的简单队列的"规格"(specification):

API:
  • push:接受一个参数,放入队列中
  • pop: 取出队列中的第一个元素,如果队列中没有元素,则循环等待

我们用数组表示这个队列,实现起来很简单:

class MyQueue
  def initialize
    @queue = []
  end
  def push(item)
    @queue << item
  end
	
  def pop
   	while @queue.empty?
 	  # wait for enqueueing
    end
	@queue.shift
  end
end

> q = MyQueue.new > q.push 1 > q.push 2 > q.pop # 1 > q.pop # 2 > q.pop # nil

多线程
虽然上面实现了MyQueue的接口,但是这个实现不是线程安全的。为什么呢?如何使我们自定义的Queue数据结构线程安全呢?想要解决这个问题,首先我们需要明白什么会导致线程不安全。

原子性(Atomicity)
There are only two hard things in Computer Science: cache invalidation and naming things — Phil Karlton在计算机科学中只有两个难题:缓存过期和命名 - Phil Karlton

我想补充一点:保证原子操作也是一个难题

设计系统时(比如API接口)保证的幂等性,数据库的事务,并发系统避免竞争条件的出现其实本质上都是为了保证系统的原子性。

回到我们队列的实现上来,线程安全问题出现在pop的实现:

def pop
  while @queue.empty?       # l1
    # wait for enqueueing  
  end
  @queue.shift              # l2
end


Ruby解释器将以上代码翻译成两个指令,while @queue.empty?@queue.shift运行在多线程下有可能在执行至 l1 时发生上下文切换,另一个线程在队列中加入一个元素,上下文切换回来后,此时队列不为空,但是pop会一直阻塞在 l1,l2不被执行。

互斥锁(mutual exclusion / mutex)
我们可以使用互斥锁来修复这个问题,我们把有可能出现竞争条件的代码部分叫做critical section。上面的代码中,从l1至l2部分的代码就是critical section。在系统访问这片区域时,需要使用互斥锁保证执行时不会上下文切换:

require 'thread'

class MyQueue
  def initialize
    @queue = []
    @lock = Mutex.new
  end
  def push(item)
    @queue << item
  end
	
  def pop
    @lock.synchronize do
      while @queue.empty?
        # wait for enqueueing
      end
      @queue.shift
    end
  end
end


我们可以写一个简单的程序验证一下:
require 'securerandom'

q = MyQueue.new

producers = 3.times.map do
  Thread.new do
    random = SecureRandom.hex(6)
    loop do
      puts "producing #{random}"
      q.push("item-#{random}")
      sleep 0.1
    end
  end
end

consumers = 3.times.map do
  Thread.new do
    loop do
      item = q.pop
      if item
        puts "consumed #{item}"
      else
        puts "item should not be nil"
        exit(1)
      end
    end
  end
end

producers.map(&:join)
consumers.map(&:join)



我们希望在调用pop时,如果队列内没有元素,则等待而不是返回nil。如果我们的实现不正确的话,“item should not be nil”一定会在某一刻被打印出来,运行一段时间,可以发现并没有这个问题。

性能的考量 —— 使用条件变量(Condition Variable)
从性能的角度审视这个实现,其实这个实现还有个问题,调用pop方法时,如果队列中没有元素,程序会一直执行while @queue.empty?循环,这个是对系统资源的浪费。 如果有办法让系统调度器在队列无元素时休眠,当队列中被push新元素是唤醒就好了。

Condition Variable comes to rescue!

Condition Variable(条件变量)是利用线程间共享的全局变量进行同步的一种机制,它能使线程休眠,在满足一些条件后再次唤醒休眠的线程,完美的适用于这个场景。

下面为我们的队列实现加入条件变量,push时通知休眠的线程(如果有的话),pop时如果发现队列为空,则休眠线程:
require 'thread'

class MyQueue
  def initialize
    @queue = []
    @lock = Mutex.new
    @cv = ConditionVariable.new
  end
  
  def push(item)
    @lock.synchronize do
      @queue << item
      @cv.signal
    end
  end
	
  def pop
    @lock.synchronize do
      if @queue.empty?
        @cv.wait(@lock)
      end
      @queue.shift
    end
  end
end


这里有两点需要注意:

  1. 我们希望在调用push方法时,唤醒休眠的线程,下面的代码是push方法的关键段(critical section)
@queue << item
@cv.signal


在程序执行时,可能在@queue << item后发生上下文切换,这是我们不期望的结果,因此需要用@lock.synchronize
2. cv接收mutex作为参数,因为wait()会将当前线程置为休眠状态,这个时候需要释放互斥锁,从而允许其他线程获取这个锁

看上去这个实现是完美的,运行验证代码检验一下:

不出意外,程序运行一段时间会报item should not be nil

问题出现在什么地方呢?

线程状态和虚假唤醒
线程状态和进程类似,也有ready, sleep和running这些状态。
假设有两个消费者线程Tc1,Tc2和一个生产者线程Tp1。c代表consumer,执行pop方法;p代表producer,执行push方法。

执行过程:
  1. Tc1先运行(running),这时的队列元素为空,Tc1休眠(sleep)并释放锁;
  2. 调度器切换到Tp1,push了一个元素进入队列,并且唤醒Tc1,Tc1的状态由sleep变为ready,但是Tc1没有运行
  3. Tc2 由ready变为running,由于此时队列有元素,因此pop出一个元素
  4. Tc1此时的状态由ready变为running,Tc1再次执行@queue.shift,但是因为队列中没有元素了,所以pop会返回空,这是我们希望避免的情况。

除了上面的一种情况,部分操作系统支持虚假唤醒线程,即不满足条件,也会唤醒线程。

对于这两种情况,有一个非常简单的修复方式:用while替换if:
require 'thread'

class MyQueue
  def initialize
    @queue = []
    @lock = Mutex.new
    @cv = ConditionVariable.new
  end
  
  def push(item)
    @lock.synchronize do
      @queue << item
      @cv.signal
    end
  end
	
  def pop
    @lock.synchronize do
      while @queue.empty?
        @cv.wait(@lock)
      end
      @queue.shift
    end
  end
end

如此以来,即使是一个应当处于休眠状态的线程被虚假唤醒了,只要满足@queue.empty?这个条件,线程仍然会被休眠。

总结

我们首先实现了队列的API:poppush方法; 接着,为了保证操作原子性和线程安全,我们引入互斥锁; 接着为了提升性能,引入条件变量(condition variable)用来替换busy-waiting(while循环检查数组是否为空)的形式; 最后,由于虚假唤醒和线程调度的不可控,我们使用while替换if(在使用condition variable时可以无脑使用while循环,来保证线程安全);
由此可见,并发编程确实很难,难点在于线程调度的不可控制。但是如果多加练习,还是可以总结出一些方法论,比如while和condition variable要结合使用,看到counter += 1或if a > 1 then b = 2 这样的代码就知道这是线程不安全的等等。

有时间再讲讲事件驱动的并发模型,它解决了调度的不可控的问题,但是事件驱动模型也有自身的缺点。