API Reference
- class aio_pika.patterns.Master(channel: AbstractChannel, requeue: bool = True, reject_on_redelivered: bool = False)
Implements Master/Worker pattern. Usage example:
worker.py
    master = Master(channel)
    worker = await master.create_worker('test_worker', lambda x: print(x))
master.py
    master = Master(channel)
    await master.proxy.test_worker('foo')
Creates a new Master instance.
- Parameters:
channel – Initialized instance of aio_pika.Channel
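The two snippets above can be combined into one sketch. It assumes a RabbitMQ broker at a hypothetical URL (`amqp://guest:guest@127.0.0.1/`), and the names `handle_task`, `run_worker`, `send_task` and `processed` are illustrative only. The handler is written as an `async` function taking keyword arguments, on the assumption that the proxy forwards only keyword arguments to the worker function. `aio_pika` is imported inside the functions so the handler can be exercised without a broker.

```python
import asyncio
from typing import List

AMQP_URL = "amqp://guest:guest@127.0.0.1/"  # assumption: local broker, default credentials
QUEUE_NAME = "test_worker"                   # queue name taken from the usage example

processed: List[str] = []  # hypothetical sink so the handler's effect is observable


async def handle_task(*, text: str) -> None:
    """Worker function: receives the task payload as keyword arguments."""
    processed.append(text.upper())


async def run_worker() -> None:
    """worker.py side: consume tasks from QUEUE_NAME until cancelled."""
    import aio_pika  # lazy import: only needed when a broker is actually used
    from aio_pika.patterns import Master

    connection = await aio_pika.connect_robust(AMQP_URL)
    async with connection:
        channel = await connection.channel()
        master = Master(channel)
        await master.create_worker(QUEUE_NAME, handle_task, auto_delete=True)
        await asyncio.Future()  # block until the task is cancelled


async def send_task(text: str) -> None:
    """master.py side: publish one task through the proxy."""
    import aio_pika
    from aio_pika.patterns import Master

    connection = await aio_pika.connect_robust(AMQP_URL)
    async with connection:
        channel = await connection.channel()
        master = Master(channel)
        # The attribute name selects the queue; the kwargs become the payload.
        await master.proxy.test_worker(text=text)
```

Run `asyncio.run(run_worker())` in one process and `asyncio.run(send_task("foo"))` in another. The `auto_delete=True` argument is an assumption about the desired queue lifetime and can be dropped.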
- async create_task(channel_name: str, kwargs: Mapping[str, Any] = mappingproxy({}), **message_kwargs: Any) → Ack | Nack | Reject | None
Creates a new task for the worker
- async create_worker(queue_name: str, func: Callable[[...], Awaitable[T]], **kwargs: Any) → Worker
Creates a new Worker instance.
- class aio_pika.patterns.Worker(queue: AbstractQueue, consumer_tag: str, loop: AbstractEventLoop)
- class aio_pika.patterns.RPC(channel: AbstractChannel, host_exceptions: bool = False)
Remote Procedure Call helper.
Create an instance
    rpc = await RPC.create(channel, host_exceptions=False)
Register a Python function
    # RPC instance passes only keyword arguments
    def multiply(*, x, y):
        return x * y

    await rpc.register("multiply", multiply)
Call the function through the proxy
    assert await rpc.proxy.multiply(x=2, y=3) == 6
Call the function explicitly
    assert await rpc.call('multiply', dict(x=2, y=3)) == 6
Show exceptions on the remote side
    rpc = await RPC.create(channel, host_exceptions=True)
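The fragments above omit the connection setup. The following sketch puts a callee and a caller together, assuming a broker at a hypothetical URL (`amqp://guest:guest@127.0.0.1/`). The function names `serve` and `ask` are illustrative, and `multiply` is written as an `async` function to match the `Awaitable` type in the `register()` signature. `aio_pika` is imported lazily so `multiply` can be tested on its own.

```python
import asyncio

AMQP_URL = "amqp://guest:guest@127.0.0.1/"  # assumption: local broker, default credentials


async def multiply(*, x: int, y: int) -> int:
    """Remote function: keyword-only, because RPC passes only keyword arguments."""
    return x * y


async def serve() -> None:
    """Callee side: register multiply() and wait for incoming calls."""
    import aio_pika  # lazy import: only needed when a broker is actually used
    from aio_pika.patterns import RPC

    connection = await aio_pika.connect_robust(AMQP_URL)
    async with connection:
        channel = await connection.channel()
        rpc = await RPC.create(channel)
        await rpc.register("multiply", multiply, auto_delete=True)
        await asyncio.Future()  # serve until the task is cancelled


async def ask(x: int, y: int) -> int:
    """Caller side: invoke the remote function through the proxy."""
    import aio_pika
    from aio_pika.patterns import RPC

    connection = await aio_pika.connect_robust(AMQP_URL)
    async with connection:
        channel = await connection.channel()
        rpc = await RPC.create(channel)
        return await rpc.proxy.multiply(x=x, y=y)
```

With `serve()` running in one process, `asyncio.run(ask(2, 3))` in another should return the product computed on the callee side.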
- async call(method_name: str, kwargs: Dict[str, Any] | None = None, *, expiration: int | None = None, priority: int = 5, delivery_mode: DeliveryMode = DeliveryMode.NOT_PERSISTENT) → Any
Calls a remote method and awaits the result.
- Parameters:
method_name – Name of method
kwargs – Method kwargs
expiration – If not None, messages that stay in the queue longer than this will be returned and asyncio.TimeoutError will be raised.
priority – Message priority
delivery_mode – Call message delivery mode
- Raises:
asyncio.TimeoutError – when message expired
CancelledError – when RPC.cancel() is called
RuntimeError – internal error
- async classmethod create(channel: AbstractChannel, **kwargs: Any) → RPC
Creates a new instance of aio_pika.patterns.RPC. Use this method instead of __init__(), because create() returns a coroutine that performs the asynchronous initialization.
- Parameters:
channel – initialized instance of aio_pika.Channel
- Returns:
RPC
- async execute(func: Callable[[...], Awaitable[T]], payload: Dict[str, Any]) → T
Executes an RPC call. Might be overridden in a subclass.
- async register(method_name: str, func: Callable[[...], Awaitable[T]], **kwargs: Any) → Any
Creates a queue whose name equals the method_name argument, then subscribes to that queue.
- Parameters:
method_name – Method name
func – target function. Function MUST accept only keyword arguments.
kwargs – arguments which will be passed to queue_declare
- Raises:
RuntimeError – Function already registered in this RPC instance or method_name already used.
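When several functions are exposed at once, the registrations can be driven from a mapping. This is a minimal sketch under stated assumptions: `register_handlers`, `HANDLERS` and `subtract` are hypothetical names, `rpc` is any object with a `register()` coroutine matching the signature above, and the extra keyword arguments (for example `auto_delete=True`) are forwarded unchanged so that they reach the queue declaration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[..., Awaitable[Any]]


async def multiply(*, x: int, y: int) -> int:
    """Keyword-only, as register() requires."""
    return x * y


async def subtract(*, x: int, y: int) -> int:
    """Keyword-only, as register() requires."""
    return x - y


# Each key becomes both the method name and the queue name.
HANDLERS: Dict[str, Handler] = {"multiply": multiply, "subtract": subtract}


async def register_handlers(
    rpc: Any, handlers: Dict[str, Handler], **queue_kwargs: Any
) -> None:
    """Register every handler under its own name, in mapping order."""
    for method_name, func in handlers.items():
        # register() raises RuntimeError if func or method_name is already in use.
        await rpc.register(method_name, func, **queue_kwargs)
```

A call such as `await register_handlers(rpc, HANDLERS, auto_delete=True)` registers both functions. Because each name and each function may be registered only once per `RPC` instance, the mapping should not list the same function under two names.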