forked from goodboy/tractor

Drop sync function support

You can always wrap a sync function in an async one, and there seems to
be no good reason to support invoking them directly, especially since
cancellation won't work without some thread hackery. If it's requested,
we'll point users to `trio-parallel`.

Resolves #77
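
The wrapping pattern the message refers to is straightforward; here is a
minimal sketch (hypothetical function names, not from this changeset) using
`trio.to_thread.run_sync` to keep the event loop responsive, though as noted
above the blocking call itself still can't be cancelled mid-execution without
extra thread handling:

import trio

def heavy_sync_work(n: int) -> int:
    # an existing blocking/sync function
    return sum(i * i for i in range(n))

async def heavy_work(n: int) -> int:
    # async wrapper suitable for use as a remote task: run the
    # blocking call in a worker thread so trio stays responsive
    return await trio.to_thread.run_sync(heavy_sync_work, n)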
drop_sync_funcs
Tyler Goodlet 2021-04-26 16:14:45 -04:00
parent be22a2526a
commit c2a1612bf5
1 changed file with 58 additions and 57 deletions


@@ -20,7 +20,7 @@ from trio_typing import TaskStatus
 from async_generator import aclosing
 from ._ipc import Channel
-from ._streaming import Context, _context
+from ._streaming import Context
 from .log import get_logger
 from ._exceptions import (
     pack_error,
@@ -58,11 +58,12 @@ async def _invoke(
     cs = None
     cancel_scope = trio.CancelScope()
     ctx = Context(chan, cid, cancel_scope)
-    _context.set(ctx)
     if getattr(func, '_tractor_stream_function', False):
         # handle decorated ``@tractor.stream`` async functions
         kwargs['ctx'] = ctx
         treat_as_gen = True
     try:
         is_async_partial = False
         is_async_gen_partial = False
@@ -70,65 +71,55 @@ async def _invoke(
             is_async_partial = inspect.iscoroutinefunction(func.func)
             is_async_gen_partial = inspect.isasyncgenfunction(func.func)
 
-        if (
-            not inspect.iscoroutinefunction(func) and
-            not inspect.isasyncgenfunction(func) and
-            not is_async_partial and
-            not is_async_gen_partial
-        ):
-            await chan.send({'functype': 'function', 'cid': cid})
-            with cancel_scope as cs:
-                task_status.started(cs)
-                await chan.send({'return': func(**kwargs), 'cid': cid})
-        else:
-            coro = func(**kwargs)
-
-            if inspect.isasyncgen(coro):
-                await chan.send({'functype': 'asyncgen', 'cid': cid})
-                # XXX: massive gotcha! If the containing scope
-                # is cancelled and we execute the below line,
-                # any ``ActorNursery.__aexit__()`` WON'T be
-                # triggered in the underlying async gen! So we
-                # have to properly handle the closing (aclosing)
-                # of the async gen in order to be sure the cancel
-                # is propagated!
-                with cancel_scope as cs:
-                    task_status.started(cs)
-                    async with aclosing(coro) as agen:
-                        async for item in agen:
-                            # TODO: can we send values back in here?
-                            # it's gonna require a `while True:` and
-                            # some non-blocking way to retrieve new `asend()`
-                            # values from the channel:
-                            # to_send = await chan.recv_nowait()
-                            # if to_send is not None:
-                            #     to_yield = await coro.asend(to_send)
-                            await chan.send({'yield': item, 'cid': cid})
-
-                log.debug(f"Finished iterating {coro}")
-                # TODO: we should really support a proper
-                # `StopAsyncIteration` system here for returning a final
-                # value if desired
-                await chan.send({'stop': True, 'cid': cid})
-            else:
-                if treat_as_gen:
-                    await chan.send({'functype': 'asyncgen', 'cid': cid})
-                    # XXX: the async-func may spawn further tasks which push
-                    # back values like an async-generator would but must
-                    # manualy construct the response dict-packet-responses as
-                    # above
-                    with cancel_scope as cs:
-                        task_status.started(cs)
-                        await coro
-                    if not cs.cancelled_caught:
-                        # task was not cancelled so we can instruct the
-                        # far end async gen to tear down
-                        await chan.send({'stop': True, 'cid': cid})
-                else:
-                    await chan.send({'functype': 'asyncfunction', 'cid': cid})
-                    with cancel_scope as cs:
-                        task_status.started(cs)
-                        await chan.send({'return': await coro, 'cid': cid})
+        coro = func(**kwargs)
+
+        if inspect.isasyncgen(coro):
+            await chan.send({'functype': 'asyncgen', 'cid': cid})
+            # XXX: massive gotcha! If the containing scope
+            # is cancelled and we execute the below line,
+            # any ``ActorNursery.__aexit__()`` WON'T be
+            # triggered in the underlying async gen! So we
+            # have to properly handle the closing (aclosing)
+            # of the async gen in order to be sure the cancel
+            # is propagated!
+            with cancel_scope as cs:
+                task_status.started(cs)
+                async with aclosing(coro) as agen:
+                    async for item in agen:
+                        # TODO: can we send values back in here?
+                        # it's gonna require a `while True:` and
+                        # some non-blocking way to retrieve new `asend()`
+                        # values from the channel:
+                        # to_send = await chan.recv_nowait()
+                        # if to_send is not None:
+                        #     to_yield = await coro.asend(to_send)
+                        await chan.send({'yield': item, 'cid': cid})
+
+            log.debug(f"Finished iterating {coro}")
+            # TODO: we should really support a proper
+            # `StopAsyncIteration` system here for returning a final
+            # value if desired
+            await chan.send({'stop': True, 'cid': cid})
+        else:
+            if treat_as_gen:
+                await chan.send({'functype': 'asyncgen', 'cid': cid})
+                # XXX: the async-func may spawn further tasks which push
+                # back values like an async-generator would but must
+                # manualy construct the response dict-packet-responses as
+                # above
+                with cancel_scope as cs:
+                    task_status.started(cs)
+                    await coro
+                if not cs.cancelled_caught:
+                    # task was not cancelled so we can instruct the
+                    # far end async gen to tear down
+                    await chan.send({'stop': True, 'cid': cid})
+            else:
+                # regular async function
+                await chan.send({'functype': 'asyncfunc', 'cid': cid})
+                with cancel_scope as cs:
+                    task_status.started(cs)
+                    await chan.send({'return': await coro, 'cid': cid})
 
     except (Exception, trio.MultiError) as err:
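
The ``XXX: massive gotcha`` comment above is about generator finalization
under cancellation; the general shape of that pattern, sketched here with
hypothetical helper names (``agen_factory``, ``send``), is:

from async_generator import aclosing  # contextlib.aclosing on Python 3.10+

async def drain(agen_factory, send):
    # if the enclosing scope is cancelled mid-iteration, aclosing()
    # still calls aclose() on the generator, so its cleanup (finally
    # blocks and inner context managers) actually runs and the
    # cancellation propagates into the generator body
    async with aclosing(agen_factory()) as agen:
        async for item in agen:
            await send(item)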
@@ -151,9 +142,11 @@ async def _invoke(
         err_msg['cid'] = cid
         try:
             await chan.send(err_msg)
         except trio.ClosedResourceError:
             log.warning(
                 f"Failed to ship error to caller @ {chan.uid}")
         if cs is None:
             # error is from above code not from rpc invocation
             task_status.started(err)
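
Taken together, these hunks exchange a small set of per-call packets keyed by
``cid``. An illustrative summary, covering only the shapes visible in this
diff (not an authoritative spec):

from typing import Any, Dict

def example_packets(cid: str, item: Any, value: Any) -> Dict[str, Dict[str, Any]]:
    # packet shapes used by _invoke() above, keyed by their role
    return {
        'handshake': {'functype': 'asyncgen', 'cid': cid},  # or 'asyncfunc'
        'yield':     {'yield': item, 'cid': cid},           # one per streamed item
        'stop':      {'stop': True, 'cid': cid},            # stream finished / teardown
        'return':    {'return': value, 'cid': cid},         # plain async function result
        # error packets are built with pack_error(err) and tagged with the
        # same 'cid' before being shipped back (full shape not shown here)
    }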
@@ -400,22 +393,26 @@ class Actor:
     async def _push_result(
         self,
         chan: Channel,
+        cid: str,
         msg: Dict[str, Any],
     ) -> None:
         """Push an RPC result to the local consumer's queue.
         """
         actorid = chan.uid
         assert actorid, f"`actorid` can't be {actorid}"
-        cid = msg['cid']
         send_chan, recv_chan = self._cids2qs[(actorid, cid)]
         assert send_chan.cid == cid  # type: ignore
         if 'stop' in msg:
             log.debug(f"{send_chan} was terminated at remote end")
+            # indicate to consumer that far end has stopped
             return await send_chan.aclose()
         try:
             log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
             # maintain backpressure
             await send_chan.send(msg)
         except trio.BrokenResourceError:
             # XXX: local consumer has closed their side
             # so cancel the far end streaming task
@@ -491,9 +488,11 @@ class Actor:
                 log.trace(  # type: ignore
                     f"Received msg {msg} from {chan.uid}")
-                if msg.get('cid'):
+                cid = msg.get('cid')
+                if cid:
                     # deliver response to local caller/waiter
-                    await self._push_result(chan, msg)
+                    await self._push_result(chan, cid, msg)
                     log.debug(
                         f"Waiting on next msg for {chan} from {chan.uid}")
                     continue
@@ -945,11 +944,13 @@ class Actor:
             return
         scope.cancel()
+        # wait for _invoke to mark the task complete
         log.debug(
             f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n"
             f"peer: {chan.uid}\n")
         await is_complete.wait()
         log.debug(
             f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
             f"peer: {chan.uid}\n")