forked from goodboy/tractor
1
0
Fork 0

Merge pull request from goodboy/drop_sync_funcs

Drop sync func invocation support.
mp_teardown_hardening
goodboy 2021-04-28 12:14:41 -04:00 committed by GitHub
commit a5a88e2f64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 92 additions and 77 deletions

View File

@ -1,7 +1,7 @@
import tractor import tractor
def cellar_door(): async def cellar_door():
assert not tractor.is_root_process() assert not tractor.is_root_process()
return "Dang that's beautiful" return "Dang that's beautiful"

View File

@ -1,7 +1,7 @@
import tractor import tractor
def movie_theatre_question(): async def movie_theatre_question():
"""A question asked in a dark theatre, in a tangent """A question asked in a dark theatre, in a tangent
(errr, I mean different) process. (errr, I mean different) process.
""" """

View File

@ -29,7 +29,7 @@ PRIMES = [
] ]
def is_prime(n): async def is_prime(n):
if n < 2: if n < 2:
return False return False
if n == 2: if n == 2:

View File

@ -24,7 +24,7 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup( setup(
name="tractor", name="tractor",
version='0.1.0a0', # first ever alpha version='0.1.0a1', # first ever alpha
description='structured concurrrent "actors"', description='structured concurrrent "actors"',
long_description=readme, long_description=readme,
license='GPLv3', license='GPLv3',

View File

@ -120,7 +120,7 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
assert exc.type == AssertionError assert exc.type == AssertionError
def do_nothing(): async def do_nothing():
pass pass

View File

@ -51,7 +51,7 @@ def test_local_arbiter_subactor_global_state(arb_addr):
assert result == 10 assert result == 10
def movie_theatre_question(): async def movie_theatre_question():
"""A question asked in a dark theatre, in a tangent """A question asked in a dark theatre, in a tangent
(errr, I mean different) process. (errr, I mean different) process.
""" """
@ -80,7 +80,7 @@ async def test_movie_theatre_convo(start_method):
await portal.cancel_actor() await portal.cancel_actor()
def cellar_door(): async def cellar_door():
return "Dang that's beautiful" return "Dang that's beautiful"

View File

@ -20,7 +20,7 @@ from trio_typing import TaskStatus
from async_generator import aclosing from async_generator import aclosing
from ._ipc import Channel from ._ipc import Channel
from ._streaming import Context, _context from ._streaming import Context
from .log import get_logger from .log import get_logger
from ._exceptions import ( from ._exceptions import (
pack_error, pack_error,
@ -58,77 +58,69 @@ async def _invoke(
cs = None cs = None
cancel_scope = trio.CancelScope() cancel_scope = trio.CancelScope()
ctx = Context(chan, cid, cancel_scope) ctx = Context(chan, cid, cancel_scope)
_context.set(ctx)
if getattr(func, '_tractor_stream_function', False): if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions # handle decorated ``@tractor.stream`` async functions
kwargs['ctx'] = ctx kwargs['ctx'] = ctx
treat_as_gen = True treat_as_gen = True
try:
is_async_partial = False
is_async_gen_partial = False
if isinstance(func, partial):
is_async_partial = inspect.iscoroutinefunction(func.func)
is_async_gen_partial = inspect.isasyncgenfunction(func.func)
if ( # errors raised inside this block are propgated back to caller
not inspect.iscoroutinefunction(func) and try:
not inspect.isasyncgenfunction(func) and if not (
not is_async_partial and inspect.isasyncgenfunction(func) or
not is_async_gen_partial inspect.iscoroutinefunction(func)
): ):
await chan.send({'functype': 'function', 'cid': cid}) raise TypeError(f'{func} must be an async function!')
coro = func(**kwargs)
if inspect.isasyncgen(coro):
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: massive gotcha! If the containing scope
# is cancelled and we execute the below line,
# any ``ActorNursery.__aexit__()`` WON'T be
# triggered in the underlying async gen! So we
# have to properly handle the closing (aclosing)
# of the async gen in order to be sure the cancel
# is propagated!
with cancel_scope as cs: with cancel_scope as cs:
task_status.started(cs) task_status.started(cs)
await chan.send({'return': func(**kwargs), 'cid': cid}) async with aclosing(coro) as agen:
else: async for item in agen:
coro = func(**kwargs) # TODO: can we send values back in here?
# it's gonna require a `while True:` and
# some non-blocking way to retrieve new `asend()`
# values from the channel:
# to_send = await chan.recv_nowait()
# if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid})
if inspect.isasyncgen(coro): log.debug(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send({'stop': True, 'cid': cid})
else:
if treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid}) await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: massive gotcha! If the containing scope # XXX: the async-func may spawn further tasks which push
# is cancelled and we execute the below line, # back values like an async-generator would but must
# any ``ActorNursery.__aexit__()`` WON'T be # manualy construct the response dict-packet-responses as
# triggered in the underlying async gen! So we # above
# have to properly handle the closing (aclosing)
# of the async gen in order to be sure the cancel
# is propagated!
with cancel_scope as cs: with cancel_scope as cs:
task_status.started(cs) task_status.started(cs)
async with aclosing(coro) as agen: await coro
async for item in agen: if not cs.cancelled_caught:
# TODO: can we send values back in here? # task was not cancelled so we can instruct the
# it's gonna require a `while True:` and # far end async gen to tear down
# some non-blocking way to retrieve new `asend()` await chan.send({'stop': True, 'cid': cid})
# values from the channel:
# to_send = await chan.recv_nowait()
# if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid})
log.debug(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send({'stop': True, 'cid': cid})
else: else:
if treat_as_gen: # regular async function
await chan.send({'functype': 'asyncgen', 'cid': cid}) await chan.send({'functype': 'asyncfunc', 'cid': cid})
# XXX: the async-func may spawn further tasks which push with cancel_scope as cs:
# back values like an async-generator would but must task_status.started(cs)
# manualy construct the response dict-packet-responses as await chan.send({'return': await coro, 'cid': cid})
# above
with cancel_scope as cs:
task_status.started(cs)
await coro
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send({'stop': True, 'cid': cid})
else:
await chan.send({'functype': 'asyncfunction', 'cid': cid})
with cancel_scope as cs:
task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid})
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
@ -151,9 +143,11 @@ async def _invoke(
err_msg['cid'] = cid err_msg['cid'] = cid
try: try:
await chan.send(err_msg) await chan.send(err_msg)
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.warning( log.warning(
f"Failed to ship error to caller @ {chan.uid}") f"Failed to ship error to caller @ {chan.uid}")
if cs is None: if cs is None:
# error is from above code not from rpc invocation # error is from above code not from rpc invocation
task_status.started(err) task_status.started(err)
@ -178,6 +172,10 @@ def _get_mod_abspath(module):
return os.path.abspath(module.__file__) return os.path.abspath(module.__file__)
# process-global stack closed at end on actor runtime teardown
_lifetime_stack: ExitStack = ExitStack()
class Actor: class Actor:
"""The fundamental concurrency primitive. """The fundamental concurrency primitive.
@ -192,7 +190,6 @@ class Actor:
_root_n: Optional[trio.Nursery] = None _root_n: Optional[trio.Nursery] = None
_service_n: Optional[trio.Nursery] = None _service_n: Optional[trio.Nursery] = None
_server_n: Optional[trio.Nursery] = None _server_n: Optional[trio.Nursery] = None
_lifetime_stack: ExitStack = ExitStack()
# Information about `__main__` from parent # Information about `__main__` from parent
_parent_main_data: Dict[str, str] _parent_main_data: Dict[str, str]
@ -397,22 +394,26 @@ class Actor:
async def _push_result( async def _push_result(
self, self,
chan: Channel, chan: Channel,
cid: str,
msg: Dict[str, Any], msg: Dict[str, Any],
) -> None: ) -> None:
"""Push an RPC result to the local consumer's queue. """Push an RPC result to the local consumer's queue.
""" """
actorid = chan.uid actorid = chan.uid
assert actorid, f"`actorid` can't be {actorid}" assert actorid, f"`actorid` can't be {actorid}"
cid = msg['cid']
send_chan, recv_chan = self._cids2qs[(actorid, cid)] send_chan, recv_chan = self._cids2qs[(actorid, cid)]
assert send_chan.cid == cid # type: ignore assert send_chan.cid == cid # type: ignore
if 'stop' in msg: if 'stop' in msg:
log.debug(f"{send_chan} was terminated at remote end") log.debug(f"{send_chan} was terminated at remote end")
# indicate to consumer that far end has stopped
return await send_chan.aclose() return await send_chan.aclose()
try: try:
log.debug(f"Delivering {msg} from {actorid} to caller {cid}") log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
# maintain backpressure # maintain backpressure
await send_chan.send(msg) await send_chan.send(msg)
except trio.BrokenResourceError: except trio.BrokenResourceError:
# XXX: local consumer has closed their side # XXX: local consumer has closed their side
# so cancel the far end streaming task # so cancel the far end streaming task
@ -488,9 +489,11 @@ class Actor:
log.trace( # type: ignore log.trace( # type: ignore
f"Received msg {msg} from {chan.uid}") f"Received msg {msg} from {chan.uid}")
if msg.get('cid'):
cid = msg.get('cid')
if cid:
# deliver response to local caller/waiter # deliver response to local caller/waiter
await self._push_result(chan, msg) await self._push_result(chan, cid, msg)
log.debug( log.debug(
f"Waiting on next msg for {chan} from {chan.uid}") f"Waiting on next msg for {chan} from {chan.uid}")
continue continue
@ -545,8 +548,9 @@ class Actor:
# deadlock and other weird behaviour) # deadlock and other weird behaviour)
if func != self.cancel: if func != self.cancel:
if isinstance(cs, Exception): if isinstance(cs, Exception):
log.warning(f"Task for RPC func {func} failed with" log.warning(
f"{cs}") f"Task for RPC func {func} failed with"
f"{cs}")
else: else:
# mark that we have ongoing rpc tasks # mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks = trio.Event()
@ -784,7 +788,7 @@ class Actor:
# tear down all lifetime contexts if not in guest mode # tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint? # XXX: should this just be in the entrypoint?
log.warning("Closing all actor lifetime contexts") log.warning("Closing all actor lifetime contexts")
self._lifetime_stack.close() _lifetime_stack.close()
# Unregister actor from the arbiter # Unregister actor from the arbiter
if registered_with_arbiter and ( if registered_with_arbiter and (
@ -858,6 +862,15 @@ class Actor:
# signal the server is down since nursery above terminated # signal the server is down since nursery above terminated
self._server_down.set() self._server_down.set()
def cancel_soon(self) -> None:
"""Cancel this actor asap; can be called from a sync context.
Schedules `.cancel()` to be run immediately just like when
cancelled by the parent.
"""
assert self._service_n
self._service_n.start_soon(self.cancel)
async def cancel(self) -> bool: async def cancel(self) -> bool:
"""Cancel this actor. """Cancel this actor.
@ -933,11 +946,13 @@ class Actor:
return return
scope.cancel() scope.cancel()
# wait for _invoke to mark the task complete # wait for _invoke to mark the task complete
log.debug( log.debug(
f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n") f"peer: {chan.uid}\n")
await is_complete.wait() await is_complete.wait()
log.debug( log.debug(
f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n") f"peer: {chan.uid}\n")
@ -1023,7 +1038,7 @@ class Arbiter(Actor):
self._waiters = {} self._waiters = {}
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def find_actor(self, name: str) -> Optional[Tuple[str, int]]: async def find_actor(self, name: str) -> Optional[Tuple[str, int]]:
for uid, sockaddr in self._registry.items(): for uid, sockaddr in self._registry.items():
if name in uid: if name in uid:
return sockaddr return sockaddr
@ -1064,7 +1079,7 @@ class Arbiter(Actor):
return sockaddrs return sockaddrs
def register_actor( async def register_actor(
self, uid: Tuple[str, str], sockaddr: Tuple[str, int] self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
) -> None: ) -> None:
name, uuid = uid name, uuid = uid
@ -1077,5 +1092,5 @@ class Arbiter(Actor):
if isinstance(event, trio.Event): if isinstance(event, trio.Event):
event.set() event.set()
def unregister_actor(self, uid: Tuple[str, str]) -> None: async def unregister_actor(self, uid: Tuple[str, str]) -> None:
self._registry.pop(uid, None) self._registry.pop(uid, None)

View File

@ -182,7 +182,7 @@ class Portal:
first_msg = await recv_chan.receive() first_msg = await recv_chan.receive()
functype = first_msg.get('functype') functype = first_msg.get('functype')
if functype == 'function' or functype == 'asyncfunction': if functype == 'asyncfunc':
resp_type = 'return' resp_type = 'return'
elif functype == 'asyncgen': elif functype == 'asyncgen':
resp_type = 'yield' resp_type = 'yield'