From 8ff18739be1e55deeb46f386f2e5ced6020adc51 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 28 Mar 2024 13:08:18 -0400 Subject: [PATCH] Change to multi-line-static-`dict` style msgs Re-arranging such that element-orders are line-arranged to our new IPC `.msg.types.Msg` fields spec in prep for replacing the current `dict`-as-msg impls with the `msgspec.Struct` native versions! --- tractor/_exceptions.py | 4 +++- tractor/_rpc.py | 35 +++++++++++++++++++++++++---------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 0e1d6d1..b1a8ee6 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -536,7 +536,9 @@ def pack_error( # content's `.msgdata`). error_msg['tb_str'] = tb_str - pkt: dict = {'error': error_msg} + pkt: dict = { + 'error': error_msg, + } if cid: pkt['cid'] = cid diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 91482a0..310b80a 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -89,7 +89,10 @@ async def _invoke_non_context( # TODO: can we unify this with the `context=True` impl below? if inspect.isasyncgen(coro): - await chan.send({'functype': 'asyncgen', 'cid': cid}) + await chan.send({ + 'cid': cid, + 'functype': 'asyncgen', + }) # XXX: massive gotcha! If the containing scope # is cancelled and we execute the below line, # any ``ActorNursery.__aexit__()`` WON'T be @@ -109,18 +112,27 @@ async def _invoke_non_context( # to_send = await chan.recv_nowait() # if to_send is not None: # to_yield = await coro.asend(to_send) - await chan.send({'yield': item, 'cid': cid}) + await chan.send({ + 'yield': item, + 'cid': cid, + }) log.runtime(f"Finished iterating {coro}") # TODO: we should really support a proper # `StopAsyncIteration` system here for returning a final # value if desired - await chan.send({'stop': True, 'cid': cid}) + await chan.send({ + 'stop': True, + 'cid': cid, + }) # one way @stream func that gets treated like an async gen # TODO: can we unify this with the `context=True` impl below? elif treat_as_gen: - await chan.send({'functype': 'asyncgen', 'cid': cid}) + await chan.send({ + 'cid': cid, + 'functype': 'asyncgen', + }) # XXX: the async-func may spawn further tasks which push # back values like an async-generator would but must # manualy construct the response dict-packet-responses as @@ -133,7 +145,10 @@ async def _invoke_non_context( if not cs.cancelled_caught: # task was not cancelled so we can instruct the # far end async gen to tear down - await chan.send({'stop': True, 'cid': cid}) + await chan.send({ + 'stop': True, + 'cid': cid + }) else: # regular async function/method # XXX: possibly just a scheduled `Actor._cancel_task()` @@ -182,10 +197,10 @@ async def _invoke_non_context( and chan.connected() ): try: - await chan.send( - {'return': result, - 'cid': cid} - ) + await chan.send({ + 'return': result, + 'cid': cid, + }) except ( BrokenPipeError, trio.BrokenResourceError, @@ -479,8 +494,8 @@ async def _invoke( # "least sugary" type of RPC ep with support for # bi-dir streaming B) await chan.send({ + 'cid': cid, 'functype': 'context', - 'cid': cid }) # TODO: should we also use an `.open_context()` equiv