WIP final impl of ctx-cancellation-semantics

modden_spawn_from_client_req
Tyler Goodlet 2024-02-22 18:33:18 -05:00
parent fc72d75061
commit ad5eee5666
3 changed files with 378 additions and 172 deletions

View File

@ -47,6 +47,7 @@ import trio
# maybe_wait_for_debugger, # maybe_wait_for_debugger,
# pause, # pause,
# ) # )
from .msg import NamespacePath
from ._exceptions import ( from ._exceptions import (
# _raise_from_no_key_in_msg, # _raise_from_no_key_in_msg,
unpack_error, unpack_error,
@ -71,12 +72,23 @@ log = get_logger(__name__)
async def _drain_to_final_msg( async def _drain_to_final_msg(
ctx: Context, ctx: Context,
) -> list[dict]:
# ) -> tuple[ msg_limit: int = 6,
# Any|Exception,
# list[dict], ) -> list[dict]:
# ]: '''
Drain IPC msgs delivered to the underlying rx-mem-chan
`Context._recv_chan` from the runtime in search for a final
result or error msg.
The motivation here is to ideally capture errors during ctxc
conditions where a canc-request/or local error is sent but the
local task also excepts and enters the
`Portal.open_context().__aexit__()` block wherein we prefer to
capture and raise any remote error or ctxc-ack as part of the
`ctx.result()` cleanup and teardown sequence.
'''
raise_overrun: bool = not ctx._allow_overruns raise_overrun: bool = not ctx._allow_overruns
# wait for a final context result by collecting (but # wait for a final context result by collecting (but
@ -88,14 +100,14 @@ async def _drain_to_final_msg(
# NOTE: this REPL usage actually works here dawg! Bo # NOTE: this REPL usage actually works here dawg! Bo
# from .devx._debug import pause # from .devx._debug import pause
# await pause() # await pause()
# if re := ctx._remote_error: if re := ctx._remote_error:
# ctx._maybe_raise_remote_err( ctx._maybe_raise_remote_err(
# re, re,
# # NOTE: obvi we don't care if we # NOTE: obvi we don't care if we
# # overran the far end if we're already # overran the far end if we're already
# # waiting on a final result (msg). # waiting on a final result (msg).
# raise_overrun_from_self=raise_overrun, raise_overrun_from_self=raise_overrun,
# ) )
# TODO: bad idea? # TODO: bad idea?
# with trio.CancelScope() as res_cs: # with trio.CancelScope() as res_cs:
@ -108,7 +120,7 @@ async def _drain_to_final_msg(
msg: dict = await ctx._recv_chan.receive() msg: dict = await ctx._recv_chan.receive()
ctx._result: Any = msg['return'] ctx._result: Any = msg['return']
log.runtime( log.runtime(
'Context delivered final result msg:\n' 'Context delivered final draining msg:\n'
f'{pformat(msg)}' f'{pformat(msg)}'
) )
pre_result_drained.append(msg) pre_result_drained.append(msg)
@ -142,7 +154,45 @@ async def _drain_to_final_msg(
if 'yield' in msg: if 'yield' in msg:
# far end task is still streaming to us so discard # far end task is still streaming to us so discard
log.warning(f'Discarding std "yield"\n{msg}') # and report per local context state.
if (
(ctx._stream.closed
and (reason := 'stream was already closed')
)
or (ctx._cancel_called
and (reason := 'ctx called `.cancel()`')
)
or (ctx._cancelled_caught
and (reason := 'ctx caught a cancel')
)
or (len(pre_result_drained) > msg_limit
and (reason := f'"yield" limit={msg_limit}')
)
):
log.cancel(
'Cancelling `MsgStream` drain since '
f'{reason}\n\n'
f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n'
f'=> {ctx._task}\n'
f' |_{ctx._stream}\n\n'
f'{pformat(msg)}\n'
)
return pre_result_drained
# drain up to the `msg_limit` hoping to get
# a final result or error/ctxc.
else:
log.warning(
'Ignoring "yield" msg during `ctx.result()` drain..\n'
f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n'
f'=> {ctx._task}\n'
f' |_{ctx._stream}\n\n'
f'{pformat(msg)}\n'
)
pre_result_drained.append(msg) pre_result_drained.append(msg)
continue continue
@ -153,8 +203,8 @@ async def _drain_to_final_msg(
# right? # right?
elif 'stop' in msg: elif 'stop' in msg:
log.cancel( log.cancel(
'Remote stream terminated due to "stop" msg:\n' 'Remote stream terminated due to "stop" msg:\n\n'
f'{msg}' f'{pformat(msg)}\n'
) )
pre_result_drained.append(msg) pre_result_drained.append(msg)
continue continue
@ -260,12 +310,14 @@ class Context:
''' '''
chan: Channel chan: Channel
cid: str # "context id", more or less a unique linked-task-pair id cid: str # "context id", more or less a unique linked-task-pair id
# the "feeder" channels for delivering message values to the # the "feeder" channels for delivering message values to the
# local task from the runtime's msg processing loop. # local task from the runtime's msg processing loop.
_recv_chan: trio.MemoryReceiveChannel _recv_chan: trio.MemoryReceiveChannel
_send_chan: trio.MemorySendChannel _send_chan: trio.MemorySendChannel
# full "namespace-path" to target RPC function
_nsf: NamespacePath
# the "invocation type" of the far end task-entry-point # the "invocation type" of the far end task-entry-point
# function, normally matching a logic block inside # function, normally matching a logic block inside
# `._runtime.invoke()`. # `._runtime.invoke()`.
@ -281,6 +333,7 @@ class Context:
# which is exactly the primitive that allows for # which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC. # cross-actor-task-supervision and thus SC.
_scope: trio.CancelScope | None = None _scope: trio.CancelScope | None = None
_task: trio.lowlevel.Task|None = None
# _res_scope: trio.CancelScope|None = None # _res_scope: trio.CancelScope|None = None
# on a clean exit there should be a final value # on a clean exit there should be a final value
@ -384,6 +437,7 @@ class Context:
# init and streaming state # init and streaming state
_started_called: bool = False _started_called: bool = False
_stream_opened: bool = False _stream_opened: bool = False
_stream: MsgStream|None = None
# overrun handling machinery # overrun handling machinery
# NOTE: none of this provides "backpressure" to the remote # NOTE: none of this provides "backpressure" to the remote
@ -577,13 +631,14 @@ class Context:
''' '''
side: str = self.side side: str = self.side
log.cancel(
f'Cancelling {side} side of context to {self.chan.uid}'
)
# await pause()
self._cancel_called: bool = True self._cancel_called: bool = True
header: str = f'Cancelling "{side.upper()}"-side of ctx with peer\n'
reminfo: str = (
f'uid: {self.chan.uid}\n'
f' |_ {self._nsf}()\n'
)
# caller side who entered `Portal.open_context()` # caller side who entered `Portal.open_context()`
# NOTE: on the call side we never manually call # NOTE: on the call side we never manually call
# `._scope.cancel()` since we expect the eventual # `._scope.cancel()` since we expect the eventual
@ -601,8 +656,9 @@ class Context:
with trio.move_on_after(timeout) as cs: with trio.move_on_after(timeout) as cs:
cs.shield = True cs.shield = True
log.cancel( log.cancel(
f'Cancelling stream {cid} to ' header
f'{self._portal.channel.uid}' +
reminfo
) )
# NOTE: we're telling the far end actor to cancel a task # NOTE: we're telling the far end actor to cancel a task
@ -621,13 +677,13 @@ class Context:
# if not self._portal.channel.connected(): # if not self._portal.channel.connected():
if not self.chan.connected(): if not self.chan.connected():
log.cancel( log.cancel(
'May have failed to cancel remote task ' 'May have failed to cancel remote task?\n'
f'{cid} for {self._portal.channel.uid}' f'{reminfo}'
) )
else: else:
log.cancel( log.cancel(
'Timed out on cancel request of remote task ' 'Timed out on cancel request of remote task?\n'
f'{cid} for {self._portal.channel.uid}' f'{reminfo}'
) )
# callee side remote task # callee side remote task
@ -635,6 +691,11 @@ class Context:
# the caller expects a `ContextCancelled` to be sent from # the caller expects a `ContextCancelled` to be sent from
# `._runtime._invoke()` back to the other side. # `._runtime._invoke()` back to the other side.
else: else:
log.cancel(
header
+
reminfo
)
# TODO: should we have an explicit cancel message # TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an # or is relaying the local `trio.Cancelled` as an
# {'error': trio.Cancelled, cid: "blah"} enough? # {'error': trio.Cancelled, cid: "blah"} enough?
@ -720,8 +781,9 @@ class Context:
# single-direction-stream case you'll get a lookup error # single-direction-stream case you'll get a lookup error
# currently. # currently.
ctx: Context = actor.get_context( ctx: Context = actor.get_context(
self.chan, chan=self.chan,
self.cid, cid=self.cid,
nsf=self._nsf,
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )
@ -735,7 +797,7 @@ class Context:
if ctx._recv_chan._closed: if ctx._recv_chan._closed:
raise trio.ClosedResourceError( raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!?' 'The underlying channel for this stream was already closed!\n'
) )
# NOTE: implicitly this will call `MsgStream.aclose()` on # NOTE: implicitly this will call `MsgStream.aclose()` on
@ -764,6 +826,7 @@ class Context:
try: try:
self._stream_opened: bool = True self._stream_opened: bool = True
self._stream = stream
# XXX: do we need this? # XXX: do we need this?
# ensure we aren't cancelled before yielding the stream # ensure we aren't cancelled before yielding the stream
@ -1174,35 +1237,47 @@ class Context:
self, self,
msg: dict, msg: dict,
# draining: bool = False,
) -> bool: ) -> bool:
''' '''
Deliver an IPC msg received from a transport-channel to Deliver an IPC msg received from a transport-channel to
this context's underlying mem chan for handling by this context's underlying mem chan for handling by local
user operating tasks; deliver a bool indicating whether the user application tasks; deliver `bool` indicating whether
msg was immediately sent. the msg was able to be delivered.
If `._allow_overruns == True` (maybe) append the msg to an If `._allow_overruns == True` (maybe) append the msg to an
"overflow queue" and start a "drainer task" (inside the "overflow queue" and start a "drainer task" (inside the
`._scope_nursery: trio.Nursery`) which ensures that such `._scope_nursery: trio.Nursery`) which ensures that such
messages are eventually sent if possible. messages are queued up and eventually sent if possible.
''' '''
cid: str = self.cid cid: str = self.cid
chan: Channel = self.chan chan: Channel = self.chan
from_uid: tuple[str, str] = chan.uid from_uid: tuple[str, str] = chan.uid
send_chan: trio.MemorySendChannel = self._send_chan send_chan: trio.MemorySendChannel = self._send_chan
nsf: NamespacePath = self._nsf
re: Exception|None
if re := unpack_error( if re := unpack_error(
msg, msg,
self.chan, self.chan,
): ):
log.error( log.error(
f'Delivering error-msg from {from_uid} to caller {cid}' f'Delivering error-msg to caller\n'
f'{re}' f'<= peer: {from_uid}\n'
f' |_ {nsf}()\n\n'
f'=> cid: {cid}\n'
f' |_{self._task}\n\n'
f'{pformat(re)}\n'
) )
self._cancel_msg = msg self._cancel_msg: dict = msg
# NOTE: this will not raise an error, merely set
# `._remote_error` and maybe cancel any task currently
# entered in `Portal.open_context()` presuming the
# error is "cancel causing" (i.e. `ContextCancelled`
# or `RemoteActorError`).
self._maybe_cancel_and_set_remote_error(re) self._maybe_cancel_and_set_remote_error(re)
# XXX NEVER do this XXX..!! # XXX NEVER do this XXX..!!
@ -1218,26 +1293,44 @@ class Context:
if self._in_overrun: if self._in_overrun:
log.warning( log.warning(
f'Capturing overrun-msg from {from_uid} to caller {cid}' f'Queueing OVERRUN msg on caller task:\n'
f'{msg}' f'<= peer: {from_uid}\n'
f' |_ {nsf}()\n\n'
f'=> cid: {cid}\n'
f' |_{self._task}\n\n'
f'{pformat(msg)}\n'
) )
self._overflow_q.append(msg) self._overflow_q.append(msg)
return False return False
try: try:
log.runtime( log.runtime(
f'Delivering IPC `Context` msg:\n' f'Delivering msg from IPC ctx:\n'
f'<= {from_uid}\n' f'<= {from_uid}\n'
f'=> caller: {cid}\n' f' |_ {nsf}()\n\n'
f'{msg}'
f'=> {self._task}\n'
f' |_cid={self.cid}\n\n'
f'{pformat(msg)}\n'
) )
# from .devx._debug import pause # from .devx._debug import pause
# await pause() # await pause()
# NOTE: if an error is deteced we should always still
# send it through the feeder-mem-chan and expect
# it to be raised by any context (stream) consumer
# task via the consumer APIs on both the `Context` and
# `MsgStream`!
#
# XXX the reason is that this method is always called
# by the IPC msg handling runtime task and that is not
# normally the task that should get cancelled/error
# from some remote fault!
send_chan.send_nowait(msg) send_chan.send_nowait(msg)
return True return True
# if an error is deteced we should always
# expect it to be raised by any context (stream)
# consumer task
except trio.BrokenResourceError: except trio.BrokenResourceError:
# TODO: what is the right way to handle the case where the # TODO: what is the right way to handle the case where the
@ -1248,7 +1341,13 @@ class Context:
# XXX: local consumer has closed their side # XXX: local consumer has closed their side
# so cancel the far end streaming task # so cancel the far end streaming task
log.warning(f"{send_chan} consumer is already closed") log.warning(
'Rx chan for `Context` alfready closed?\n'
f'cid: {self.cid}\n'
'Failed to deliver msg:\n'
f'send_chan: {send_chan}\n\n'
f'{pformat(msg)}\n'
)
return False return False
# NOTE XXX: by default we do **not** maintain context-stream # NOTE XXX: by default we do **not** maintain context-stream
@ -1257,44 +1356,54 @@ class Context:
# msg handling loop which calls into this method! # msg handling loop which calls into this method!
except trio.WouldBlock: except trio.WouldBlock:
# XXX: always push an error even if the local # XXX: always push an error even if the local receiver
# receiver is in overrun state. # is in overrun state - i.e. if an 'error' msg is
# self._maybe_cancel_and_set_remote_error(msg) # delivered then
# `._maybe_cancel_and_set_remote_error(msg)` should
# have already been called above!
#
# XXX QUESTION XXX: if we rx an error while in an
# overrun state and that msg isn't stuck in an
# overflow queue what happens?!?
local_uid = current_actor().uid local_uid = current_actor().uid
lines = [ txt: str = (
f'OVERRUN on actor-task context {cid}@{local_uid}!\n' 'on IPC context:\n'
# TODO: put remote task name here if possible?
f'sender: {from_uid}',
f'msg: {msg}',
# TODO: put task func name here and maybe an arrow
# from sender to overrunner?
# f'local task {self.func_name}'
]
if not self._stream_opened:
lines.insert(
1,
f'\n*** No stream open on `{local_uid[0]}` side! ***\n'
)
text = '\n'.join(lines) f'<= sender: {from_uid}\n'
f' |_ {self._nsf}()\n\n'
f'=> overrun: {local_uid}\n'
f' |_cid: {cid}\n'
f' |_task: {self._task}\n'
)
if not self._stream_opened:
txt += (
f'\n*** No stream open on `{local_uid[0]}` side! ***\n\n'
f'{msg}\n'
)
# XXX: lul, this really can't be backpressure since any # XXX: lul, this really can't be backpressure since any
# blocking here will block the entire msg loop rpc sched for # blocking here will block the entire msg loop rpc sched for
# a whole channel.. maybe we should rename it? # a whole channel.. maybe we should rename it?
if self._allow_overruns: if self._allow_overruns:
text += f'\nStarting overflow queuing task on msg: {msg}' txt += (
log.warning(text) '\n*** Starting overflow queuing task on msg ***\n\n'
f'{msg}\n'
)
log.warning(txt)
if ( if (
not self._in_overrun not self._in_overrun
): ):
self._overflow_q.append(msg) self._overflow_q.append(msg)
n = self._scope_nursery tn: trio.Nursery = self._scope_nursery
assert not n.child_tasks assert not tn.child_tasks
try: try:
n.start_soon( tn.start_soon(
self._drain_overflows, self._drain_overflows,
) )
return True
except RuntimeError: except RuntimeError:
# if the nursery is already cancelled due to # if the nursery is already cancelled due to
# this context exiting or in error, we ignore # this context exiting or in error, we ignore
@ -1302,11 +1411,12 @@ class Context:
# anything different. # anything different.
return False return False
else: else:
txt += f'\n{msg}\n'
# raise local overrun and immediately pack as IPC # raise local overrun and immediately pack as IPC
# msg for far end. # msg for far end.
try: try:
raise StreamOverrun( raise StreamOverrun(
text, txt,
sender=from_uid, sender=from_uid,
) )
except StreamOverrun as err: except StreamOverrun as err:
@ -1314,20 +1424,28 @@ class Context:
err, err,
cid=cid, cid=cid,
) )
# err_msg['cid']: str = cid
try: try:
# relay condition to sender side remote task
await chan.send(err_msg) await chan.send(err_msg)
return True
except trio.BrokenResourceError: except trio.BrokenResourceError:
# XXX: local consumer has closed their side # XXX: local consumer has closed their side
# so cancel the far end streaming task # so cancel the far end streaming task
log.warning(f"{chan} is already closed") log.warning(
'Channel for ctx is already closed?\n'
f'|_{chan}\n'
)
# ow, indicate unable to deliver by default
return False return False
def mk_context( def mk_context(
chan: Channel, chan: Channel,
cid: str, cid: str,
nsf: NamespacePath,
msg_buffer_size: int = 2**6, msg_buffer_size: int = 2**6,
**kwargs, **kwargs,
@ -1345,10 +1463,12 @@ def mk_context(
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
ctx = Context( ctx = Context(
chan, chan=chan,
cid, cid=cid,
_send_chan=send_chan, _send_chan=send_chan,
_recv_chan=recv_chan, _recv_chan=recv_chan,
_nsf=nsf,
_task=trio.lowlevel.current_task(),
**kwargs, **kwargs,
) )
ctx._result: int | Any = id(ctx) ctx._result: int | Any = id(ctx)

View File

@ -69,18 +69,35 @@ from ._streaming import (
log = get_logger(__name__) log = get_logger(__name__)
# TODO: rename to `unwrap_result()` and use
# `._raise_from_no_key_in_msg()` (after tweak to
# accept a `chan: Channel` arg) in key block!
def _unwrap_msg( def _unwrap_msg(
msg: dict[str, Any], msg: dict[str, Any],
channel: Channel channel: Channel,
hide_tb: bool = True,
) -> Any: ) -> Any:
__tracebackhide__ = True '''
Unwrap a final result from a `{return: <Any>}` IPC msg.
'''
__tracebackhide__: bool = hide_tb
try: try:
return msg['return'] return msg['return']
except KeyError as ke: except KeyError as ke:
# internal error should never get here # internal error should never get here
assert msg.get('cid'), "Received internal error at portal?" assert msg.get('cid'), (
raise unpack_error(msg, channel) from ke "Received internal error at portal?"
)
raise unpack_error(
msg,
channel
) from ke
class Portal: class Portal:
@ -107,7 +124,7 @@ class Portal:
cancel_timeout: float = 0.5 cancel_timeout: float = 0.5
def __init__(self, channel: Channel) -> None: def __init__(self, channel: Channel) -> None:
self.channel = channel self.chan = channel
# during the portal's lifetime # during the portal's lifetime
self._result_msg: Optional[dict] = None self._result_msg: Optional[dict] = None
@ -118,6 +135,18 @@ class Portal:
self._streams: set[MsgStream] = set() self._streams: set[MsgStream] = set()
self.actor = current_actor() self.actor = current_actor()
@property
def channel(self) -> Channel:
'''
Proxy to legacy attr name..
Consider the shorter `Portal.chan` instead of `.channel` ;)
'''
log.debug(
'Consider the shorter `Portal.chan` instead of `.channel` ;)'
)
return self.chan
async def _submit_for_result( async def _submit_for_result(
self, self,
ns: str, ns: str,
@ -125,14 +154,14 @@ class Portal:
**kwargs **kwargs
) -> None: ) -> None:
assert self._expect_result is None, \ assert self._expect_result is None, (
"A pending main result has already been submitted" "A pending main result has already been submitted"
)
self._expect_result = await self.actor.start_remote_task( self._expect_result = await self.actor.start_remote_task(
self.channel, self.channel,
ns, nsf=NamespacePath(f'{ns}:{func}'),
func, kwargs=kwargs
kwargs
) )
async def _return_once( async def _return_once(
@ -173,7 +202,10 @@ class Portal:
self._expect_result self._expect_result
) )
return _unwrap_msg(self._result_msg, self.channel) return _unwrap_msg(
self._result_msg,
self.channel,
)
async def _cancel_streams(self): async def _cancel_streams(self):
# terminate all locally running async generator # terminate all locally running async generator
@ -215,26 +247,33 @@ class Portal:
purpose. purpose.
''' '''
if not self.channel.connected(): chan: Channel = self.channel
log.cancel("This channel is already closed can't cancel") if not chan.connected():
log.runtime(
'This channel is already closed, skipping cancel request..'
)
return False return False
reminfo: str = (
f'uid: {self.channel.uid}\n'
f' |_{chan}\n'
)
log.cancel( log.cancel(
f"Sending actor cancel request to {self.channel.uid} on " f'Sending actor cancel request to peer\n'
f"{self.channel}") f'{reminfo}'
)
self.channel._cancel_called = True
self.channel._cancel_called: bool = True
try: try:
# send cancel cmd - might not get response # send cancel cmd - might not get response
# XXX: sure would be nice to make this work with # XXX: sure would be nice to make this work with
# a proper shield # a proper shield
with trio.move_on_after( with trio.move_on_after(
timeout timeout
or self.cancel_timeout or
self.cancel_timeout
) as cs: ) as cs:
cs.shield = True cs.shield: bool = True
await self.run_from_ns( await self.run_from_ns(
'self', 'self',
'cancel', 'cancel',
@ -242,7 +281,10 @@ class Portal:
return True return True
if cs.cancelled_caught: if cs.cancelled_caught:
log.cancel(f"May have failed to cancel {self.channel.uid}") log.cancel(
'May have failed to cancel peer?\n'
f'{reminfo}'
)
# if we get here some weird cancellation case happened # if we get here some weird cancellation case happened
return False return False
@ -274,25 +316,31 @@ class Portal:
A special namespace `self` can be used to invoke `Actor` A special namespace `self` can be used to invoke `Actor`
instance methods in the remote runtime. Currently this instance methods in the remote runtime. Currently this
should only be used solely for ``tractor`` runtime should only ever be used for `Actor` (method) runtime
internals. internals!
''' '''
nsf = NamespacePath(
f'{namespace_path}:{function_name}'
)
ctx = await self.actor.start_remote_task( ctx = await self.actor.start_remote_task(
self.channel, chan=self.channel,
namespace_path, nsf=nsf,
function_name, kwargs=kwargs,
kwargs,
) )
ctx._portal = self ctx._portal = self
msg = await self._return_once(ctx) msg = await self._return_once(ctx)
return _unwrap_msg(msg, self.channel) return _unwrap_msg(
msg,
self.channel,
)
async def run( async def run(
self, self,
func: str, func: str,
fn_name: Optional[str] = None, fn_name: str|None = None,
**kwargs **kwargs
) -> Any: ) -> Any:
''' '''
Submit a remote function to be scheduled and run by actor, in Submit a remote function to be scheduled and run by actor, in
@ -311,8 +359,9 @@ class Portal:
DeprecationWarning, DeprecationWarning,
stacklevel=2, stacklevel=2,
) )
fn_mod_path = func fn_mod_path: str = func
assert isinstance(fn_name, str) assert isinstance(fn_name, str)
nsf = NamespacePath(f'{fn_mod_path}:{fn_name}')
else: # function reference was passed directly else: # function reference was passed directly
if ( if (
@ -325,13 +374,12 @@ class Portal:
raise TypeError( raise TypeError(
f'{func} must be a non-streaming async function!') f'{func} must be a non-streaming async function!')
fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple() nsf = NamespacePath.from_ref(func)
ctx = await self.actor.start_remote_task( ctx = await self.actor.start_remote_task(
self.channel, self.channel,
fn_mod_path, nsf=nsf,
fn_name, kwargs=kwargs,
kwargs,
) )
ctx._portal = self ctx._portal = self
return _unwrap_msg( return _unwrap_msg(
@ -355,15 +403,10 @@ class Portal:
raise TypeError( raise TypeError(
f'{async_gen_func} must be an async generator function!') f'{async_gen_func} must be an async generator function!')
fn_mod_path, fn_name = NamespacePath.from_ref( ctx: Context = await self.actor.start_remote_task(
async_gen_func
).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel, self.channel,
fn_mod_path, nsf=NamespacePath.from_ref(async_gen_func),
fn_name, kwargs=kwargs,
kwargs
) )
ctx._portal = self ctx._portal = self
@ -405,7 +448,10 @@ class Portal:
self, self,
func: Callable, func: Callable,
allow_overruns: bool = False, allow_overruns: bool = False,
# proxied to RPC
**kwargs, **kwargs,
) -> AsyncGenerator[tuple[Context, Any], None]: ) -> AsyncGenerator[tuple[Context, Any], None]:
@ -448,13 +494,12 @@ class Portal:
# TODO: i think from here onward should probably # TODO: i think from here onward should probably
# just be factored into an `@acm` inside a new # just be factored into an `@acm` inside a new
# a new `_context.py` mod. # a new `_context.py` mod.
fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple() nsf = NamespacePath.from_ref(func)
ctx = await self.actor.start_remote_task( ctx: Context = await self.actor.start_remote_task(
self.channel, self.channel,
fn_mod_path, nsf=nsf,
fn_name, kwargs=kwargs,
kwargs,
# NOTE: it's imporant to expose this since you might # NOTE: it's imporant to expose this since you might
# get the case where the parent who opened the context does # get the case where the parent who opened the context does
@ -721,10 +766,10 @@ class Portal:
# assert maybe_ctxc # assert maybe_ctxc
if ctx.chan.connected(): if ctx.chan.connected():
log.info( log.runtime(
'Waiting on final context-task result for\n' 'Waiting on final context result for\n'
f'task: {cid}\n' f'peer: {uid}\n'
f'actor: {uid}' f'|_{ctx._task}\n'
) )
# XXX NOTE XXX: the below call to # XXX NOTE XXX: the below call to
# `Context.result()` will ALWAYS raise # `Context.result()` will ALWAYS raise
@ -771,13 +816,19 @@ class Portal:
RemoteActorError(), RemoteActorError(),
): ):
log.exception( log.exception(
f'Context `{fn_name}` remotely errored:\n' 'Context remotely errored!\n'
f'`{tbstr}`' f'<= peer: {uid}\n'
f' |_ {nsf}()\n\n'
f'{tbstr}'
) )
case (None, _): case (None, _):
log.runtime( log.runtime(
f'Context {fn_name} returned value from callee:\n' 'Context returned final result from callee task:\n'
f'`{result_or_err}`' f'<= peer: {uid}\n'
f' |_ {nsf}()\n\n'
f'`{result_or_err}`\n'
) )
finally: finally:
@ -855,26 +906,31 @@ class Portal:
# CASE 2 # CASE 2
if ctx._cancel_called: if ctx._cancel_called:
log.cancel( log.cancel(
f'Context {fn_name} cancelled by caller with\n' 'Context cancelled by caller task\n'
f'|_{ctx._task}\n\n'
f'{etype}' f'{etype}'
) )
# CASE 1 # CASE 1
else: else:
log.cancel( log.cancel(
f'Context cancelled by callee with {etype}\n' f'Context cancelled by remote callee task\n'
f'target: `{fn_name}`\n' f'peer: {uid}\n'
f'task:{cid}\n' f'|_ {nsf}()\n\n'
f'actor:{uid}'
f'{etype}\n'
) )
# FINALLY, remove the context from runtime tracking and # FINALLY, remove the context from runtime tracking and
# exit! # exit!
log.runtime( log.runtime(
f'Exiting context opened with {ctx.chan.uid}' 'Removing IPC ctx opened with peer\n'
f'{uid}\n'
f'|_{ctx}\n'
) )
self.actor._contexts.pop( self.actor._contexts.pop(
(self.channel.uid, ctx.cid), (uid, cid),
None, None,
) )

View File

@ -95,9 +95,6 @@ class MsgStream(trio.abc.Channel):
try: try:
return msg['yield'] return msg['yield']
except KeyError as kerr: except KeyError as kerr:
# if 'return' in msg:
# return msg
_raise_from_no_key_in_msg( _raise_from_no_key_in_msg(
ctx=self._ctx, ctx=self._ctx,
msg=msg, msg=msg,
@ -128,13 +125,9 @@ class MsgStream(trio.abc.Channel):
# introducing this # introducing this
if self._eoc: if self._eoc:
raise self._eoc raise self._eoc
# raise trio.EndOfChannel
if self._closed: if self._closed:
raise self._closed raise self._closed
# raise trio.ClosedResourceError(
# 'This stream was already closed'
# )
src_err: Exception|None = None src_err: Exception|None = None
try: try:
@ -143,6 +136,7 @@ class MsgStream(trio.abc.Channel):
return msg['yield'] return msg['yield']
except KeyError as kerr: except KeyError as kerr:
# log.exception('GOT KEYERROR')
src_err = kerr src_err = kerr
# NOTE: may raise any of the below error types # NOTE: may raise any of the below error types
@ -161,9 +155,9 @@ class MsgStream(trio.abc.Channel):
# trio.ClosedResourceError, # by self._rx_chan # trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
) as eoc: ) as eoc:
# log.exception('GOT EOC')
src_err = eoc src_err = eoc
self._eoc = eoc self._eoc = eoc
# await trio.sleep(1)
# a ``ClosedResourceError`` indicates that the internal # a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the # feeder memory receive channel was closed likely by the
@ -201,6 +195,7 @@ class MsgStream(trio.abc.Channel):
# raise eoc # raise eoc
except trio.ClosedResourceError as cre: # by self._rx_chan except trio.ClosedResourceError as cre: # by self._rx_chan
# log.exception('GOT CRE')
src_err = cre src_err = cre
log.warning( log.warning(
'`Context._rx_chan` was already closed?' '`Context._rx_chan` was already closed?'
@ -211,6 +206,8 @@ class MsgStream(trio.abc.Channel):
# terminated and signal this local iterator to stop # terminated and signal this local iterator to stop
drained: list[Exception|dict] = await self.aclose() drained: list[Exception|dict] = await self.aclose()
if drained: if drained:
# from .devx import pause
# await pause()
log.warning( log.warning(
'Drained context msgs during closure:\n' 'Drained context msgs during closure:\n'
f'{drained}' f'{drained}'
@ -237,31 +234,32 @@ class MsgStream(trio.abc.Channel):
Cancel associated remote actor task and local memory channel on Cancel associated remote actor task and local memory channel on
close. close.
Notes:
- REMEMBER that this is also called by `.__aexit__()` so
careful consideration must be made to handle whatever
internal stsate is mutated, particuarly in terms of
draining IPC msgs!
- more or less we try to maintain adherance to trio's `.aclose()` semantics:
https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
''' '''
# XXX: keep proper adherance to trio's `.aclose()` semantics:
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan
if ( # rx_chan = self._rx_chan
rx_chan._closed
or
self._closed
):
log.cancel(
f'`MsgStream` is already closed\n'
f'.cid: {self._ctx.cid}\n'
f'._rx_chan`: {rx_chan}\n'
f'._eoc: {self._eoc}\n'
f'._closed: {self._eoc}\n'
)
# XXX NOTE XXX
# it's SUPER IMPORTANT that we ensure we don't DOUBLE
# DRAIN msgs on closure so avoid getting stuck handing on
# the `._rx_chan` since we call this method on
# `.__aexit__()` as well!!!
# => SO ENSURE WE CATCH ALL TERMINATION STATES in this
# block including the EoC..
if self.closed:
# this stream has already been closed so silently succeed as # this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics. # per ``trio.AsyncResource`` semantics.
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return [] return []
ctx: Context = self._ctx ctx: Context = self._ctx
# caught_eoc: bool = False
drained: list[Exception|dict] = [] drained: list[Exception|dict] = []
while not drained: while not drained:
try: try:
@ -274,17 +272,26 @@ class MsgStream(trio.abc.Channel):
# TODO: inject into parent `Context` buf? # TODO: inject into parent `Context` buf?
drained.append(maybe_final_msg) drained.append(maybe_final_msg)
# NOTE: we only need these handlers due to the
# `.receive_nowait()` call above which may re-raise
# one of these errors on a msg key error!
except trio.WouldBlock as be: except trio.WouldBlock as be:
drained.append(be) drained.append(be)
break break
except trio.EndOfChannel as eoc: except trio.EndOfChannel as eoc:
self._eoc: Exception = eoc
drained.append(eoc) drained.append(eoc)
# caught_eoc = True break
self._eoc: bool = eoc
except trio.ClosedResourceError as cre:
self._closed = cre
drained.append(cre)
break break
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
# log.exception('GOT CTXC')
log.cancel( log.cancel(
'Context was cancelled during stream closure:\n' 'Context was cancelled during stream closure:\n'
f'canceller: {ctxc.canceller}\n' f'canceller: {ctxc.canceller}\n'
@ -339,8 +346,11 @@ class MsgStream(trio.abc.Channel):
# with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):
# await rx_chan.aclose() # await rx_chan.aclose()
# self._eoc: bool = caught_eoc if not self._eoc:
self._eoc: bool = trio.EndOfChannel(
f'Context stream closed by {self._ctx.side}\n'
f'|_{self}\n'
)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <= # => NO, DEFINITELY NOT! <=
# if we're a bi-dir ``MsgStream`` BECAUSE this same # if we're a bi-dir ``MsgStream`` BECAUSE this same
@ -379,6 +389,26 @@ class MsgStream(trio.abc.Channel):
# self._closed = True # self._closed = True
return drained return drained
@property
def closed(self) -> bool:
if (
(rxc := self._rx_chan._closed)
or
(_closed := self._closed)
or
(_eoc := self._eoc)
):
log.runtime(
f'`MsgStream` is already closed\n'
f'{self}\n'
f' |_cid: {self._ctx.cid}\n'
f' |_rx_chan._closed: {type(rxc)} = {rxc}\n'
f' |_closed: {type(_closed)} = {_closed}\n'
f' |_eoc: {type(_eoc)} = {_eoc}'
)
return True
return False
@acm @acm
async def subscribe( async def subscribe(
self, self,