类 | 说明 |
Queue([maxsize]) | 创建共享的进程队列,maxsize是队列中允许的最大项,省略则队列无大小限制 |
方法 | 说明 |
q.cancel_join_thread() | 不会在进程退出时自动连接后台线程,可防止join_thread()方法阻塞 |
q.join_thread() | 连接队列的后台线程。此方法用于q.close()方法之后,等待所有队列项被消耗完。调用q.cancel_join_thread()方法可以禁止这种行为 |
q.close() | 关闭队列。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据 |
q.empty() | 如调用此方法时q为空,返回True。此方法返回结果不可靠,在返回和使用结果之间,队列有可能已经加入新的项 |
q.full() | 如调用此方法时q已满,返回Ture。同样此方法返回结果不可靠 |
q.get([block [, timeout]]) | 返回q中的一项,如果q为空,此方法将阻塞,直到队列中有项可用为止。block用于控制阻塞行为,默认为True。如果设为False,将引发Queue.Empty异常,timeout为可选超时时间 |
q.get_nowait() | 等同于 q.get(False)方法 |
q.put(item [, block [, timeout]]) | 将item放入队列中,如果队列已满,此方法将阻塞至有可用空间为止。block控制阻塞行为,默认为True。如果设置为False,将会引发Queue.Full异常。timeout指定在阻塞模式中等待可用空间的时间长短,超时后将引发Queue.Full异常 |
q.put_nowait(item) | 等同于q.put(item, False)方法 |
q.qsize() | 返回队列中项的数量,结果不可靠 |
类 | 说明 |
JoinableQueue([maxsize]) | 创建可连接的共享进程队列,类似Queue的一个对象,但它允许项的消费者通知生产者项已被成功处理 |
方法 | 说明 |
q.task_done() | 消费者使用此方法发出信号,表示q.get()返回项已经被成功处理,如果调用此方法的次数大于从队列中删除的项的数量,将引发ValueError异常 |
q.join() | 生产者使用此方法进行阻塞,直到队列中的所有项均被处理。阻塞将持续到为队列中的每个项均被调用q.task_done()方法为止 |
import multiprocessing
def consumer(input_q):
""" 消费者
"""
while True:
item = input_q.get()
# 处理项
print(f'处理input_q: {item}')
# 发出信号通知任务完成
input_q.task_done()
def producer(sequence, output_q):
""" 生产者
"""
for item in sequence:
output_q.put(item)
if __name__ == '__main__':
q = multiprocessing.JoinableQueue()
# 运行消费者进程
cons_p = multiprocessing.Process(target=consumer, args=(q,))
cons_p.daemon = True
cons_p.start()
# 运行生成者进程
sequence = (1,2,3,4)
producer(sequence, q)
# 等待所有项被处理完成
q.join()
from msilib import sequence
import multiprocessing
def consumer(input_q):
""" 消费者
"""
while True:
item = input_q.get()
# 处理项
print(f'处理input_q: {item}')
# 发出信号通知任务完成
input_q.task_done()
def producer(sequence, output_q):
""" 生产者
"""
for item in sequence:
output_q.put(item)
if __name__ == '__main__':
q = multiprocessing.JoinableQueue()
# 多个进程运行消费者模型
cons_p_1 = multiprocessing.Process(target=consumer, args=(q, ))
# 进程1处理项
cons_p_1.daemon = True
cons_p_1.start()
cons_p_2 = multiprocessing.Process(target=consumer, args=(q, ))
# 进程2处理项
cons_p_2.daemon = True
cons_p_2.start()
# 运行生产者
sequence = (1,2,3,4)
producer(sequence, q)
# 等待所有项被处理
q.join()
from ast import arg
from audioop import mul
from msilib import sequence
import multiprocessing
import sqlite3
from tkinter.messagebox import NO
def consumer(input_q):
while True:
item = input_q.get()
# 处理哨兵
if item is None:
break
# 处理项
print(f'处理input_q: {item}')
# 关闭
print('consumer 处理完成!')
def producer(sequence, output_q):
for item in sequence:
output_q.put(item)
if __name__ == '__main__':
q = multiprocessing.Queue()
# 启动消费者进程
cons_p = multiprocessing.Process(target=consumer, args=(q, ))
cons_p.start()
# 生产者
sequence = (1,2,3,4)
producer(sequence, q)
# 在队列上安置哨并
q.put(None)
# 等待消费者进程关闭
cons_p.join()
注意: 如果使用哨兵,一定要在队列上为每个消费都都安置哨兵,才能保证所有消费者进程都能关闭。
类 | 说明 |
Pipe([duplex]) | 在进程之间创建一条管道,并返回元组(conn1, conn2),其中conn1和conn2表示管道两端的Connection对象。默认情况下,管道是双向的。如果将duplex置为False,conn1只能用于接收,conn2只能用于发送。必须在创建和启动使用管道的Process对象之前调用Pipe()方法 |
方法 | 说明 |
c.close() | 关闭连接。如果c被垃圾回收,将自动调用此方法 |
c.fileno() | 返回连接使用的整数文件描述符 |
c.pull([timeout]) | 如果连接上的数据可用,返回True,timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout置为None,操作将无限期地等待数据到达 |
c.recv() | 接收c.send()方法返回的对象。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常 |
c.recv_bytes([maxlength]) | 接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常 |
c.recv_bytes_into(buffer [, offset]) | 接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常 |
c.send(obj) | 通过连接发送对象。obj是与序列化兼容的任意对象 |
c.send_bytes(buffer [, offset [, size]]) | 通过连接发送字节数据缓冲区。buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收 |
import multiprocessing
def consumer(pipe):
output_p, input_p = pipe
input_p.close() # 关闭管道的输入端
while True:
try:
item = output_p.recv()
except EOFError:
break
# TODO 可替换为有用的工作
print(item)
# 关闭
print('consumer done.')
def producer(sequence, input_p):
for item in sequence:
input_p.send(item)
if __name__ == '__main__':
# 创建管道
(output_p, input_p) = multiprocessing.Pipe()
# 启动消费者进程
cons_p = multiprocessing.Process(target=consumer, args=((output_p, input_p),))
cons_p.start()
# 关闭生产者中的输出管道
output_p.close()
# 生产项
sequence = (1,2,3,4)
producer(sequence, input_p)
# 半闭输入管道,表示输入完成
input_p.close()
# 等待消费者进程结束
cons_p.join()
说明:如果生产者或者消费者中都没有使用管道的某个端点,就就将其关闭。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者中也关闭了相同的管道端点
import multiprocessing
from unittest import result
def adder(pipe):
server_p, client_p = pipe
client_p.close()
while True:
try:
x,y = server_p.recv()
except EOFError:
break
result = x + y # TODO 可用实例业务代替
server_p.send(result)
# 关闭
print('Server done.')
if __name__ == '__main__':
# 生成管道
(server_p, client_p) = multiprocessing.Pipe()
# 启动服务器进程
adder_p = multiprocessing.Process(target=adder, args=((server_p, client_p),))
adder_p.start()
# 关闭客户端中的服务器管道
server_p.close()
# 客户端发出请求
client_p.send((3,4))
print(client_p.recv())
client_p.send(('Hello', 'World'))
print(client_p.recv())
# 完成,关闭管道
client_p.close()
# 等待消费者进程完成
adder_p.join()
页面更新:2024-04-19
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号