We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
Aiozmq server.
import asyncio import aiozmq # type: ignore import zmq from jsonrpcserver import Result, Success, async_dispatch, method @method async def ping() -> Result: """JSON-RPC method""" return Success("pong") async def main() -> None: """Handle AioZMQ request""" rep = await aiozmq.create_zmq_stream(zmq.REP, bind="tcp://*:5000") while True: request = (await rep.read())[0].decode() if response := (await async_dispatch(request)).encode(): rep.write((response,)) if __name__ == "__main__": asyncio.set_event_loop_policy(aiozmq.ZmqEventLoopPolicy()) asyncio.get_event_loop().run_until_complete(main())