资讯专栏INFORMATION COLUMN

用Python实现读写锁

syoya / 2997人阅读

摘要:读写锁的优先级读写锁也有分读优先和写优先。我们不需要两个相似的读写锁类。改进为了能够满足自定义优先级的读写锁,要记录等待的读写线程数,并且需要两个条件用来处理哪方优先的通知。

起步

Python 提供的多线程模型中并没有提供读写锁,读写锁相对于单纯的互斥锁,适用性更高,可以多个线程同时占用读模式的读写锁,但是只能一个线程占用写模式的读写锁

通俗点说就是当没有写锁时,就可以加读锁且任意线程可以同时加;而写锁只能有一个线程,且必须在没有读锁时才能加上。

简单的实现
import threading

class RWlock(object):
    def __init__(self):
        self._lock = threading.Lock()
        self._extra = threading.Lock()
        self.read_num = 0

    def read_acquire(self):
        with self._extra:
            self.read_num += 1
            if self.read_num == 1:
                self._lock.acquire()

    def read_release(self):
        with self._extra:
            self.read_num -= 1
            if self.read_num == 0:
                self._lock.release()

    def write_acquire(self):
        self._lock.acquire()

    def write_release(self):
        self._lock.release()

这是读写锁的一个简单的实现,self.read_num 用来保存获得读锁的线程数,这个属性属于临界区,对其操作也要加锁,所以这里需要一个保护内部数据的额外的锁 self._extra

但是这个锁是不公平的。理想情况下,线程获得所的机会应该是一样的,不管线程是读操作还是写操作。而从上述代码可以看到,读请求都会立即设置 self.read_num += 1,不管有没有获得锁,而写请求想要获得锁还得等待 read_num 为 0 。

所以这个就造成了只有锁没有被占用或者没有读请求时,可以获得写权限。我们应该想办法避免读模式锁长期占用。

读写锁的优先级

读写锁也有分 读优先写优先。上面的代码就属于读优先。

如果要改成写优先,那就换成去记录写线程的引用计数,读和写在同时竞争时,可以让写线程增加写的计数,这样可使读线程的读锁一直获取不到, 因为读线程要先判断写的引用计数,若不为0,则等待其为 0,然后进行读。这部分代码不罗列了。

但这样显然不够灵活。我们不需要两个相似的读写锁类。我们希望重构我们代码,使它更强大。

改进

为了能够满足自定义优先级的读写锁,要记录等待的读写线程数,并且需要两个条件 threading.Condition 用来处理哪方优先的通知。计数引用可以扩大语义:正数:表示正在读操作的线程数,负数:表示正在写操作的线程数(最多-1)

在获取读操作时,先然后判断时候有等待的写线程,没有,进行读操作,有,则等待读的计数加 1 后等待 Condition 通知;等待读的计数减 1,计数引用加 1,继续读操作,若条件不成立,循环等待;

在获取写操作时,若锁没有被占用,引用计数减 1,若被占用,等待写线程数加 1,等待写条件 Condition 的通知。

读模式和写模式的释放都是一样,需要根据判断去通知对应的 Condition:

class RWLock(object):
    def __init__(self):
        self.lock = threading.Lock()
        self.rcond = threading.Condition(self.lock)
        self.wcond = threading.Condition(self.lock)
        self.read_waiter = 0    # 等待获取读锁的线程数
        self.write_waiter = 0   # 等待获取写锁的线程数
        self.state = 0          # 正数:表示正在读操作的线程数   负数:表示正在写操作的线程数(最多-1)
        self.owners = []        # 正在操作的线程id集合
        self.write_first = True # 默认写优先,False表示读优先

    def write_acquire(self, blocking=True):
        # 获取写锁只有当
        me = threading.get_ident()
        with self.lock:
            while not self._write_acquire(me):
                if not blocking:
                    return False
                self.write_waiter += 1
                self.wcond.wait()
                self.write_waiter -= 1
        return True

    def _write_acquire(self, me):
        # 获取写锁只有当锁没人占用,或者当前线程已经占用
        if self.state == 0 or (self.state < 0 and me in self.owners):
            self.state -= 1
            self.owners.append(me)
            return True
        if self.state > 0 and me in self.owners:
            raise RuntimeError("cannot recursively wrlock a rdlocked lock")
        return False

    def read_acquire(self, blocking=True):
        me = threading.get_ident()
        with self.lock:
            while not self._read_acquire(me):
                if not blocking:
                    return False
                self.read_waiter += 1
                self.rcond.wait()
                self.read_waiter -= 1
        return True

    def _read_acquire(self, me):
        if self.state < 0:
            # 如果锁被写锁占用
            return False

        if not self.write_waiter:
            ok = True
        else:
            ok = me in self.owners
        if ok or not self.write_first:
            self.state += 1
            self.owners.append(me)
            return True
        return False

    def unlock(self):
        me = threading.get_ident()
        with self.lock:
            try:
                self.owners.remove(me)
            except ValueError:
                raise RuntimeError("cannot release un-acquired lock")

            if self.state > 0:
                self.state -= 1
            else:
                self.state += 1
            if not self.state:
                if self.write_waiter and self.write_first:   # 如果有写操作在等待(默认写优先)
                    self.wcond.notify()
                elif self.read_waiter:
                    self.rcond.notify_all()
                elif self.write_waiter:
                    self.wcond.notify()

    read_release = unlock
    write_release = unlock

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/44940.html

相关文章

  • 深入理解Python中的ThreadLocal变量(上)

    摘要:我们知道多线程环境下,每一个线程均可以使用所属进程的全局变量。在线程中使用局部变量则不存在这个问题,因为每个线程的局部变量不能被其他线程访问。 我们知道多线程环境下,每一个线程均可以使用所属进程的全局变量。如果一个线程对全局变量进行了修改,将会影响到其他所有的线程。为了避免多个线程同时对变量进行修改,引入了线程同步机制,通过互斥锁,条件变量或者读写锁来控制对全局变量的访问。 只用全局变...

    huangjinnan 评论0 收藏0
  • 操作系统实战

    摘要:操作系统实战临界资源保护临界资源进行通信线程间同步互斥量和读写锁自旋锁条件变量进程间同步共享内存域套接字重要概念用户态与内核态上下文切换协程线程同步之互斥量互斥量锁可以保证多线程的指令按顺序执行,避免两个线程的指令交叉执行即原子性原子性是指 操作系统实战 临界资源 保护临界资源/进行通信 线程间同步 互斥量和 读写锁 自旋锁 条件变量 进程间同步 共享内存 域套接字 重要概念 用...

    developerworks 评论0 收藏0
  • 图解ReentrantReadWriteLock实现分析

    摘要:锁实现分析本节通过学习源码分析可重入读写锁的实现。读写锁结构分析继承于,其中主要功能均在中完成,其中最重要功能为控制线程获取锁失败后转换为等待状态及在满足一定条件后唤醒等待状态的线程。 概述 本文主要分析JCU包中读写锁接口(ReadWriteLock)的重要实现类ReentrantReadWriteLock。主要实现读共享,写互斥功能,对比单纯的互斥锁在共享资源使用场景为频繁读取及少...

    nemo 评论0 收藏0

发表评论

0条评论

syoya

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<