资讯专栏INFORMATION COLUMN

Python多线程同步教程

lentrue / 2640人阅读

摘要:里提供了多个用于控制多线程同步的同步原语,这些原语,包含在的标准库当中。例如总结多线程同步,说难也难,说不难也很容易,关键是要看你的业务场景和解决问题的思路,尽量降低多线程之间的依赖,理清楚业务流程,选择合适的方法,则事尽成。

概述

多线程给我们带来的好处是可以并发的执行多个任务,特别是对于I/O密集型的业务,使用多线程,可以带来成倍的性能增长。

可是当我们多个线程需要修改同一个数据,在不做任何同步控制的情况下,产生的结果往往是不可预料的,比如两个线程,一个输出hello,一个输出world,实际运行的结果,往往可能是一个是hello world,一个是world hello。

python里提供了多个用于控制多线程同步的同步原语,这些原语,包含在python的标准库threading.py当中。我今天简单的介绍一下python里的这些控制多线程同步的原语,包括:Locks、RLocks、Semaphores、Events、Conditions和Barriers,你也可以继承这些类,实现自己的同步控制原语。

Lock(锁)

Locks是python里最简单的同步原语,只包括两个状态:locked和unlocked,刚创建时状态是unlocked。Locks有两个方法,acquire和release。acquire方法加锁,release方法释放锁,如果acquire枷锁失败,则阻塞,表明其他线程已经加锁。release方法只有当状态是locked调用方法True,如果是unlocked状态,调用release方法会抛出RunTimeError异常。例如代码:

    from threading import Lock, Thread
    lock = Lock()
    g = 0
    
    def add_one():
       """
       Just used for demonstration. It’s bad to use the ‘global’
       statement in general.
       """
       global g
       lock.acquire()
       g += 1
       lock.release()
    
    def add_two():
       global g
       lock.acquire()
       g += 2
       lock.release()
    
    threads = []
    for func in [add_one, add_two]:
       threads.append(Thread(target=func))
       threads[-1].start()
    
    for thread in threads:
       """
       Waits for threads to complete before moving on with the main
       script.
       """
       thread.join()

    print(g)

最终输出的结果是3,通过Lock的使用,虽然在两个线程中修改了同一个全局变量,但两个线程是顺序计算出结果的。

RLock(循环锁)

上面的Lock对象虽然能达到同步的效果,但是无法得知当前是那个线程获取到了锁。如果锁没被释放,则其他获取这个锁的线程都会被阻塞住。如果不想阻塞,可以使用RLock,例如:

    # 使用Lock
    import threading
    num = 0
    lock = Threading.Lock()
    
    lock.acquire()
    num += 1
    lock.acquire() # 这个地方阻塞
    num += 2
    lock.release()
    
    # 使用RLock
    lock = Threading.RLock()
    lock.acquire()
    num += 3
    lock.acquire() # 这不会阻塞
    num += 4
    lock.release()
    lock.release() # 这个地方注意是释放两次锁
Semaphores

Semaphores是个最简单的计数器,有两个方法acquire()和release(),如果有多个线程调用acquire()方法,acquire()方法会阻塞住,每当调用次acquire方法,就做一次减1操作,每当release()方法调用此次,就加1,如果最后的计数数值大于调用acquire()方法的线程数目,release()方法会抛出ValueError异常。下面是个生产者消费者的示例。

    import random, time
    from threading import BoundedSemaphore, Thread
    max_items = 5
    container = BoundedSemaphore(max_items)
    def producer(nloops):
        for i in range(nloops):
            time.sleep(random.randrange(2, 5))
            print(time.ctime(), end=": ")
            try:
                container.release()
                print("Produced an item.")
            except ValueError:
                print("Full, skipping.")
    def consumer(nloops):
        for i in range(nloops):
            time.sleep(random.randrange(2, 5))
            print(time.ctime(), end=": ")
            if container.acquire(False):
                print("Consumed an item.")
            else:
                print("Empty, skipping.")
    threads = []
    nloops = random.randrange(3, 6)
    print("Starting with %s items." % max_items)
    threads.append(Thread(target=producer, args=(nloops,)))
    threads.append(Thread(target=consumer, args=(random.randrange(nloops, nloops+max_items+2),)))
    for thread in threads:  # Starts all the threads.
        thread.start()
    for thread in threads:  # Waits for threads to complete before moving on with the main script.
        thread.join()
    print("All done.")

threading模块还提供了一个Semaphore对象,它允许你可以任意次的调用release函数,但是最好还是使用BoundedSemaphore对象,这样在release调用次数过多时会报错,有益于查找错误。Semaphores最长用来限制资源的使用,比如最多十个进程。

Events

event可以充当多进程之间的通信工具,基于一个内部的标志,线程可以调用set()和clear()方法来操作这个标志,其他线程则阻塞在wait()函数,直到标志被设置为True。下面的代码展示了如何利用Events来追踪行为。

    import random, time
    from threading import Event, Thread
    
    event = Event()
    
    def waiter(event, nloops):
        for i in range(nloops):
        print(“%s. Waiting for the flag to be set.” % (i+1))
        event.wait() # Blocks until the flag becomes true.
        print(“Wait complete at:”, time.ctime())
        event.clear() # Resets the flag.
        print()
    
    def setter(event, nloops):
        for i in range(nloops):
        time.sleep(random.randrange(2, 5)) # Sleeps for some time.
        event.set()
    
    threads = []
    nloops = random.randrange(3, 6)
    
    threads.append(Thread(target=waiter, args=(event, nloops)))
    threads[-1].start()
    threads.append(Thread(target=setter, args=(event, nloops)))
    threads[-1].start()
    
    for thread in threads:
        thread.join()
    
    print(“All done.”)

Conditions

conditions是比events更加高级一点的同步原语,可以用户多线程间的通信和通知。比如A线程通知B线程资源已经可以被消费。其他的线程必须在调用wait()方法前调用acquire()方法。同样的,每个线程在资源使用完以后,要调用release()方法,这样其他线程就可以继续执行了。下面是使用conditions实现的一个生产者消费者的例子。

    import random, time
    from threading import Condition, Thread
    condition = Condition()
    box = []
    def producer(box, nitems):
        for i in range(nitems):
            time.sleep(random.randrange(2, 5))  # Sleeps for some time.
            condition.acquire()
            num = random.randint(1, 10)
            box.append(num)  # Puts an item into box for consumption.
            condition.notify()  # Notifies the consumer about the availability.
            print("Produced:", num)
            condition.release()
    def consumer(box, nitems):
        for i in range(nitems):
            condition.acquire()
            condition.wait()  # Blocks until an item is available for consumption.
            print("%s: Acquired: %s" % (time.ctime(), box.pop()))
            condition.release()
    threads = []
    nloops = random.randrange(3, 6)
    for func in [producer, consumer]:
        threads.append(Thread(target=func, args=(box, nloops)))
        threads[-1].start()  # Starts the thread.
    for thread in threads:
        thread.join()
    print("All done.")

conditions还有其他很多用户,比如实现一个数据流API,当数据准备好了可以通知其他线程去处理数据。

Barriers

barriers是个简单的同步原语,可以用户多个线程之间的相互等待。每个线程都调用wait()方法,然后阻塞,直到所有线程调用了wait(),然后所有线程同时开始运行。例如:

    from random import randrange
    from threading import Barrier, Thread
    from time import ctime, sleep
    
    num = 4
    b = Barrier(num)
    names = [“Harsh”, “Lokesh”, “George”, “Iqbal”]
    
    def player():
        name = names.pop()
        sleep(randrange(2, 5))
        print(“%s reached the barrier at: %s” % (name, ctime()))
        b.wait()
        
    threads = []
    print(“Race starts now…”)
    
    for i in range(num):
        threads.append(Thread(target=player))
        threads[-1].start()
    for thread in threads:
        thread.join()
    print()
    print(“Race over!”)

总结

多线程同步,说难也难,说不难也很容易,关键是要看你的业务场景和解决问题的思路,尽量降低多线程之间的依赖,理清楚业务流程,选择合适的方法,则事尽成。

转载自我的博客:捕蛇者说

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/40778.html

相关文章

  • Python

    摘要:最近看前端都展开了几场而我大知乎最热语言还没有相关。有关书籍的介绍,大部分截取自是官方介绍。但从开始,标准库为我们提供了模块,它提供了和两个类,实现了对和的进一步抽象,对编写线程池进程池提供了直接的支持。 《流畅的python》阅读笔记 《流畅的python》是一本适合python进阶的书, 里面介绍的基本都是高级的python用法. 对于初学python的人来说, 基础大概也就够用了...

    dailybird 评论0 收藏0
  • python基础教程:异步IO 之 API

    摘要:具有以下基本同步原语子进程提供了通过创建和管理子进程的。虽然队列不是线程安全的,但它们被设计为专门用于代码。表示异步操作的最终结果。 Python的asyncio是使用 async/await 语法编写并发代码的标准库。通过上一节的讲解,我们了解了它不断变化的发展历史。到了Python最新稳定版 3.7 这个版本,asyncio又做了比较大的调整,把这个库的API分为了 高层级API和...

    vboy1010 评论0 收藏0
  • 明天找python工作,看看这几道Python面试题吧,Python面试题No14

    摘要:定时检测器定时拿出一部分重新的用过滤器进行检测剔除不能用的代理。重载是让类以统一的方式处理不同类型数据的一种手段。虽然在内存中存储表数据确实会提供很高的性能,但当守护进程崩溃时,所有的数据都会丢失。第1题: 如何解决验证码的问题,用什么模块,听过哪些人工打码平台? PIL、pytesser、tesseract模块 平台的话有:(打码平台特殊,不保证时效性) 云打码 挣码 斐斐打码 若快打码...

    番茄西红柿 评论0 收藏0
  • 明天找python工作,看看这几道Python面试题吧,Python面试题No14

    摘要:定时检测器定时拿出一部分重新的用过滤器进行检测剔除不能用的代理。重载是让类以统一的方式处理不同类型数据的一种手段。虽然在内存中存储表数据确实会提供很高的性能,但当守护进程崩溃时,所有的数据都会丢失。第1题: 如何解决验证码的问题,用什么模块,听过哪些人工打码平台? PIL、pytesser、tesseract模块 平台的话有:(打码平台特殊,不保证时效性) 云打码 挣码 斐斐打码 若快打码...

    李世赞 评论0 收藏0
  • Python - 收藏集 - 掘金

    摘要:首发于我的博客线程池进程池网络编程之同步异步阻塞非阻塞后端掘金本文为作者原创,转载请先与作者联系。在了解的数据结构时,容器可迭代对象迭代器使用进行并发编程篇二掘金我们今天继续深入学习。 Python 算法实战系列之栈 - 后端 - 掘金原文出处: 安生    栈(stack)又称之为堆栈是一个特殊的有序表,其插入和删除操作都在栈顶进行操作,并且按照先进后出,后进先出的规则进行运作。 如...

    546669204 评论0 收藏0

发表评论

0条评论

lentrue

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<