From c2a1612bf58b2ec7b566ec75b6fe12f3a11c7255 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 26 Apr 2021 16:14:45 -0400 Subject: [PATCH] Drop sync function support You can always wrap a sync function in an async one and there seems to be no good reason to support invoking them directly especially since cancellation won't work without some thread hackery. If it's requested we'll point users to `trio-parallel`. Resolves #77 --- tractor/_actor.py | 115 +++++++++++++++++++++++----------------------- 1 file changed, 58 insertions(+), 57 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 32ca960..174457b 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -20,7 +20,7 @@ from trio_typing import TaskStatus from async_generator import aclosing from ._ipc import Channel -from ._streaming import Context, _context +from ._streaming import Context from .log import get_logger from ._exceptions import ( pack_error, @@ -58,11 +58,12 @@ async def _invoke( cs = None cancel_scope = trio.CancelScope() ctx = Context(chan, cid, cancel_scope) - _context.set(ctx) + if getattr(func, '_tractor_stream_function', False): # handle decorated ``@tractor.stream`` async functions kwargs['ctx'] = ctx treat_as_gen = True + try: is_async_partial = False is_async_gen_partial = False @@ -70,65 +71,55 @@ async def _invoke( is_async_partial = inspect.iscoroutinefunction(func.func) is_async_gen_partial = inspect.isasyncgenfunction(func.func) - if ( - not inspect.iscoroutinefunction(func) and - not inspect.isasyncgenfunction(func) and - not is_async_partial and - not is_async_gen_partial - ): - await chan.send({'functype': 'function', 'cid': cid}) + coro = func(**kwargs) + + if inspect.isasyncgen(coro): + await chan.send({'functype': 'asyncgen', 'cid': cid}) + # XXX: massive gotcha! If the containing scope + # is cancelled and we execute the below line, + # any ``ActorNursery.__aexit__()`` WON'T be + # triggered in the underlying async gen! So we + # have to properly handle the closing (aclosing) + # of the async gen in order to be sure the cancel + # is propagated! with cancel_scope as cs: task_status.started(cs) - await chan.send({'return': func(**kwargs), 'cid': cid}) - else: - coro = func(**kwargs) + async with aclosing(coro) as agen: + async for item in agen: + # TODO: can we send values back in here? + # it's gonna require a `while True:` and + # some non-blocking way to retrieve new `asend()` + # values from the channel: + # to_send = await chan.recv_nowait() + # if to_send is not None: + # to_yield = await coro.asend(to_send) + await chan.send({'yield': item, 'cid': cid}) - if inspect.isasyncgen(coro): + log.debug(f"Finished iterating {coro}") + # TODO: we should really support a proper + # `StopAsyncIteration` system here for returning a final + # value if desired + await chan.send({'stop': True, 'cid': cid}) + else: + if treat_as_gen: await chan.send({'functype': 'asyncgen', 'cid': cid}) - # XXX: massive gotcha! If the containing scope - # is cancelled and we execute the below line, - # any ``ActorNursery.__aexit__()`` WON'T be - # triggered in the underlying async gen! So we - # have to properly handle the closing (aclosing) - # of the async gen in order to be sure the cancel - # is propagated! + # XXX: the async-func may spawn further tasks which push + # back values like an async-generator would but must + # manualy construct the response dict-packet-responses as + # above with cancel_scope as cs: task_status.started(cs) - async with aclosing(coro) as agen: - async for item in agen: - # TODO: can we send values back in here? - # it's gonna require a `while True:` and - # some non-blocking way to retrieve new `asend()` - # values from the channel: - # to_send = await chan.recv_nowait() - # if to_send is not None: - # to_yield = await coro.asend(to_send) - await chan.send({'yield': item, 'cid': cid}) - - log.debug(f"Finished iterating {coro}") - # TODO: we should really support a proper - # `StopAsyncIteration` system here for returning a final - # value if desired - await chan.send({'stop': True, 'cid': cid}) + await coro + if not cs.cancelled_caught: + # task was not cancelled so we can instruct the + # far end async gen to tear down + await chan.send({'stop': True, 'cid': cid}) else: - if treat_as_gen: - await chan.send({'functype': 'asyncgen', 'cid': cid}) - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as - # above - with cancel_scope as cs: - task_status.started(cs) - await coro - if not cs.cancelled_caught: - # task was not cancelled so we can instruct the - # far end async gen to tear down - await chan.send({'stop': True, 'cid': cid}) - else: - await chan.send({'functype': 'asyncfunction', 'cid': cid}) - with cancel_scope as cs: - task_status.started(cs) - await chan.send({'return': await coro, 'cid': cid}) + # regular async function + await chan.send({'functype': 'asyncfunc', 'cid': cid}) + with cancel_scope as cs: + task_status.started(cs) + await chan.send({'return': await coro, 'cid': cid}) except (Exception, trio.MultiError) as err: @@ -151,9 +142,11 @@ async def _invoke( err_msg['cid'] = cid try: await chan.send(err_msg) + except trio.ClosedResourceError: log.warning( f"Failed to ship error to caller @ {chan.uid}") + if cs is None: # error is from above code not from rpc invocation task_status.started(err) @@ -400,22 +393,26 @@ class Actor: async def _push_result( self, chan: Channel, + cid: str, msg: Dict[str, Any], ) -> None: """Push an RPC result to the local consumer's queue. """ actorid = chan.uid assert actorid, f"`actorid` can't be {actorid}" - cid = msg['cid'] send_chan, recv_chan = self._cids2qs[(actorid, cid)] assert send_chan.cid == cid # type: ignore + if 'stop' in msg: log.debug(f"{send_chan} was terminated at remote end") + # indicate to consumer that far end has stopped return await send_chan.aclose() + try: log.debug(f"Delivering {msg} from {actorid} to caller {cid}") # maintain backpressure await send_chan.send(msg) + except trio.BrokenResourceError: # XXX: local consumer has closed their side # so cancel the far end streaming task @@ -491,9 +488,11 @@ class Actor: log.trace( # type: ignore f"Received msg {msg} from {chan.uid}") - if msg.get('cid'): + + cid = msg.get('cid') + if cid: # deliver response to local caller/waiter - await self._push_result(chan, msg) + await self._push_result(chan, cid, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue @@ -945,11 +944,13 @@ class Actor: return scope.cancel() + # wait for _invoke to mark the task complete log.debug( f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") await is_complete.wait() + log.debug( f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n")