ITPub博客

首页 > 应用开发 > Python > Python多进程之Process、Pool、Lock、Queue、Event、Semaphore、Pipe

Python多进程之Process、Pool、Lock、Queue、Event、Semaphore、Pipe

原创 Python 作者:ckxllf 时间:2021-03-03 16:15:28 0 删除 编辑

  1. Python创建进程类Process

  python的multiprocessing模块提供了一个创建进程的类Precess,其创建有以下两种方法:

  创建Process类的实例,并指向目标函数和传递参数

  自定义一个类并继承Process类,重写__init__()和run()方法

  Process类的构造函数如下:

  class multiprocessing.Process(group, target, name, kwargs)

  def __init__(self,

  group: Any = ...,

  target: Optional[Callable] = ...,

  name: Optional[str] = ...,

  args: Iterable[Any] = ...,

  kwargs: Mapping[Any, Any] = ...,

  *,

  daemon: Optional[bool] = ...) -> None: ...

  参数说明:

  target 表示调用对象,一般为任务函数,也可以为类;

  args 给调用对象target传递的参数,为元组

  kwargs 表示调用对象的字典

  name 为进程的别名

  group 参数不使用,可以忽略

  Process类常用的方法和属性如下

  方法

  is_alive(): 返回进程是否激活

  join([timeout]): 阻塞进程,直到进程执行完成或超时或进程被终止

  run(): 代表进程执行任务的函数,可被重写

  start(): 激活启动进程

  terminate(): 终止进程

  属性

  authkey(): 字节码,进程的准秘钥

  daemon(): 为True时,父进程终止后所有子进程自动终止,且不能产生新的进程,必须在start()方法前设置

  exitcode:退出码,进程在运行时为None,如果为-N,就表示被信号N结束

  name:获取进程名称

  pid:进程id

  1.1 我们首先使用第一种方法创建两个进程,并与单进程运行的时间做比较

  from multiprocessing import Process

  import time

  def task_process(delay):

  num = 0

  for i in range(delay * 100000000): # 1亿次数据累加计算

  num += i

  if __name__ == '__main__':

  t0 = time.time()

  task_process(3)

  task_process(3)

  t1 = time.time()

  print(f"单进程顺序执行耗时 {t1 - t0} ")

  p0 = Process(target=task_process, args=(3,))

  p1 = Process(target=task_process, args=(3,))

  t2 = time.time()

  p0.start()

  p1.start()

  p0.join()

  p1.join()

  t3 = time.time()

  print(f"多进程并发执行耗时 {t3 - t2}")

  输出结果:多进程执行相同的操作消耗的时间更少!

  单进程顺序执行耗时 32.359516859054565

  多进程并发执行耗时 17.804959535598755

  1.2 使用第二种方法,自定义一个类并继承Process类

  from multiprocessing import Process

  import time

  class MyProcess(Process):

  def __init__(self, delay):

  super().__init__()

  self.delay = delay

  # 子进程要执行的代码

  def run(self):

  num = 0

  for i in range(self.delay * 100000000):

  num += i

  if __name__ == "__main__":

  p0 = MyProcess(3)

  p1 = MyProcess(3)

  t0 = time.time()

  p0.start()

  p1.start()

  p0.join()

  p1.join()

  t1 = time.time()

  print(f"多进程并发执行耗时 {t1 - t0}")

  输出结果

  多进程并发执行耗时 16.68904447555542

  2. 进程池Pool

  进程池Pool可以提供指定数量的进程给用户使用,当有新的请求进程时,若Pool池没有满,就会创建一个新的进程用于执行该请求,如果Pool池中的进程数量已经达到最大值,则请求会等待,直到池中有进程结束才会创建新的进程。

  示例:

  # coding: utf-8

  import multiprocessing

  import time

  def task(name):

  print(f"{time.strftime('%H:%M:%S')}: {name} 开始执行")

  time.sleep(3)

  if __name__ == "__main__":

  pool = multiprocessing.Pool(processes=3)

  for i in range(10):

  # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

  pool.apply_async(func=task, args=(i,))

  pool.close()

  pool.join()

  print("hello")

  输出:

  19:43:22: 0 开始执行

  19:43:22: 1 开始执行

  19:43:22: 2 开始执行

  19:43:25: 3 开始执行

  19:43:25: 4 开始执行

  19:43:25: 5 开始执行

  19:43:28: 6 开始执行

  19:43:28: 7 开始执行

  19:43:28: 8 开始执行

  19:43:31: 9 开始执行

  hello

  3.同步机制之Lock锁

  多进程的目的是并发执行程序,提高程序执行效率,但有时候我们想要在某一时间,或者满足某一条件时,只有一个进程在执行,就需要使用Lock锁机制。

  示例:

  import multiprocessing

  import time

  def task1(lock):

  with lock: # with上下文语句使用锁,会自动释放锁

  n = 5

  while n > 1:

  print(f"{time.strftime('%H:%M:%S')} task1 输出信息")

  time.sleep(1)

  n -= 1

  def task2(lock):

  lock.acquire()

  n = 5

  while n > 1:

  print(f"{time.strftime('%H:%M:%S')} task2 输出信息")

  time.sleep(1)

  n -= 1

  lock.release()

  def task3(lock):

  lock.acquire()

  n = 5

  while n > 1:

  print(f"{time.strftime('%H:%M:%S')} task3 输出信息")

  time.sleep(1)

  n -= 1

  lock.release()

  if __name__ == "__main__":

  lock = multiprocessing.Lock()

  p1 = multiprocessing.Process(target=task1, args=(lock,))

  p2 = multiprocessing.Process(target=task2, args=(lock,))

  p3 = multiprocessing.Process(target=task3, args=(lock,))

  p1.start()

  p2.start()

  p3.start()

  输出:

  20:13:22 task1 输出信息

  20:13:23 task1 输出信息

  20:13:24 task1 输出信息

  20:13:25 task1 输出信息

  20:13:26 task2 输出信息

  20:13:27 task2 输出信息

  20:13:28 task2 输出信息

  20:13:29 task2 输出信息

  20:13:30 task3 输出信息

  20:13:31 task3 输出信息

  20:13:32 task3 输出信息

  20:13:33 task3 输出信息

  说明:从结果上看,由于锁的机制,同一时刻只有一个进程在执行。

  使用lock = multiprocess.Lock()可以得到一个锁的实例,在上面的代码中,可以使用with语句来使用锁,也可以在要执行的代码块前使用lock.acquire()方法,在执行完后再释放lock.release()锁,需要注意的是lock.acquire()后面的语句不能阻塞,否则会发生死锁的情况

  4.进程队列Queue

  Queue是python多进程的安全队列,可以使用Queue实现多进程之间的数据传递。

  Queue.put()方法说明:

  Queue.put(self, obj, block=True, timeout=None) # 用于插入数据到队列中

  当block为True,timeout为正值时,该方法会阻塞timeout指定的时间,直到该队列有剩余的空闲时间。如果超时,会抛出Queue.Full异常

  当block为False,但队列已满,会抛出Queue.Full异常

  Queue.get()方法说明:

  Queue.get(self, block=True, timeout=None) # 从队列中取出数据并删除

  当block为True,timeout为正值时,在timeout时间内没用取到数据就会抛出Queue.Empty异常

  当block为False时,有两种情况:若队列中有数据则取出;若队列为空则抛出Queue.Empty异常。

  示例:

  from multiprocessing import Process, Queue

  import time

  def ProducerA(q):

  count = 1

  while True:

  q.put(f"冷饮 {count}")

  print(f"{time.strftime('%H:%M:%S')} A 放入:[冷饮 {count}]")

  count += 1

  time.sleep(1)

  def ConsumerB(q):

  while True:

  print(f"{time.strftime('%H:%M:%S')} B 取出 [{q.get()}]")

  time.sleep(5)

  if __name__ == '__main__':

  q = Queue(maxsize=5)

  p = Process(target=ProducerA, args=(q,))

  c = Process(target=ConsumerB, args=(q,))

  c.start()

  p.start()

  c.join()

  p.join()

  输出:

  19:07:09 A 放入:[冷饮 1]

  19:07:09 B 取出 [冷饮 1]

  19:07:10 A 放入:[冷饮 2]

  19:07:11 A 放入:[冷饮 3]

  19:07:12 A 放入:[冷饮 4]

  19:07:13 A 放入:[冷饮 5]

  19:07:14 B 取出 [冷饮 2]

  19:07:14 A 放入:[冷饮 6]

  19:07:15 A 放入:[冷饮 7]

  19:07:19 B 取出 [冷饮 3]

  19:07:19 A 放入:[冷饮 8]

  19:07:24 B 取出 [冷饮 4]

  19:07:24 A 放入:[冷饮 9]

  19:07:29 B 取出 [冷饮 5]

  19:07:29 A 放入:[冷饮 10]

  说明:上述代码定义了生产者和消费者函数,设置队列最大容量为5,生产者调用Queue.put()方法放入数据,消费调用Queue.get()方法取出数据,当队列满时,生产者等待,当队列空时,消费者等待。生产的速度和消费的速度可能不一致,但是队列能让生产和消费有条不紊地进行。

  5.同步机制Event

  Event用来实现多进程之间的同步通信

  示例:

  import multiprocessing

  import time

  def wait_for_event(e):

  e.wait()

  time.sleep(1)

  # 唤醒后清除Event状态,为后续继续等待

  e.clear()

  print(f"{time.strftime('%H:%M:%S')} 进程 A: 我们是兄弟,我等你...")

  e.wait()

  print(f"{time.strftime('%H:%M:%S')} 进程 A: 好的,是兄弟一起走")

  def wait_for_event_timeout(e, t):

  e.wait()

  time.sleep(1)

  # 唤醒后清除Event状态,为后续继续等待

  e.clear()

  print(f"{time.strftime('%H:%M:%S')} 进程 B: 好吧,最多等你 {t} 秒")

  e.wait(t)

  print(f"{time.strftime('%H:%M:%S')} 进程 B: 我继续往前走了")

  if __name__ == "__main__":

  e = multiprocessing.Event()

  w1 = multiprocessing.Process(target=wait_for_event, args=(e,))

  w2 = multiprocessing.Process(target=wait_for_event_timeout, args=(e, 5))

  w1.start()

  w2.start()

  # 主进程发话

  print(f"{time.strftime('%H:%M:%S')} 主进程: 谁等我下,我需要 8 s 时间")

  # 唤醒等待的进程

  e.set()

  time.sleep(8)

  print(f"{time.strftime('%H:%M:%S')} 主进程: 好了,我赶上了")

  # 再次唤醒等待的进程

  e.set()

  w1.join()

  w2.join()

  print(f"{time.strftime('%H:%M:%S')} 主进程:退出")

  输出:

  20:28:25 主进程: 谁等我下,我需要 8 s 时间

  20:28:26 进程 A: 我们是兄弟,我等你...

  20:28:26 进程 B: 好吧,最多等你 5 秒

  20:28:31 进程 B: 我继续往前走了

  20:28:33 主进程: 好了,我赶上了

  20:28:33 进程 A: 好的,是兄弟一起走

  20:28:33 主进程:退出

  说明:上述代码定义了两个进程函数,一个是等待事件发生,一个是设置超时时间后等待事件发生,主进程调用事件的set()方法唤醒等待事件的进程,事件被唤醒后调用clear()方法清除事件的状态并调用wait()重新等待,以此达到进程同步的控制。

  6.并发控制之Semaphore

  Semaphore是用来控制对共享资源的访问量,可以控制同一时刻进程的并发数量。

  示例:

  import multiprocessing

  import time

  def worker(s, i):

  s.acquire() # 获得锁

  print(time.strftime('%H:%M:%S'), multiprocessing.current_process().name + " 获得锁运行");

  time.sleep(i)

  print(time.strftime('%H:%M:%S'), multiprocessing.current_process().name + " 释放锁结束");

  s.release() # 释放锁

  if __name__ == "__main__":

  s = multiprocessing.Semaphore(2)

  for i in range(6):

  p = multiprocessing.Process(target=worker, args=(s, 2))

  p.start()

  输出: 山东枣庄东方妇科医院

  20:07:17 Process-1 获得锁运行

  20:07:17 Process-2 获得锁运行

  20:07:19 Process-1 释放锁结束

  20:07:19 Process-3 获得锁运行

  20:07:19 Process-2 释放锁结束

  20:07:19 Process-4 获得锁运行

  20:07:21 Process-3 释放锁结束

  20:07:21 Process-5 获得锁运行

  20:07:21 Process-4 释放锁结束

  20:07:21 Process-6 获得锁运行

  20:07:23 Process-5 释放锁结束

  20:07:23 Process-6 释放锁结束

  说明:multiprocessing.Semaphore(2)定义了同一时刻只能有2个进程在执行

  7.进程间数据传递Pipe

  multiprocessing.Pipe()方法会返回一个管道(列表的形式)的两个端口,一个端口作为输入端,一个端口作为输出端,如进程A的输出可以作为进程B的输入,进程B的输出可以作为进程A的输入,默认是全双工模式。

  Pipe()方法返回的对象具有发送消息send()方法和接收消息recv()方法。调用接收recv()方法时,如果管道中没用消息会一直阻塞,如果管道关闭,则会抛出EOFError异常。

  示例:

  import multiprocessing

  import time

  def task1(pipe):

  for i in range(5):

  str = f"task1-{i}"

  print(f"{time.strftime('%H:%M:%S')} task1 发送:{str}")

  pipe.send(str)

  time.sleep(2)

  for i in range(5):

  print(f"{time.strftime('%H:%M:%S')} task1 接收: { pipe.recv() }")

  def task2(pipe):

  for i in range(5):

  print(f"{time.strftime('%H:%M:%S')} task2 接收: { pipe.recv() }")

  time.sleep(1)

  for i in range(5):

  str = f"task2-{i}"

  print(f"{time.strftime('%H:%M:%S')} task2 发送:{str}")

  pipe.send(str)

  if __name__ == "__main__":

  pipe = multiprocessing.Pipe()

  p1 = multiprocessing.Process(target=task1, args=(pipe[0],)) # pipe[0]管道发送消息的端口

  p2 = multiprocessing.Process(target=task2, args=(pipe[1],)) # pipe[1]管道接收消息的端口

  p1.start()

  p2.start()

  p1.join()

  p2.join()

  输出:

  17:23:53 task1 发送:task1-0

  17:23:53 task1 发送:task1-1

  17:23:53 task1 发送:task1-2

  17:23:53 task1 发送:task1-3

  17:23:53 task1 发送:task1-4

  17:23:53 task2 接收: task1-0

  17:23:53 task2 接收: task1-1

  17:23:53 task2 接收: task1-2

  17:23:53 task2 接收: task1-3

  17:23:53 task2 接收: task1-4

  17:23:54 task2 发送:task2-0

  17:23:54 task2 发送:task2-1

  17:23:54 task2 发送:task2-2

  17:23:54 task2 发送:task2-3

  17:23:54 task2 发送:task2-4

  17:23:55 task1 接收: task2-0

  17:23:55 task1 接收: task2-1

  17:23:55 task1 接收: task2-2

  17:23:55 task1 接收: task2-3

  17:23:55 task1 接收: task2-4

  说明:定义了两个任务函数,task1先发5条消息,再接收消息,task2先接收消息,再发送消息。调用time.sleep()只是让输出更好看点,不会影响管道的接收和发送。

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/69945560/viewspace-2760926/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论

注册时间:2019-08-16

  • 博文量
    198
  • 访问量
    174380