摘要:分布式进程在和中,应当优选,因为更稳定,而且,可以分布到多台机器上,而最多只能分布到同一台机器的多个上。由于模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
分布式进程
在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。
Python的 multiprocessing 模块不但支持多进程, 其中 managers 子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务。
BaseManager: 提供了不同机器进程之间共享数据的一种方法;
(重要的点: ip:port)
# task_master.py import random from multiprocessing import freeze_support from queue import Queue from multiprocessing.managers import BaseManager # 1. 创建需要的队列 # task_queue:发送任务的队列 # coding=utf-8 import random,time from queue import Queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_support task_queue = Queue() # 发送任务的队列: result_queue = Queue() # 接收结果的队列: class QueueManager(BaseManager): # 从BaseManager继承的QueueManager: pass # windows下运行 def return_task_queue(): global task_queue return task_queue # 返回发送任务队列 def return_result_queue (): global result_queue return result_queue # 返回接收结果队列 def test(): # 把两个Queue都注册到网络上, callable参数关联了Queue对象,它们用来进行进程间通信,交换对象 #QueueManager.register("get_task_queue", callable=lambda: task_queue) #QueueManager.register("get_result_queue", callable=lambda: result_queue) QueueManager.register("get_task_queue", callable=return_task_queue) QueueManager.register("get_result_queue", callable=return_result_queue) # 绑定端口4000, 设置验证码"sheenstar": #manager = QueueManager(address=("", 4000), authkey=b"sheenstar") # windows需要写ip地址 manager = QueueManager(address=("192.168.1.160", 4000), authkey=b"sheenstar") manager.start() # 启动Queue: # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() for i in range(13): # 放几个任务进去: n = random.randint(0, 10000) print("Put task %d..." % n) task.put(n) # 从result队列读取结果: print("Try get results...") for i in range(13): r = result.get(timeout=10) print("Result: %s" % r) # 关闭: manager.shutdown() print("master exit.") if __name__=="__main__": freeze_support() print("start!") test()
运行程序,会等待执行结果10s,如果没有worker端获取任务,返回结果,程序将报错。
当我们在一台机器上写多进程程序时,创建的 Queue 可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的 task_queue 进行操作,那样就绕过了QueueManager 的封装,必须通过manager.get_task_queue()获得的 Queue 接口添加。
# coding=utf-8 import time, sys from queue import Queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register("get_task_queue") QueueManager.register("get_result_queue") # 连接到服务器,也就是运行task_master.py的机器: server_addr = "192.168.1.160" print("Connect to server %s..." % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 4000), authkey=b"sheenstar") # 从网络连接: try: m.connect() except: print("请先启动task_master.py!") #sys.exit("sorry, goodbye!"); # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(13): try: n = task.get() print("run task %d * %d..." % (n, n)) r = "%d * %d = %d" % (n, n, n*n) time.sleep(1) result.put(r) except ConnectionResetError as e: print("任务执行结束,自动断开连接") # 处理结束: print("worker exit.")
使用命令行运行程序,结果更直观
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/42481.html
摘要:在一个进程内部,要同时干多件事,就需要同时运行多个子任务,我们把进程内的这些子任务称为线程。总结一下,多任务的实现方式有三种多进程模式多线程模式多进程多线程模式线程是最小的执行单元,而进程由至少一个线程组成。 进程与线程 很多同学都听说过,现代操作系统比如Mac OS X,UNIX,Linux,Windows等,都是支持多任务的操作系统。 什么叫多任务呢?简单地说,就是操作系统可以同时...
摘要:协程实现连接在网络通信中,每个连接都必须创建新线程或进程来处理,否则,单线程在处理连接的过程中,无法接受其他客户端的连接。所以我们尝试使用协程来实现服务器对多个客户端的响应。 协程实现TCP连接 在网络通信中,每个连接都必须创建新线程(或进程) 来处理,否则,单线程在处理连接的过程中, 无法接受其他客户端的连接。所以我们尝试使用协程来实现服务器对多个客户端的响应。与单一TCP通信的构架...
摘要:多进程执行任务结束,创建进程和销毁进程是时间的,如果长度不够,会造成多线程快过多进程多线程执行任务结束,进程间通信生产者消费者模型与队列演示了生产者和消费者的场景。 进程 Python是运行在解释器中的语言,查找资料知道,python中有一个全局锁(GIL),在使用多进程(Thread)的情况下,不能发挥多核的优势。而使用多进程(Multiprocess),则可以发挥多核的优势真正地提...
摘要:一个包来了之后,到底是交给浏览器还是,就需要端口号来区分。每个网络程序都向操作系统申请唯一的端口号,这样,两个进程在两台计算机之间建立网络连接就需要各自的地址和各自的端口号。 网络通信的三要素 IP 通信的时候, 双方必须知道对方的标识, 好比发邮件必须知道对方的邮件地址。 互联网上每个计算机的唯一标识就是IP地址, 类似 123.123.123.123 。 IP地址实际上是一个32位...
摘要:我们来编写一个简单的服务器程序,它接收客户端连接,回复客户端发来的请求。如果一切顺利,新浪的服务器接受了我们的连接,一个连接就建立起来的,后面的通信就是发送网页内容了。 TCP TCP(Transmission Control Protocol 传输控制协议)是一种面向连接的、可靠的、基于字节流的传输层通信协议,由IETF的RFC 793定义。在简化的计算机网络OSI模型中,它完成第四...
阅读 3191·2021-11-24 10:30
阅读 1314·2021-09-30 09:56
阅读 2388·2021-09-07 10:20
阅读 2599·2021-08-27 13:10
阅读 700·2019-08-30 11:11
阅读 2052·2019-08-29 12:13
阅读 759·2019-08-26 12:24
阅读 2897·2019-08-26 12:20