forked from goodboy/tractor
1
0
Fork 0

Change to multi-line-static-`dict` style msgs

Re-arranging such that element-orders are line-arranged to our new
IPC `.msg.types.Msg` fields spec in prep for replacing the current
`dict`-as-msg impls with the `msgspec.Struct` native versions!
msg_codecs
Tyler Goodlet 2024-03-28 13:08:18 -04:00
parent 456979dd12
commit 8ff18739be
2 changed files with 28 additions and 11 deletions

View File

@ -536,7 +536,9 @@ def pack_error(
# content's `.msgdata`). # content's `.msgdata`).
error_msg['tb_str'] = tb_str error_msg['tb_str'] = tb_str
pkt: dict = {'error': error_msg} pkt: dict = {
'error': error_msg,
}
if cid: if cid:
pkt['cid'] = cid pkt['cid'] = cid

View File

@ -89,7 +89,10 @@ async def _invoke_non_context(
# TODO: can we unify this with the `context=True` impl below? # TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro):
await chan.send({'functype': 'asyncgen', 'cid': cid}) await chan.send({
'cid': cid,
'functype': 'asyncgen',
})
# XXX: massive gotcha! If the containing scope # XXX: massive gotcha! If the containing scope
# is cancelled and we execute the below line, # is cancelled and we execute the below line,
# any ``ActorNursery.__aexit__()`` WON'T be # any ``ActorNursery.__aexit__()`` WON'T be
@ -109,18 +112,27 @@ async def _invoke_non_context(
# to_send = await chan.recv_nowait() # to_send = await chan.recv_nowait()
# if to_send is not None: # if to_send is not None:
# to_yield = await coro.asend(to_send) # to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid}) await chan.send({
'yield': item,
'cid': cid,
})
log.runtime(f"Finished iterating {coro}") log.runtime(f"Finished iterating {coro}")
# TODO: we should really support a proper # TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final # `StopAsyncIteration` system here for returning a final
# value if desired # value if desired
await chan.send({'stop': True, 'cid': cid}) await chan.send({
'stop': True,
'cid': cid,
})
# one way @stream func that gets treated like an async gen # one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below? # TODO: can we unify this with the `context=True` impl below?
elif treat_as_gen: elif treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid}) await chan.send({
'cid': cid,
'functype': 'asyncgen',
})
# XXX: the async-func may spawn further tasks which push # XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must # back values like an async-generator would but must
# manualy construct the response dict-packet-responses as # manualy construct the response dict-packet-responses as
@ -133,7 +145,10 @@ async def _invoke_non_context(
if not cs.cancelled_caught: if not cs.cancelled_caught:
# task was not cancelled so we can instruct the # task was not cancelled so we can instruct the
# far end async gen to tear down # far end async gen to tear down
await chan.send({'stop': True, 'cid': cid}) await chan.send({
'stop': True,
'cid': cid
})
else: else:
# regular async function/method # regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()` # XXX: possibly just a scheduled `Actor._cancel_task()`
@ -182,10 +197,10 @@ async def _invoke_non_context(
and chan.connected() and chan.connected()
): ):
try: try:
await chan.send( await chan.send({
{'return': result, 'return': result,
'cid': cid} 'cid': cid,
) })
except ( except (
BrokenPipeError, BrokenPipeError,
trio.BrokenResourceError, trio.BrokenResourceError,
@ -479,8 +494,8 @@ async def _invoke(
# "least sugary" type of RPC ep with support for # "least sugary" type of RPC ep with support for
# bi-dir streaming B) # bi-dir streaming B)
await chan.send({ await chan.send({
'cid': cid,
'functype': 'context', 'functype': 'context',
'cid': cid
}) })
# TODO: should we also use an `.open_context()` equiv # TODO: should we also use an `.open_context()` equiv