diff --git a/tractor/_actor.py b/tractor/_actor.py index 32ca960..174457b 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -20,7 +20,7 @@ from trio_typing import TaskStatus from async_generator import aclosing from ._ipc import Channel -from ._streaming import Context, _context +from ._streaming import Context from .log import get_logger from ._exceptions import ( pack_error, @@ -58,11 +58,12 @@ async def _invoke( cs = None cancel_scope = trio.CancelScope() ctx = Context(chan, cid, cancel_scope) - _context.set(ctx) + if getattr(func, '_tractor_stream_function', False): # handle decorated ``@tractor.stream`` async functions kwargs['ctx'] = ctx treat_as_gen = True + try: is_async_partial = False is_async_gen_partial = False @@ -70,65 +71,55 @@ async def _invoke( is_async_partial = inspect.iscoroutinefunction(func.func) is_async_gen_partial = inspect.isasyncgenfunction(func.func) - if ( - not inspect.iscoroutinefunction(func) and - not inspect.isasyncgenfunction(func) and - not is_async_partial and - not is_async_gen_partial - ): - await chan.send({'functype': 'function', 'cid': cid}) + coro = func(**kwargs) + + if inspect.isasyncgen(coro): + await chan.send({'functype': 'asyncgen', 'cid': cid}) + # XXX: massive gotcha! If the containing scope + # is cancelled and we execute the below line, + # any ``ActorNursery.__aexit__()`` WON'T be + # triggered in the underlying async gen! So we + # have to properly handle the closing (aclosing) + # of the async gen in order to be sure the cancel + # is propagated! with cancel_scope as cs: task_status.started(cs) - await chan.send({'return': func(**kwargs), 'cid': cid}) - else: - coro = func(**kwargs) + async with aclosing(coro) as agen: + async for item in agen: + # TODO: can we send values back in here? + # it's gonna require a `while True:` and + # some non-blocking way to retrieve new `asend()` + # values from the channel: + # to_send = await chan.recv_nowait() + # if to_send is not None: + # to_yield = await coro.asend(to_send) + await chan.send({'yield': item, 'cid': cid}) - if inspect.isasyncgen(coro): + log.debug(f"Finished iterating {coro}") + # TODO: we should really support a proper + # `StopAsyncIteration` system here for returning a final + # value if desired + await chan.send({'stop': True, 'cid': cid}) + else: + if treat_as_gen: await chan.send({'functype': 'asyncgen', 'cid': cid}) - # XXX: massive gotcha! If the containing scope - # is cancelled and we execute the below line, - # any ``ActorNursery.__aexit__()`` WON'T be - # triggered in the underlying async gen! So we - # have to properly handle the closing (aclosing) - # of the async gen in order to be sure the cancel - # is propagated! + # XXX: the async-func may spawn further tasks which push + # back values like an async-generator would but must + # manualy construct the response dict-packet-responses as + # above with cancel_scope as cs: task_status.started(cs) - async with aclosing(coro) as agen: - async for item in agen: - # TODO: can we send values back in here? - # it's gonna require a `while True:` and - # some non-blocking way to retrieve new `asend()` - # values from the channel: - # to_send = await chan.recv_nowait() - # if to_send is not None: - # to_yield = await coro.asend(to_send) - await chan.send({'yield': item, 'cid': cid}) - - log.debug(f"Finished iterating {coro}") - # TODO: we should really support a proper - # `StopAsyncIteration` system here for returning a final - # value if desired - await chan.send({'stop': True, 'cid': cid}) + await coro + if not cs.cancelled_caught: + # task was not cancelled so we can instruct the + # far end async gen to tear down + await chan.send({'stop': True, 'cid': cid}) else: - if treat_as_gen: - await chan.send({'functype': 'asyncgen', 'cid': cid}) - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as - # above - with cancel_scope as cs: - task_status.started(cs) - await coro - if not cs.cancelled_caught: - # task was not cancelled so we can instruct the - # far end async gen to tear down - await chan.send({'stop': True, 'cid': cid}) - else: - await chan.send({'functype': 'asyncfunction', 'cid': cid}) - with cancel_scope as cs: - task_status.started(cs) - await chan.send({'return': await coro, 'cid': cid}) + # regular async function + await chan.send({'functype': 'asyncfunc', 'cid': cid}) + with cancel_scope as cs: + task_status.started(cs) + await chan.send({'return': await coro, 'cid': cid}) except (Exception, trio.MultiError) as err: @@ -151,9 +142,11 @@ async def _invoke( err_msg['cid'] = cid try: await chan.send(err_msg) + except trio.ClosedResourceError: log.warning( f"Failed to ship error to caller @ {chan.uid}") + if cs is None: # error is from above code not from rpc invocation task_status.started(err) @@ -400,22 +393,26 @@ class Actor: async def _push_result( self, chan: Channel, + cid: str, msg: Dict[str, Any], ) -> None: """Push an RPC result to the local consumer's queue. """ actorid = chan.uid assert actorid, f"`actorid` can't be {actorid}" - cid = msg['cid'] send_chan, recv_chan = self._cids2qs[(actorid, cid)] assert send_chan.cid == cid # type: ignore + if 'stop' in msg: log.debug(f"{send_chan} was terminated at remote end") + # indicate to consumer that far end has stopped return await send_chan.aclose() + try: log.debug(f"Delivering {msg} from {actorid} to caller {cid}") # maintain backpressure await send_chan.send(msg) + except trio.BrokenResourceError: # XXX: local consumer has closed their side # so cancel the far end streaming task @@ -491,9 +488,11 @@ class Actor: log.trace( # type: ignore f"Received msg {msg} from {chan.uid}") - if msg.get('cid'): + + cid = msg.get('cid') + if cid: # deliver response to local caller/waiter - await self._push_result(chan, msg) + await self._push_result(chan, cid, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue @@ -945,11 +944,13 @@ class Actor: return scope.cancel() + # wait for _invoke to mark the task complete log.debug( f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") await is_complete.wait() + log.debug( f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n")