资讯专栏INFORMATION COLUMN

并发模型:线程与锁

JasinYip / 2184人阅读

摘要:文章结构来自七周七并发模型互斥和内存模型创建线程这段代码创建并启动了一个实例,首先从开始,函数的余下部分一起并发执行。在锁定状态下,某些线程拥有锁在非锁定状态下,没有线程拥有它。

并发&并行

并发程序含有多个逻辑上的独立执行块,他们可以独立的并行执行,也可以串行执行。
并行程序解决问题的速度比串行程序快的多,因为其可以同时执行整个任务的多个部分。并行程序可能有多个独立执行块,也可能只有一个。

</>复制代码

  1. 引用Rob Pike的经典描述就是:
    并发是同一时间应对多件事情的能力;
    并行是同一时间动手做多件事情的能力。

常见的并发模型有:

线程与锁

函数式编程

actor模型和通信顺序是进行(Communicating Sequential Processes, CSP)

数据级并行

lambda 架构

分离标识与状态模型

这篇主要介绍线程与锁模型

线程与锁模型

线程与锁模型是对底层硬件运行过程的形式化,非常简单直接,几乎所有的编程语言都对其提供了支持,且不对其使用方法加以限制(易出错)。

</>复制代码

  1. 这篇文章主要使用python语言来演示线程与锁模型。文章结构来自《七周七并发模型》
互斥和内存模型 创建线程

</>复制代码

  1. from threading import Thread
  2. def hello_world():
  3. print("Hello from new thread")
  4. def main():
  5. my_thread = Thread(target=hello_world)
  6. my_thread.start()
  7. print("Hello from main thread")
  8. my_thread.join()
  9. main()

这段代码创建并启动了一个Thread实例,首先从start() 开始,my_thread.start() main()函数的余下部分一起并发执行。最后调用join() 来等待my_thread线程结束。

运行这段代码输出结果有几种:

</>复制代码

  1. Hello from new thread
  2. Hello from main thread

或者

</>复制代码

  1. Hello from main thread
  2. Hello from new thread

或者

</>复制代码

  1. Hello from new threadHello from main thread

究竟哪个结果取决于哪个线程先执行print()。多线程编程很难的原因之一就是运行结果可能依赖于时序,多次运行结果并不稳定。

第一把锁

</>复制代码

  1. from threading import Thread, Lock
  2. class Counter(object):
  3. def __init__(self, count=0):
  4. self.count = count
  5. def increment(self):
  6. self.count += 1
  7. def get_count(self):
  8. print("Count: %s" % self.count)
  9. return self.count
  10. def test_count():
  11. counter = Counter()
  12. class CounterThread(Thread):
  13. def run(self):
  14. for i in range(10000):
  15. counter.increment()
  16. t1 = CounterThread()
  17. t2 = CounterThread()
  18. t1.start()
  19. t2.start()
  20. t1.join()
  21. t2.join()
  22. counter.get_count()
  23. test_count()

这段代码创建一个简单的类Counter 和两个线程,每个线程都调用counter.increment() 10000次。

多次运行这段代码会得到不同的值,原因是两个线程在使用 counter.count 时发生了竞态条件(代码行为取决于各操作的时序)。

一个可能的操作是:

线程t1 获取count的值时,线程t2也同时获取到了count 的值(假设是100),

这时t1 count + 1, 此时count 为101,回写count 值,然后t2 执行了相同的操作 count+1,因为t2 取到的值也是100 此时 count 仍是101,回写后count 依然是101,但是 +1 操作执行了两次。

竞态条件的解决方法是对 count 进行同步(synchronize)访问。一种操作是使用 内置锁(也称互斥锁(mutex)、管程(monitor)或临界区(critical section))来同步对increment() 的调用。

</>复制代码

  1. 线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;
    直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

    当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“同步阻塞”。
    直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。

python 锁的使用流程如下:

</>复制代码

  1. #创建锁
  2. mutex = threading.Lock()
  3. #锁定
  4. mutex.acquire([timeout])
  5. #释放
  6. mutex.release()

推荐使用上下文管理器来操作锁,

</>复制代码

  1. with lock:
  2. do someting
  3. # 相当于
  4. lock.acquire()
  5. try:
  6. # do something...
  7. finally:
  8. lock.release()

</>复制代码

  1. acquire(blocking=True, timeout=-1)
    可以阻塞或非阻塞地获得锁。
    当调用时参数 blocking 设置为 True (缺省值),阻塞直到锁被释放,然后将锁锁定并返回 True
    在参数 blocking 被设置为 False 的情况下调用,将不会发生阻塞。如果调用时 blocking 设为 True 会阻塞,并立即返回 False ;否则,将锁锁定并返回 True
    当浮点型 timeout 参数被设置为正值调用时,只要无法获得锁,将最多阻塞 timeout 设定的秒数。timeout 参数被设置为 -1 时将无限等待。当 blocking 为 false 时,timeout 指定的值将被忽略。
    如果成功获得锁,则返回 True,否则返回 False (例如发生 超时 的时候)。
    timeout 参数需要 python3.2+

</>复制代码

  1. from threading import Thread, Lock
  2. mutex = Lock()
  3. class SynchronizeCounter(object):
  4. def __init__(self, count=0):
  5. self.count = count
  6. def increment(self):
  7. # if mutex.acquire(1): # 获取锁
  8. # self.count += 1
  9. # mutex.release() # 释放锁
  10. # 等同于上述代码
  11. with mutex:
  12. self.count += 1
  13. def get_count(self):
  14. print("Count: %s" % self.count)
  15. return self.count
  16. def test_synchronize_count():
  17. counter = SynchronizeCounter()
  18. class CounterThread(Thread):
  19. def run(self):
  20. for i in range(100000):
  21. counter.increment()
  22. t1 = CounterThread()
  23. t2 = CounterThread()
  24. t1.start()
  25. t2.start()
  26. t1.join()
  27. t2.join()
  28. counter.get_count()
  29. if __name__ == "__main__":
  30. for i in range(100):
  31. test_synchronize_count()

</>复制代码

  1. 这段代码还有一个隐藏的bug,那就是 get_count(),这里get_count() 是在join()之后调用的,因此是线程安全的,但是如果在其它地方调用了 get_count() 函数。
    由于在 get_count() 中没有进行线程同步,调用时可能会获取到一个失效的值。
诡异的内存

对于JAVA等竞态编译语言,

编译器的静态优化可能会打乱代码的执行顺序

JVM 的动态优化也会打乱代码的执行顺序

硬件可以通过乱序执行来优化性能

更糟糕的是,有时一个线程产生的修改可能会对另一个线程不可见。

</>复制代码

  1. 从直觉上来说,编译器、JVM、硬件都不应插手修改原本的代码逻辑。但是近几年的运行效率提升,尤其是共享内存交媾的运行效率提升,都仰仗于此类代码优化。
    具体的副作用,Java 内存模型有明确说明。
    Java 内存模型定义了何时一个线程对内存的修改对另一个线程可见。基本原则是:如果读线程和写线程不进行同步,就不能保证可见性。
多把锁

一个重点: 两个线程都需要进行同步。只在其中一个线程进行同步是不够的。

可如果所有的方法都同步,大多数线程可能都会被阻塞,失去了并发的意义,并且可能会出现死锁。

哲学家进餐问题

</>复制代码

  1. 哲学家就餐问题:假设有五位哲学家围坐在一张圆形餐桌旁,做以下两件事情之一:吃饭,或者思考。吃东西的时候,他们就停止思考,思考的时候也停止吃东西。餐桌中间有一大碗意大利面,每两个哲学家之间有一只餐叉。因为用一只餐叉很难吃到意大利面,所以假设哲学家必须用两只餐叉吃东西。他们只能使用自己左右手边的那两只餐叉。

    哲学家从来不交谈,这就很危险,可能产生死锁,每个哲学家都拿着左手的餐叉,永远都在等右边的餐叉(或者相反)。
    即使没有死锁,也有可能发生资源耗尽。例如,假设规定当哲学家等待另一只餐叉超过五分钟后就放下自己手里的那一只餐叉,并且再等五分钟后进行下一次尝试。这个策略消除了死锁(系统总会进入到下一个状态),但仍然有可能发生“活锁”。如果五位哲学家在完全相同的时刻进入餐厅,并同时拿起左边的餐叉,那么这些哲学家就会等待五分钟,同时放下手中的餐叉,再等五分钟,又同时拿起这些餐叉。

下面是哲学家进餐问题的一个实现:

</>复制代码

  1. import threading
  2. import random
  3. import time
  4. class Philosopher(threading.Thread):
  5. running = True
  6. def __init__(self, xname, forkOnLeft, forkOnRight):
  7. threading.Thread.__init__(self)
  8. self.name = xname
  9. self.forkOnLeft = forkOnLeft
  10. self.forkOnRight = forkOnRight
  11. def run(self):
  12. while self.running:
  13. # Philosopher is thinking (but really is sleeping).
  14. time.sleep(random.uniform(1, 3))
  15. print("%s is hungry." % self.name)
  16. self.dine()
  17. def dine(self):
  18. fork1, fork2 = self.forkOnLeft, self.forkOnRight
  19. while self.running:
  20. fork1.acquire(True) # 阻塞式获取left 锁
  21. # locked = fork2.acquire(True) # 阻塞式 获取right 锁 容易产生死锁
  22. locked = fork2.acquire(False) # 非阻塞式 获取right 锁
  23. if locked:
  24. break # 如果被锁定,释放 left 退出等待
  25. fork1.release()
  26. print("%s swaps forks" % self.name)
  27. fork1, fork2 = fork2, fork1
  28. else:
  29. return
  30. self.dining()
  31. fork2.release()
  32. fork1.release()
  33. def dining(self):
  34. print("%s starts eating " % self.name)
  35. time.sleep(random.uniform(1, 5))
  36. print("%s finishes eating and leaves to think." % self.name)
  37. def DiningPhilosophers():
  38. forks = [threading.Lock() for n in range(5)]
  39. philosopherNames = ("Aristotle", "Kant", "Buddha", "Marx", "Russel")
  40. philosophers = [
  41. Philosopher(philosopherNames[i], forks[i % 5], forks[(i + 1) % 5])
  42. for i in range(5)
  43. ]
  44. Philosopher.running = True
  45. for p in philosophers:
  46. p.start()
  47. for p in philosophers:
  48. p.join()
  49. time.sleep(100)
  50. Philosopher.running = False
  51. print("Now we"re finishing.")
  52. DiningPhilosophers()
外星方法的危害

规模较大的程序常用监听器模式来解耦模块,这里我们构造一个类从一个URL进行下载,Listeners 监听下载进度。

</>复制代码

  1. import requests
  2. import threading
  3. class Listeners(object):
  4. def __init__(self, count=0):
  5. self.count = count
  6. self.done_count = 0.0
  7. self.listeners = []
  8. def append(self, listener):
  9. self.listeners.append(listener)
  10. def remove(self, listener):
  11. self.listeners.remove(listener)
  12. def on_progress(self, n):
  13. # 一些我们不知道的实现
  14. # do someting
  15. # self.done_count += 1
  16. # print("Process: %f" % (self.done_count / self.count))
  17. pass
  18. listeners = Listeners(5)
  19. class Downloader(threading.Thread):
  20. def __init__(
  21. self, group=None, target=None, name=None, args=(), kwargs=None, daemon=None
  22. ):
  23. threading.Thread.__init__(
  24. self, group=group, target=target, name=name, daemon=daemon
  25. )
  26. self.url = kwargs.get("url")
  27. def download(self):
  28. resp = requests.get(self.url)
  29. def add_listener(self, listener):
  30. listeners.append(listener)
  31. def remove_listener(self, listener):
  32. listeners.delete(listener)
  33. def update_progress(self, n):
  34. for listener in listeners:
  35. listner.on_progress(n)
  36. def run(self):
  37. self.download()
  38. print(self.url)
  39. listeners.on_progress(1)
  40. def test():
  41. urls = [
  42. "https://www.baidu.com",
  43. "https://www.google.com",
  44. "https://www.bing.com",
  45. "https://www.zaih.com",
  46. "https://www.github.com",
  47. ]
  48. ts = [Downloader(kwargs=dict(url=url)) for url in urls]
  49. print(ts)
  50. [t.start() for t in ts]
  51. [t.join() for t in ts]
  52. if __name__ == "__main__":
  53. test()

这段代码中,add_listener, remove_listener 和 update_progress 都是同步方法,但 update_progress 调用了一个我们不知道如何实现的方法。如果这个方法中,获取了一把锁,程序在执行的过程中就可能发生死锁。所以,我们要尽量避免使用这种方法。还有一种方法是在遍历之前对 listeners 进行保护性复制,再针对这份副本进行遍历。(现在调用外星方法不再需要加锁)

超越内置锁 可重入锁

Lock() 虽然方便,但限制很多:

一个线程因为等待内置锁而进入阻塞之后,就无法中断该线程

Lock() 不知道当前拥有锁的线程是否是当前线程,如果当前线程获取了锁,再次获取也会阻塞。

重入锁是(threading.RLock)一个可以被同一个线程多次获取的同步基元组件。在内部,它在基元锁的锁定/非锁定状态上附加了 "所属线程" 和 "递归等级" 的概念。在锁定状态下,某些线程拥有锁 ; 在非锁定状态下, 没有线程拥有它。

若要锁定锁,线程调用其 acquire() 方法;一旦线程拥有了锁,方法将返回。若要解锁,线程调用 release() 方法。 acquire()/release() 对可以嵌套;只有最终 release() (最外面一对的 release() ) 将锁解开,才能让其他线程继续处理 acquire() 阻塞。

threading.RLock 提供了显式的 acquire() 和 release() 方法
一个好的实践是:

</>复制代码

  1. lock = threading.RLock()

Lock 和 RLock 的使用区别如下:

</>复制代码

  1. #rlock_tut.py
  2. import threading
  3. num = 0
  4. lock = Threading.Lock()
  5. lock.acquire()
  6. num += 1
  7. lock.acquire() # 这里会被阻塞
  8. num += 2
  9. lock.release()
  10. # With RLock, that problem doesn’t happen.
  11. lock = Threading.RLock()
  12. lock.acquire()
  13. num += 3
  14. lock.acquire() # 不会被阻塞.
  15. num += 4
  16. lock.release()
  17. lock.release() # 两个锁都需要调用 release() 来释放.
超时

使用内置锁时,阻塞的线程无法被中断,程序不能从死锁恢复,可以给锁设置超时时间来解决这个问题。

</>复制代码

  1. timeout 参数需要 python3.2+

</>复制代码

  1. import time
  2. from threading import Thread, Lock
  3. lock1 = RLock()
  4. lock2 = RLock()
  5. # 这个程序会一直死锁下去,如果想突破这个限制,可以在获取锁的时候加上超时时间
  6. # > python threading 没有实现 销毁(destroy),停止(stop),暂停(suspend),继续(resume),中断(interrupt)等
  7. class T1(Thread):
  8. def run(self):
  9. print("start run T1")
  10. lock1.acquire()
  11. # lock1.acquire(timeout=2) # 设置超时时间可避免死锁
  12. time.sleep(1)
  13. lock2.acquire()
  14. # lock2.acquire(timeout=2) # 设置超时时间可避免死锁
  15. lock1.release()
  16. lock2.release()
  17. class T2(Thread):
  18. def run(self):
  19. print("start run T2")
  20. lock2.acquire()
  21. # lock2.acquire(timeout=2) # 设置超时时间可避免死锁
  22. time.sleep(1)
  23. lock1.acquire()
  24. # lock1.acquire(timeout=2) # 设置超时时间可避免死锁
  25. lock2.release()
  26. lock1.release()
  27. def test():
  28. t1, t2 = T1(), T2()
  29. t1.start()
  30. t2.start()
  31. t1.join()
  32. t2.join()
  33. if __name__ == "__main__":
  34. test()
交替锁

如果我们要在链表中插入一个节点。一种做法是用锁保护整个链表,但链表加锁时其它使用者无法访问。交替锁可以只所追杀链表的一部分,允许不涉及被锁部分的其它线程自由访问。

</>复制代码

  1. from random import randint
  2. from threading import Thread, Lock
  3. class Node(object):
  4. def __init__(self, value, prev=None, next=None):
  5. self.value = value
  6. self.prev = prev
  7. self.next = next
  8. self.lock = Lock()
  9. class SortedList(Thread):
  10. def __init__(self, head):
  11. Thread.__init__(self)
  12. self.head = head
  13. def insert(self, value):
  14. head = self.head
  15. node = Node(value)
  16. print("insert: %d" % value)
  17. while True:
  18. if head.value <= value:
  19. if head.next != None:
  20. head = head.next
  21. else:
  22. head.lock.acquire()
  23. head.next = node
  24. node.prev = head
  25. head.lock.release()
  26. break
  27. else:
  28. prev = head.prev
  29. prev.lock.acquire()
  30. head.lock.acquire()
  31. if prev != None:
  32. prev.next = node
  33. else:
  34. self.head = node
  35. node.prev = prev
  36. prev.lock.release()
  37. node.next = head
  38. head.prev = node
  39. head.lock.release()
  40. break
  41. def run(self):
  42. for i in range(5):
  43. self.insert(randint(10, 20))
  44. def test():
  45. head = Node(10)
  46. t1 = SortedList(head)
  47. t2 = SortedList(head)
  48. t1.start()
  49. t2.start()
  50. t1.join()
  51. t2.join()
  52. while head:
  53. print(head.value)
  54. head = head.next
  55. if __name__ == "__main__":
  56. test()

这种方案不仅可以让多个线程并发的进行链表插入操作,还能让其他的链表操作安全的并发。

条件变量

并发编程经常需要等待某个事件发生。比如从队列删除元素前需要等待队列非空、向缓存添加数据前需要等待缓存有足够的空间。条件变量就是为这种情况设计的。

条件变量总是与某种类型的锁对象相关联,锁对象可以通过传入获得,或者在缺省的情况下自动创建。当多个条件变量需要共享同一个锁时,传入一个锁很有用。锁是条件对象的一部分,不必多带带地跟踪它。

条件变量服从上下文管理协议:使用 with 语句会在它包围的代码块内获取关联的锁。 acquire() 和 release() 方法也能调用关联锁的相关方法。

其它方法必须在持有关联的锁的情况下调用。 wait() 方法释放锁,然后阻塞直到其它线程调用 notify() 方法或 notify_all() 方法唤醒它。一旦被唤醒, wait() 方法重新获取锁并返回。它也可以指定超时时间。

</>复制代码

  1. #condition_tut.py
  2. import random, time
  3. from threading import Condition, Thread
  4. """
  5. "condition" variable will be used to represent the availability of a produced
  6. item.
  7. """
  8. condition = Condition()
  9. box = []
  10. def producer(box, nitems):
  11. for i in range(nitems):
  12. time.sleep(random.randrange(2, 5)) # Sleeps for some time.
  13. condition.acquire()
  14. num = random.randint(1, 10)
  15. box.append(num) # Puts an item into box for consumption.
  16. condition.notify() # Notifies the consumer about the availability.
  17. print("Produced:", num)
  18. condition.release()
  19. def consumer(box, nitems):
  20. for i in range(nitems):
  21. condition.acquire()
  22. condition.wait() # Blocks until an item is available for consumption.
  23. print("%s: Acquired: %s" % (time.ctime(), box.pop()))
  24. condition.release()
  25. threads = []
  26. """
  27. "nloops" is the number of times an item will be produced and
  28. consumed.
  29. """
  30. nloops = random.randrange(3, 6)
  31. for func in [producer, consumer]:
  32. threads.append(Thread(target=func, args=(box, nloops)))
  33. threads[-1].start() # Starts the thread.
  34. for thread in threads:
  35. """Waits for the threads to complete before moving on
  36. with the main script.
  37. """
  38. thread.join()
  39. print("All done.")
原子变量

</>复制代码

  1. 与锁相比使用原子变量的优点:

  2. 不会忘记在正确的时候获取锁

  3. 由于没有锁的参与,对原子变量的操作不会引发死锁。

  4. 原子变量时无锁(lock-free)非阻塞(non-blocking)算法的基础,这种算法可以不用锁和阻塞来达到同步的目的。

python 不支持原子变量

总结 优点

线程与锁模型最大的优点是适用面广,更接近于“本质”--近似于对硬件工作方式的形式化--正确使用时效率高。
此外,线程与锁模型也可轻松的集成到大多数编程语言。

缺点

线程与锁模型没有为并行提供直接的支持

线程与锁模型只支持共享内存模型,如果要支持分布式内存模型,就需要寻求其他技术的帮助。

用线程与锁模型编写的代码难以测试(比如死锁问题可能很久才会出现),出了问题后很难找到问题在哪,并且bug难以复现

代码难以维护(要保证所有对象的同步都是正确的、必须按 顺序来获取多把锁、持有锁时不调用外星方法。还要保证维护代码的开发者都遵守这个规则

参考链接

Let’s Synchronize Threads in Python

哲学家进餐问题

References

[1] 哲学家进餐问题: https://zh.wikipedia.org/wiki...
[2] Let’s Synchronize Threads in Python: https://hackernoon.com/synchr...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/43847.html

相关文章

  • 并发编程一

    摘要:一并发和并行并发是同一时间应对多件事情的能力并行是同一时间做多件事情的能力。用并发的目的,不仅仅是为了让程序并行运行从而发挥多核的优势。函数式编程函数式编程日渐重要的原因之一,是其对并发编程和并行编程提供了良好的支持。 一、并发和并行: 并发是同一时间应对(dealing with)多件事情的能力; 并行是同一时间做(doing)多件事情的能力。 二、并行架构: 位级并行,...

    Xufc 评论0 收藏0
  • Java 虚拟机总结给面试的你(下)

    摘要:本篇博客主要针对虚拟机的晚期编译优化,内存模型与线程,线程安全与锁优化进行总结,其余部分总结请点击虚拟总结上篇,虚拟机总结中篇。 本篇博客主要针对Java虚拟机的晚期编译优化,Java内存模型与线程,线程安全与锁优化进行总结,其余部分总结请点击Java虚拟总结上篇 ,Java虚拟机总结中篇。 一.晚期运行期优化 即时编译器JIT 即时编译器JIT的作用就是热点代码转换为平台相关的机器码...

    amc 评论0 收藏0
  • AQS同步组件--ReentrantLock与锁

    摘要:性能较好是因为避免了线程进入内核的阻塞状态请求总数同时并发执行的线程数我们首先使用声明一个所得实例,然后使用进行加锁和解锁操作。 ReentrantLock与锁 Synchronized和ReentrantLock异同 可重入性:两者都具有可重入性 锁的实现:Synchronized是依赖jvm实现的,ReentrantLock是jdk实现的。(我们可以理解为一个是操作系统层面的实现...

    dcr309duan 评论0 收藏0
  • python并发场景锁的使用方法

      小编写这篇文章的一个主要目的,主要是来给大家介绍关于python的一些事情,python的使用场景是比较的多的,主要涉及到其中的一些方方面面,那么,它的并发场景使用方法是什么呢?下面就给大家详细解答下。  前言  如果你学过操作系统,那么对于锁应该不陌生。锁的含义是线程锁,可以用来指定某一个逻辑或者是资源同一时刻只能有一个线程访问。这个很好理解,就好像是有一个房间被一把锁锁住了,只有拿到钥匙的...

    89542767 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<