forked from goodboy/tractor
Merge pull request #395 from goodboy/to_asyncio_eoc_signal
`to_asyncio` eoc signal: use `trio.EndOfChannel` to indicate (maybe non-graceful) `asyncio.Task` termination
commit
a6f599901c
|
@ -571,6 +571,8 @@ def test_basic_interloop_channel_stream(
|
||||||
fan_out: bool,
|
fan_out: bool,
|
||||||
):
|
):
|
||||||
async def main():
|
async def main():
|
||||||
|
# TODO, figure out min timeout here!
|
||||||
|
with trio.fail_after(6):
|
||||||
async with tractor.open_nursery() as an:
|
async with tractor.open_nursery() as an:
|
||||||
portal = await an.run_in_actor(
|
portal = await an.run_in_actor(
|
||||||
stream_from_aio,
|
stream_from_aio,
|
||||||
|
@ -1086,6 +1088,108 @@ def test_sigint_closes_lifetime_stack(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# ?TODO asyncio.Task fn-deco?
|
||||||
|
# -[ ] do sig checkingat import time like @context?
|
||||||
|
# -[ ] maybe name it @aio_task ??
|
||||||
|
# -[ ] chan: to_asyncio.InterloopChannel ??
|
||||||
|
async def raise_before_started(
|
||||||
|
# from_trio: asyncio.Queue,
|
||||||
|
# to_trio: trio.abc.SendChannel,
|
||||||
|
chan: to_asyncio.LinkedTaskChannel,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
`asyncio.Task` entry point which RTEs before calling
|
||||||
|
`to_trio.send_nowait()`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
await asyncio.sleep(0.2)
|
||||||
|
raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
|
||||||
|
|
||||||
|
# to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??')
|
||||||
|
chan.started_nowait('Uhh we shouldve RTE-d ^^ ??')
|
||||||
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def caching_ep(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
):
|
||||||
|
|
||||||
|
log = tractor.log.get_logger('caching_ep')
|
||||||
|
log.info('syncing via `ctx.started()`')
|
||||||
|
await ctx.started()
|
||||||
|
|
||||||
|
# XXX, allocate the `open_channel_from()` inside
|
||||||
|
# a `.trionics.maybe_open_context()`.
|
||||||
|
chan: to_asyncio.LinkedTaskChannel
|
||||||
|
async with (
|
||||||
|
tractor.trionics.maybe_open_context(
|
||||||
|
acm_func=tractor.to_asyncio.open_channel_from,
|
||||||
|
kwargs={
|
||||||
|
'target': raise_before_started,
|
||||||
|
# ^XXX, kwarg to `open_channel_from()`
|
||||||
|
},
|
||||||
|
|
||||||
|
# lock around current actor task access
|
||||||
|
key=tractor.current_actor().uid,
|
||||||
|
|
||||||
|
) as (cache_hit, (clients, chan)),
|
||||||
|
):
|
||||||
|
if cache_hit:
|
||||||
|
log.error(
|
||||||
|
'Re-using cached `.open_from_channel()` call!\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
log.info(
|
||||||
|
'Allocating SHOULD-FAIL `.open_from_channel()`\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
def test_aio_side_raises_before_started(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
loglevel: str,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Simulates connection-err from `piker.brokers.ib.api`..
|
||||||
|
|
||||||
|
Ensure any error raised by child-`asyncio.Task` BEFORE
|
||||||
|
`chan.started()`
|
||||||
|
|
||||||
|
'''
|
||||||
|
# delay = 999 if debug_mode else 1
|
||||||
|
async def main():
|
||||||
|
with trio.fail_after(3):
|
||||||
|
an: tractor.ActorNursery
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
loglevel=loglevel,
|
||||||
|
) as an:
|
||||||
|
p: tractor.Portal = await an.start_actor(
|
||||||
|
'lchan_cacher_that_raises_fast',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
infect_asyncio=True,
|
||||||
|
)
|
||||||
|
async with p.open_context(
|
||||||
|
caching_ep,
|
||||||
|
) as (ctx, first):
|
||||||
|
assert not first
|
||||||
|
|
||||||
|
with pytest.raises(
|
||||||
|
expected_exception=(RemoteActorError),
|
||||||
|
) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
# ensure `asyncio.Task` exception is bubbled
|
||||||
|
# allll the way erp!!
|
||||||
|
rae = excinfo.value
|
||||||
|
assert rae.boxed_type is RuntimeError
|
||||||
|
|
||||||
# TODO: debug_mode tests once we get support for `asyncio`!
|
# TODO: debug_mode tests once we get support for `asyncio`!
|
||||||
#
|
#
|
||||||
# -[ ] need tests to wrap both scripts:
|
# -[ ] need tests to wrap both scripts:
|
||||||
|
|
|
@ -130,6 +130,7 @@ class LinkedTaskChannel(
|
||||||
_trio_task: trio.Task
|
_trio_task: trio.Task
|
||||||
_aio_task_complete: trio.Event
|
_aio_task_complete: trio.Event
|
||||||
|
|
||||||
|
_closed_by_aio_task: bool = False
|
||||||
_suppress_graceful_exits: bool = True
|
_suppress_graceful_exits: bool = True
|
||||||
|
|
||||||
_trio_err: BaseException|None = None
|
_trio_err: BaseException|None = None
|
||||||
|
@ -208,10 +209,15 @@ class LinkedTaskChannel(
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
await self._from_aio.aclose()
|
await self._from_aio.aclose()
|
||||||
|
|
||||||
def started(
|
# ?TODO? async version of this?
|
||||||
|
def started_nowait(
|
||||||
self,
|
self,
|
||||||
val: Any = None,
|
val: Any = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
'''
|
||||||
|
Synchronize aio-side with its trio-parent.
|
||||||
|
|
||||||
|
'''
|
||||||
self._aio_started_val = val
|
self._aio_started_val = val
|
||||||
return self._to_trio.send_nowait(val)
|
return self._to_trio.send_nowait(val)
|
||||||
|
|
||||||
|
@ -242,6 +248,7 @@ class LinkedTaskChannel(
|
||||||
# cycle on the trio side?
|
# cycle on the trio side?
|
||||||
# await trio.lowlevel.checkpoint()
|
# await trio.lowlevel.checkpoint()
|
||||||
return await self._from_aio.receive()
|
return await self._from_aio.receive()
|
||||||
|
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
async with translate_aio_errors(
|
async with translate_aio_errors(
|
||||||
chan=self,
|
chan=self,
|
||||||
|
@ -319,7 +326,7 @@ def _run_asyncio_task(
|
||||||
qsize: int = 1,
|
qsize: int = 1,
|
||||||
provide_channels: bool = False,
|
provide_channels: bool = False,
|
||||||
suppress_graceful_exits: bool = True,
|
suppress_graceful_exits: bool = True,
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = True,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> LinkedTaskChannel:
|
) -> LinkedTaskChannel:
|
||||||
|
@ -347,18 +354,6 @@ def _run_asyncio_task(
|
||||||
# value otherwise it would just return ;P
|
# value otherwise it would just return ;P
|
||||||
assert qsize > 1
|
assert qsize > 1
|
||||||
|
|
||||||
if provide_channels:
|
|
||||||
assert 'to_trio' in args
|
|
||||||
|
|
||||||
# allow target func to accept/stream results manually by name
|
|
||||||
if 'to_trio' in args:
|
|
||||||
kwargs['to_trio'] = to_trio
|
|
||||||
|
|
||||||
if 'from_trio' in args:
|
|
||||||
kwargs['from_trio'] = from_trio
|
|
||||||
|
|
||||||
coro = func(**kwargs)
|
|
||||||
|
|
||||||
trio_task: trio.Task = trio.lowlevel.current_task()
|
trio_task: trio.Task = trio.lowlevel.current_task()
|
||||||
trio_cs = trio.CancelScope()
|
trio_cs = trio.CancelScope()
|
||||||
aio_task_complete = trio.Event()
|
aio_task_complete = trio.Event()
|
||||||
|
@ -373,6 +368,25 @@ def _run_asyncio_task(
|
||||||
_suppress_graceful_exits=suppress_graceful_exits,
|
_suppress_graceful_exits=suppress_graceful_exits,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# allow target func to accept/stream results manually by name
|
||||||
|
if 'to_trio' in args:
|
||||||
|
kwargs['to_trio'] = to_trio
|
||||||
|
|
||||||
|
if 'from_trio' in args:
|
||||||
|
kwargs['from_trio'] = from_trio
|
||||||
|
|
||||||
|
if 'chan' in args:
|
||||||
|
kwargs['chan'] = chan
|
||||||
|
|
||||||
|
if provide_channels:
|
||||||
|
assert (
|
||||||
|
'to_trio' in args
|
||||||
|
or
|
||||||
|
'chan' in args
|
||||||
|
)
|
||||||
|
|
||||||
|
coro = func(**kwargs)
|
||||||
|
|
||||||
async def wait_on_coro_final_result(
|
async def wait_on_coro_final_result(
|
||||||
to_trio: trio.MemorySendChannel,
|
to_trio: trio.MemorySendChannel,
|
||||||
coro: Awaitable,
|
coro: Awaitable,
|
||||||
|
@ -445,9 +459,23 @@ def _run_asyncio_task(
|
||||||
f'Task exited with final result: {result!r}\n'
|
f'Task exited with final result: {result!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# only close the sender side which will relay
|
# XXX ALWAYS close the child-`asyncio`-task-side's
|
||||||
# a `trio.EndOfChannel` to the trio (consumer) side.
|
# `to_trio` handle which will in turn relay
|
||||||
|
# a `trio.EndOfChannel` to the `trio`-parent.
|
||||||
|
# Consequently the parent `trio` task MUST ALWAYS
|
||||||
|
# check for any `chan._aio_err` to be raised when it
|
||||||
|
# receives an EoC.
|
||||||
|
#
|
||||||
|
# NOTE, there are 2 EoC cases,
|
||||||
|
# - normal/graceful EoC due to the aio-side actually
|
||||||
|
# terminating its "streaming", but the task did not
|
||||||
|
# error and is not yet complete.
|
||||||
|
#
|
||||||
|
# - the aio-task terminated and we specially mark the
|
||||||
|
# closure as due to the `asyncio.Task`'s exit.
|
||||||
|
#
|
||||||
to_trio.close()
|
to_trio.close()
|
||||||
|
chan._closed_by_aio_task = True
|
||||||
|
|
||||||
aio_task_complete.set()
|
aio_task_complete.set()
|
||||||
log.runtime(
|
log.runtime(
|
||||||
|
@ -645,8 +673,9 @@ def _run_asyncio_task(
|
||||||
not trio_cs.cancel_called
|
not trio_cs.cancel_called
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Cancelling `trio` side due to aio-side src exc\n'
|
f'Cancelling trio-side due to aio-side src exc\n'
|
||||||
f'{curr_aio_err}\n'
|
f'\n'
|
||||||
|
f'{curr_aio_err!r}\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f'(c>\n'
|
f'(c>\n'
|
||||||
f' |_{trio_task}\n'
|
f' |_{trio_task}\n'
|
||||||
|
@ -758,6 +787,7 @@ async def translate_aio_errors(
|
||||||
aio_done_before_trio: bool = aio_task.done()
|
aio_done_before_trio: bool = aio_task.done()
|
||||||
assert aio_task
|
assert aio_task
|
||||||
trio_err: BaseException|None = None
|
trio_err: BaseException|None = None
|
||||||
|
eoc: trio.EndOfChannel|None = None
|
||||||
try:
|
try:
|
||||||
yield # back to one of the cross-loop apis
|
yield # back to one of the cross-loop apis
|
||||||
except trio.Cancelled as taskc:
|
except trio.Cancelled as taskc:
|
||||||
|
@ -789,12 +819,48 @@ async def translate_aio_errors(
|
||||||
# )
|
# )
|
||||||
# raise
|
# raise
|
||||||
|
|
||||||
# XXX always passthrough EoC since this translator is often
|
# XXX EoC is a special SIGNAL from the aio-side here!
|
||||||
# called from `LinkedTaskChannel.receive()` which we want
|
# There are 2 cases to handle:
|
||||||
# passthrough and further we have no special meaning for it in
|
# 1. the "EoC passthrough" case.
|
||||||
# terms of relaying errors or signals from the aio side!
|
# - the aio-task actually closed the channel "gracefully" and
|
||||||
except trio.EndOfChannel as eoc:
|
# the trio-task should unwind any ongoing channel
|
||||||
|
# iteration/receiving,
|
||||||
|
# |_this exc-translator wraps calls to `LinkedTaskChannel.receive()`
|
||||||
|
# in which case we want to relay the actual "end-of-chan" for
|
||||||
|
# iteration purposes.
|
||||||
|
#
|
||||||
|
# 2. relaying the "asyncio.Task termination" case.
|
||||||
|
# - if the aio-task terminates, maybe with an error, AND the
|
||||||
|
# `open_channel_from()` API was used, it will always signal
|
||||||
|
# that termination.
|
||||||
|
# |_`wait_on_coro_final_result()` always calls
|
||||||
|
# `to_trio.close()` when `provide_channels=True` so we need to
|
||||||
|
# always check if there is an aio-side exc which needs to be
|
||||||
|
# relayed to the parent trio side!
|
||||||
|
# |_in this case the special `chan._closed_by_aio_task` is
|
||||||
|
# ALWAYS set.
|
||||||
|
#
|
||||||
|
except trio.EndOfChannel as _eoc:
|
||||||
|
eoc = _eoc
|
||||||
|
if (
|
||||||
|
chan._closed_by_aio_task
|
||||||
|
and
|
||||||
|
aio_err
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
f'The asyncio-child task terminated due to error\n'
|
||||||
|
f'{aio_err!r}\n'
|
||||||
|
)
|
||||||
|
chan._trio_to_raise = aio_err
|
||||||
trio_err = chan._trio_err = eoc
|
trio_err = chan._trio_err = eoc
|
||||||
|
#
|
||||||
|
# ?TODO?, raise something like a,
|
||||||
|
# chan._trio_to_raise = AsyncioErrored()
|
||||||
|
# BUT, with the tb rewritten to reflect the underlying
|
||||||
|
# call stack?
|
||||||
|
else:
|
||||||
|
trio_err = chan._trio_err = eoc
|
||||||
|
|
||||||
raise eoc
|
raise eoc
|
||||||
|
|
||||||
# NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
|
# NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
|
||||||
|
@ -1047,7 +1113,7 @@ async def translate_aio_errors(
|
||||||
#
|
#
|
||||||
if wait_on_aio_task:
|
if wait_on_aio_task:
|
||||||
await chan._aio_task_complete.wait()
|
await chan._aio_task_complete.wait()
|
||||||
log.info(
|
log.debug(
|
||||||
'asyncio-task is done and unblocked trio-side!\n'
|
'asyncio-task is done and unblocked trio-side!\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1064,11 +1130,17 @@ async def translate_aio_errors(
|
||||||
trio_to_raise: (
|
trio_to_raise: (
|
||||||
AsyncioCancelled|
|
AsyncioCancelled|
|
||||||
AsyncioTaskExited|
|
AsyncioTaskExited|
|
||||||
|
Exception| # relayed from aio-task
|
||||||
None
|
None
|
||||||
) = chan._trio_to_raise
|
) = chan._trio_to_raise
|
||||||
|
|
||||||
|
raise_from: Exception = (
|
||||||
|
trio_err if (aio_err is trio_to_raise)
|
||||||
|
else aio_err
|
||||||
|
)
|
||||||
|
|
||||||
if not suppress_graceful_exits:
|
if not suppress_graceful_exits:
|
||||||
raise trio_to_raise from (aio_err or trio_err)
|
raise trio_to_raise from raise_from
|
||||||
|
|
||||||
if trio_to_raise:
|
if trio_to_raise:
|
||||||
match (
|
match (
|
||||||
|
@ -1101,7 +1173,7 @@ async def translate_aio_errors(
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
case _:
|
case _:
|
||||||
raise trio_to_raise from (aio_err or trio_err)
|
raise trio_to_raise from raise_from
|
||||||
|
|
||||||
# Check if the asyncio-side is the cause of the trio-side
|
# Check if the asyncio-side is the cause of the trio-side
|
||||||
# error.
|
# error.
|
||||||
|
@ -1167,7 +1239,6 @@ async def run_task(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_channel_from(
|
async def open_channel_from(
|
||||||
|
|
||||||
target: Callable[..., Any],
|
target: Callable[..., Any],
|
||||||
suppress_graceful_exits: bool = True,
|
suppress_graceful_exits: bool = True,
|
||||||
**target_kwargs,
|
**target_kwargs,
|
||||||
|
@ -1201,7 +1272,6 @@ async def open_channel_from(
|
||||||
# deliver stream handle upward
|
# deliver stream handle upward
|
||||||
yield first, chan
|
yield first, chan
|
||||||
except trio.Cancelled as taskc:
|
except trio.Cancelled as taskc:
|
||||||
# await tractor.pause(shield=True) # ya it worx ;)
|
|
||||||
if cs.cancel_called:
|
if cs.cancel_called:
|
||||||
if isinstance(chan._trio_to_raise, AsyncioCancelled):
|
if isinstance(chan._trio_to_raise, AsyncioCancelled):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
|
|
Loading…
Reference in New Issue