Flatten out RPC loop with `match:`/`case:`

Mainly expanding out the runtime endpoints for cancellation to separate
cases and flattening them with the main RPC-request-invoke block, moving
the non-cancel runtime case (where we call `getattr(actor, funcname)`)
inside the main `Start` case (for now) which branches on `ns=="self"`.

Also, add a new IPC msg `class CancelAck(Return):` which is always
included in the default msg-spec such that runtime cancellation (and
eventually all) endpoints return that msg (instead of a `Return`) and
thus sidestep any currently applied `MsgCodec` such that the results
(`bool`s for most cancel methods) are never violating the current type
limit(s) on `Msg.pld`. To support this expose a new variable
`return_msg: Return|CancelAck` param from
`_invoke()`/`_invoke_non_context)()` and set it to `CancelAck` in the
appropriate endpoint case-blocks of the msg loop.

Clean out all the lingering legacy `chan.send(<dict-msg>)` commented
codez from the invoker funcs, with more cleaning likely to come B)
runtime_to_msgspec
Tyler Goodlet 2024-04-07 10:40:01 -04:00
parent b9a61ded0a
commit aca6503fcd
2 changed files with 119 additions and 193 deletions

View File

@ -61,13 +61,15 @@ from .devx import (
from . import _state from . import _state
from .log import get_logger from .log import get_logger
from tractor.msg.types import ( from tractor.msg.types import (
CancelAck,
Error,
Msg,
Return,
Start, Start,
StartAck, StartAck,
Started, Started,
Stop, Stop,
Yield, Yield,
Return,
Error,
) )
@ -89,6 +91,7 @@ async def _invoke_non_context(
treat_as_gen: bool, treat_as_gen: bool,
is_rpc: bool, is_rpc: bool,
return_msg: Return|CancelAck = Return,
task_status: TaskStatus[ task_status: TaskStatus[
Context | BaseException Context | BaseException
@ -97,7 +100,6 @@ async def _invoke_non_context(
# TODO: can we unify this with the `context=True` impl below? # TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro):
# await chan.send({
await chan.send( await chan.send(
StartAck( StartAck(
cid=cid, cid=cid,
@ -123,11 +125,6 @@ async def _invoke_non_context(
# to_send = await chan.recv_nowait() # to_send = await chan.recv_nowait()
# if to_send is not None: # if to_send is not None:
# to_yield = await coro.asend(to_send) # to_yield = await coro.asend(to_send)
# await chan.send({
# # Yield()
# 'cid': cid,
# 'yield': item,
# })
await chan.send( await chan.send(
Yield( Yield(
cid=cid, cid=cid,
@ -142,11 +139,6 @@ async def _invoke_non_context(
await chan.send( await chan.send(
Stop(cid=cid) Stop(cid=cid)
) )
# await chan.send({
# # Stop(
# 'cid': cid,
# 'stop': True,
# })
# one way @stream func that gets treated like an async gen # one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below? # TODO: can we unify this with the `context=True` impl below?
@ -157,11 +149,6 @@ async def _invoke_non_context(
functype='asyncgen', functype='asyncgen',
) )
) )
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'asyncgen',
# })
# XXX: the async-func may spawn further tasks which push # XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must # back values like an async-generator would but must
# manualy construct the response dict-packet-responses as # manualy construct the response dict-packet-responses as
@ -177,11 +164,6 @@ async def _invoke_non_context(
await chan.send( await chan.send(
Stop(cid=cid) Stop(cid=cid)
) )
# await chan.send({
# # Stop(
# 'cid': cid,
# 'stop': True,
# })
else: else:
# regular async function/method # regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()` # XXX: possibly just a scheduled `Actor._cancel_task()`
@ -199,11 +181,6 @@ async def _invoke_non_context(
functype='asyncfunc', functype='asyncfunc',
) )
) )
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'asyncfunc',
# })
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
@ -237,13 +214,8 @@ async def _invoke_non_context(
and chan.connected() and chan.connected()
): ):
try: try:
# await chan.send({
# # Return()
# 'cid': cid,
# 'return': result,
# })
await chan.send( await chan.send(
Return( return_msg(
cid=cid, cid=cid,
pld=result, pld=result,
) )
@ -408,6 +380,7 @@ async def _invoke(
is_rpc: bool = True, is_rpc: bool = True,
hide_tb: bool = True, hide_tb: bool = True,
return_msg: Return|CancelAck = Return,
task_status: TaskStatus[ task_status: TaskStatus[
Context | BaseException Context | BaseException
@ -517,6 +490,7 @@ async def _invoke(
kwargs, kwargs,
treat_as_gen, treat_as_gen,
is_rpc, is_rpc,
return_msg,
task_status, task_status,
) )
# below is only for `@context` funcs # below is only for `@context` funcs
@ -547,11 +521,6 @@ async def _invoke(
functype='context', functype='context',
) )
) )
# await chan.send({
# # StartAck()
# 'cid': cid,
# 'functype': 'context',
# })
# TODO: should we also use an `.open_context()` equiv # TODO: should we also use an `.open_context()` equiv
# for this callee side by factoring the impl from # for this callee side by factoring the impl from
@ -576,16 +545,11 @@ async def _invoke(
# deliver final result to caller side. # deliver final result to caller side.
await chan.send( await chan.send(
Return( return_msg(
cid=cid, cid=cid,
pld=res, pld=res,
) )
) )
# await chan.send({
# # Return()
# 'cid': cid,
# 'return': res,
# })
# NOTE: this happens IFF `ctx._scope.cancel()` is # NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of, # called by any of,
@ -674,7 +638,6 @@ async def _invoke(
ctxc = ContextCancelled( ctxc = ContextCancelled(
msg, msg,
boxed_type=trio.Cancelled, boxed_type=trio.Cancelled,
# boxed_type_str='Cancelled',
canceller=canceller, canceller=canceller,
) )
# assign local error so that the `.outcome` # assign local error so that the `.outcome`
@ -775,12 +738,12 @@ async def try_ship_error_to_remote(
trio.BrokenResourceError, trio.BrokenResourceError,
BrokenPipeError, BrokenPipeError,
): ):
# err_msg: dict = msg['error']['tb_str']
log.critical( log.critical(
'IPC transport failure -> ' 'IPC transport failure -> '
f'failed to ship error to {remote_descr}!\n\n' f'failed to ship error to {remote_descr}!\n\n'
f'X=> {channel.uid}\n\n' f'X=> {channel.uid}\n\n'
# f'{err_msg}\n'
# TODO: use `.msg.preetty_struct` for this!
f'{msg}\n' f'{msg}\n'
) )
@ -822,6 +785,8 @@ async def process_messages(
''' '''
assert actor._service_n # state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we # TODO: once `trio` get's an "obvious way" for req/resp we
# should use it? # should use it?
# https://github.com/python-trio/trio/issues/467 # https://github.com/python-trio/trio/issues/467
@ -831,7 +796,7 @@ async def process_messages(
f'|_{chan}\n' f'|_{chan}\n'
) )
nursery_cancelled_before_task: bool = False nursery_cancelled_before_task: bool = False
msg: dict | None = None msg: Msg|None = None
try: try:
# NOTE: this internal scope allows for keeping this # NOTE: this internal scope allows for keeping this
# message loop running despite the current task having # message loop running despite the current task having
@ -840,6 +805,7 @@ async def process_messages(
# using ``scope = Nursery.start()`` # using ``scope = Nursery.start()``
with CancelScope(shield=shield) as loop_cs: with CancelScope(shield=shield) as loop_cs:
task_status.started(loop_cs) task_status.started(loop_cs)
async for msg in chan: async for msg in chan:
log.transport( # type: ignore log.transport( # type: ignore
f'<= IPC msg from peer: {chan.uid}\n\n' f'<= IPC msg from peer: {chan.uid}\n\n'
@ -894,21 +860,18 @@ async def process_messages(
# cid, # cid,
# channel, # channel,
# requesting_uid=channel.uid, # requesting_uid=channel.uid,
# ipc_msg=msg, # ipc_msg=msg,
# ) # )
# # immediately break out of this loop! # # immediately break out of this loop!
# break # break
# cid = msg.get('cid')
# if cid:
case ( case (
StartAck(cid=cid) StartAck(cid=cid)
| Started(cid=cid) | Started(cid=cid)
| Yield(cid=cid) | Yield(cid=cid)
| Stop(cid=cid) | Stop(cid=cid)
| Return(cid=cid) | Return(cid=cid)
| CancelAck(cid=cid)
| Error(cid=cid) | Error(cid=cid)
): ):
# deliver response to local caller/waiter # deliver response to local caller/waiter
@ -918,17 +881,85 @@ async def process_messages(
cid, cid,
msg, msg,
) )
# TODO: can remove right?
# continue
# runtime-internal cancellation endpoints
case Start(
ns='self',
func='cancel',
cid=cid,
kwargs=kwargs,
):
kwargs |= {'req_chan': chan}
# XXX NOTE XXX don't start entire actor
# runtime cancellation if this actor is
# currently in debug mode!
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# Either of `Actor.cancel()`/`.cancel_soon()`
# was called, so terminate this IPC msg
# loop, exit back out into `async_main()`,
# and immediately start the core runtime
# machinery shutdown!
with CancelScope(shield=True):
await _invoke(
actor,
cid,
chan,
actor.cancel,
kwargs,
is_rpc=False,
return_msg=CancelAck,
)
log.runtime( log.runtime(
'Waiting on next IPC msg from\n' 'Cancelling IPC transport msg-loop with peer:\n'
f'peer: {chan.uid}:\n'
f'|_{chan}\n' f'|_{chan}\n'
# f'last msg: {msg}\n'
) )
continue loop_cs.cancel()
break
# process a 'cmd' request-msg upack case Start(
ns='self',
func='_cancel_task',
cid=cid,
kwargs=kwargs,
):
target_cid: str = kwargs['cid']
kwargs |= {
'requesting_uid': chan.uid,
'ipc_msg': msg,
# XXX NOTE! ONLY the rpc-task-owning
# parent IPC channel should be able to
# cancel it!
'parent_chan': chan,
}
try:
await _invoke(
actor,
cid,
chan,
actor._cancel_task,
kwargs,
is_rpc=False,
return_msg=CancelAck,
)
except BaseException:
log.exception(
'Failed to cancel task?\n'
f'<= canceller: {chan.uid}\n'
f' |_{chan}\n\n'
f'=> {actor}\n'
f' |_cid: {target_cid}\n'
)
# the "MAIN" RPC endpoint to schedule-a-`trio.Task`
#
# TODO: impl with native `msgspec.Struct` support !! # TODO: impl with native `msgspec.Struct` support !!
# -[ ] implement with ``match:`` syntax? # -[ ] implement with ``match:`` syntax?
# -[ ] discard un-authed msgs as per, # -[ ] discard un-authed msgs as per,
@ -940,139 +971,29 @@ async def process_messages(
kwargs=kwargs, # type-spec this? see `msg.types` kwargs=kwargs, # type-spec this? see `msg.types`
uid=actorid, uid=actorid,
): ):
# try:
# (
# ns,
# funcname,
# kwargs,
# actorid,
# cid,
# ) = msg['cmd']
# # TODO: put in `case Error():` right?
# except KeyError:
# # This is the non-rpc error case, that is, an
# # error **not** raised inside a call to ``_invoke()``
# # (i.e. no cid was provided in the msg - see above).
# # Push this error to all local channel consumers
# # (normally portals) by marking the channel as errored
# assert chan.uid
# exc = unpack_error(msg, chan=chan)
# chan._exc = exc
# raise exc
log.runtime( log.runtime(
'Handling RPC `Start` request from\n' 'Handling RPC `Start` request from\n'
f'peer: {actorid}\n' f'peer: {actorid}\n'
'\n' '\n'
f'=> {ns}.{funcname}({kwargs})\n' f'=> {ns}.{funcname}({kwargs})\n'
) )
# case Start(
# ns='self', # runtime-internal endpoint: `Actor.<funcname>`
# funcname='cancel', # only registry methods exist now yah,
# ): # like ``.register_actor()`` etc. ?
if ns == 'self': if ns == 'self':
if funcname == 'cancel': func: Callable = getattr(actor, funcname)
func: Callable = actor.cancel
kwargs |= {
'req_chan': chan,
}
# don't start entire actor runtime cancellation # application RPC endpoint
# if this actor is currently in debug mode!
pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# Either of `Actor.cancel()`/`.cancel_soon()`
# was called, so terminate this IPC msg
# loop, exit back out into `async_main()`,
# and immediately start the core runtime
# machinery shutdown!
with CancelScope(shield=True):
await _invoke(
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
log.runtime(
'Cancelling IPC transport msg-loop with peer:\n'
f'|_{chan}\n'
)
loop_cs.cancel()
break
# case Start(
# ns='self',
# funcname='_cancel_task',
# ):
if funcname == '_cancel_task':
func: Callable = actor._cancel_task
# we immediately start the runtime machinery
# shutdown
# with CancelScope(shield=True):
target_cid: str = kwargs['cid']
kwargs |= {
# NOTE: ONLY the rpc-task-owning
# parent IPC channel should be able to
# cancel it!
'parent_chan': chan,
'requesting_uid': chan.uid,
'ipc_msg': msg,
}
# TODO: remove? already have emit in meth.
# log.runtime(
# f'Rx RPC task cancel request\n'
# f'<= canceller: {chan.uid}\n'
# f' |_{chan}\n\n'
# f'=> {actor}\n'
# f' |_cid: {target_cid}\n'
# )
try:
await _invoke(
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
except BaseException:
log.exception(
'Failed to cancel task?\n'
f'<= canceller: {chan.uid}\n'
f' |_{chan}\n\n'
f'=> {actor}\n'
f' |_cid: {target_cid}\n'
)
continue
# case Start(
# ns='self',
# funcname='register_actor',
# ):
else:
# normally registry methods, eg.
# ``.register_actor()`` etc.
func: Callable = getattr(actor, funcname)
# case Start(
# ns=str(),
# funcname=funcname,
# ):
else: else:
# complain to client about restricted modules
try: try:
func = actor._get_rpc_func(ns, funcname) func: Callable = actor._get_rpc_func(ns, funcname)
except ( except (
ModuleNotExposed, ModuleNotExposed,
AttributeError, AttributeError,
) as err: ) as err:
# always complain to requester
# client about un-enabled modules
err_msg: dict[str, dict] = pack_error( err_msg: dict[str, dict] = pack_error(
err, err,
cid=cid, cid=cid,
@ -1082,6 +1003,7 @@ async def process_messages(
# schedule a task for the requested RPC function # schedule a task for the requested RPC function
# in the actor's main "service nursery". # in the actor's main "service nursery".
#
# TODO: possibly a service-tn per IPC channel for # TODO: possibly a service-tn per IPC channel for
# supervision isolation? would avoid having to # supervision isolation? would avoid having to
# manage RPC tasks individually in `._rpc_tasks` # manage RPC tasks individually in `._rpc_tasks`
@ -1090,7 +1012,7 @@ async def process_messages(
f'Spawning task for RPC request\n' f'Spawning task for RPC request\n'
f'<= caller: {chan.uid}\n' f'<= caller: {chan.uid}\n'
f' |_{chan}\n\n' f' |_{chan}\n\n'
# TODO: maddr style repr? # ^-TODO-^ maddr style repr?
# f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/' # f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
# f'cid="{cid[-16:]} .."\n\n' # f'cid="{cid[-16:]} .."\n\n'
@ -1098,7 +1020,6 @@ async def process_messages(
f' |_cid: {cid}\n' f' |_cid: {cid}\n'
f' |>> {func}()\n' f' |>> {func}()\n'
) )
assert actor._service_n # wait why? do it at top?
try: try:
ctx: Context = await actor._service_n.start( ctx: Context = await actor._service_n.start(
partial( partial(
@ -1128,13 +1049,12 @@ async def process_messages(
log.warning( log.warning(
'Task for RPC failed?' 'Task for RPC failed?'
f'|_ {func}()\n\n' f'|_ {func}()\n\n'
f'{err}' f'{err}'
) )
continue continue
else: else:
# mark that we have ongoing rpc tasks # mark our global state with ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event() actor._ongoing_rpc_tasks = trio.Event()
# store cancel scope such that the rpc task can be # store cancel scope such that the rpc task can be
@ -1145,23 +1065,24 @@ async def process_messages(
trio.Event(), trio.Event(),
) )
case Error()|_: case Error() | _:
# This is the non-rpc error case, that is, an # NOTE: this is the non-rpc error case,
# error **not** raised inside a call to ``_invoke()`` # that is, an error **not** raised inside
# (i.e. no cid was provided in the msg - see above). # a call to ``_invoke()`` (i.e. no cid was
# Push this error to all local channel consumers # provided in the msg - see above). Push
# (normally portals) by marking the channel as errored # this error to all local channel
# consumers (normally portals) by marking
# the channel as errored
log.exception( log.exception(
f'Unhandled IPC msg:\n\n' f'Unhandled IPC msg:\n\n'
f'{msg}\n' f'{msg}\n'
) )
assert chan.uid # assert chan.uid
exc = unpack_error( chan._exc: Exception = unpack_error(
msg, msg,
chan=chan, chan=chan,
) )
chan._exc = exc raise chan._exc
raise exc
log.runtime( log.runtime(
'Waiting on next IPC msg from\n' 'Waiting on next IPC msg from\n'
@ -1172,7 +1093,8 @@ async def process_messages(
# end of async for, channel disconnect vis # end of async for, channel disconnect vis
# ``trio.EndOfChannel`` # ``trio.EndOfChannel``
log.runtime( log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks" f'channel for {chan.uid} disconnected, cancelling RPC tasks\n'
f'|_{chan}\n'
) )
await actor.cancel_rpc_tasks( await actor.cancel_rpc_tasks(
req_uid=actor.uid, req_uid=actor.uid,

View File

@ -454,6 +454,10 @@ _runtime_msgs: list[Msg] = [
# emission from `MsgStream.aclose()` # emission from `MsgStream.aclose()`
Stop, Stop,
# `Return` sub-type that we always accept from
# runtime-internal cancel endpoints
CancelAck,
# box remote errors, normally subtypes # box remote errors, normally subtypes
# of `RemoteActorError`. # of `RemoteActorError`.
Error, Error,