From a3a5bc267e7e0cedefc4ad20e215f4a74b93c441 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 3 Aug 2022 15:14:36 -0400 Subject: [PATCH] Make `process_messages()` a mod func --- tractor/_portal.py | 4 +- tractor/_runtime.py | 448 ++++++++++++++++++++++---------------------- tractor/msg.py | 2 +- 3 files changed, 229 insertions(+), 225 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index c7c8700..94a285b 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -611,9 +611,11 @@ async def open_portal( msg_loop_cs: Optional[trio.CancelScope] = None if start_msg_loop: + from ._runtime import process_messages msg_loop_cs = await nursery.start( partial( - actor._process_messages, + process_messages, + actor, channel, # if the local task is cancelled we want to keep # the msg loop running until our block ends diff --git a/tractor/_runtime.py b/tractor/_runtime.py index f3fb0d9..815020c 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -614,7 +614,7 @@ class Actor: # Begin channel management - respond to remote requests and # process received reponses. try: - disconnected = await self._process_messages(chan) + disconnected = await process_messages(self, chan) except ( trio.Cancelled, @@ -884,227 +884,6 @@ class Actor: ctx._remote_func_type = functype return ctx - async def _process_messages( - self, - chan: Channel, - shield: bool = False, - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - - ) -> bool: - ''' - Process messages for the channel async-RPC style. - - Receive multiplexed RPC requests and deliver responses over ``chan``. - - ''' - # TODO: once https://github.com/python-trio/trio/issues/467 gets - # worked out we'll likely want to use that! - msg = None - nursery_cancelled_before_task: bool = False - - log.runtime(f"Entering msg loop for {chan} from {chan.uid}") - try: - with trio.CancelScope(shield=shield) as loop_cs: - # this internal scope allows for keeping this message - # loop running despite the current task having been - # cancelled (eg. `open_portal()` may call this method from - # a locally spawned task) and recieve this scope using - # ``scope = Nursery.start()`` - task_status.started(loop_cs) - async for msg in chan: - - if msg is None: # loop terminate sentinel - - log.cancel( - f"Channel to {chan.uid} terminated?\n" - "Cancelling all associated tasks..") - - for (channel, cid) in self._rpc_tasks.copy(): - if channel is chan: - await self._cancel_task(cid, channel) - - log.runtime( - f"Msg loop signalled to terminate for" - f" {chan} from {chan.uid}") - - break - - log.transport( # type: ignore - f"Received msg {msg} from {chan.uid}") - - cid = msg.get('cid') - if cid: - # deliver response to local caller/waiter - await self._push_result(chan, cid, msg) - - log.runtime( - f"Waiting on next msg for {chan} from {chan.uid}") - continue - - # process command request - try: - ns, funcname, kwargs, actorid, cid = msg['cmd'] - except KeyError: - # This is the non-rpc error case, that is, an - # error **not** raised inside a call to ``_invoke()`` - # (i.e. no cid was provided in the msg - see above). 
- # Push this error to all local channel consumers - # (normally portals) by marking the channel as errored - assert chan.uid - exc = unpack_error(msg, chan=chan) - chan._exc = exc - raise exc - - log.runtime( - f"Processing request from {actorid}\n" - f"{ns}.{funcname}({kwargs})") - - if ns == 'self': - func = getattr(self, funcname) - - if funcname == 'cancel': - - # don't start entire actor runtime - # cancellation if this actor is in debug - # mode - pdb_complete = _debug.Lock.local_pdb_complete - if pdb_complete: - await pdb_complete.wait() - - # we immediately start the runtime machinery - # shutdown - with trio.CancelScope(shield=True): - # self.cancel() was called so kill this - # msg loop and break out into - # ``_async_main()`` - log.cancel( - f"Actor {self.uid} was remotely cancelled " - f"by {chan.uid}" - ) - await _invoke( - self, cid, chan, func, kwargs, is_rpc=False - ) - - loop_cs.cancel() - break - - if funcname == '_cancel_task': - - # we immediately start the runtime machinery - # shutdown - with trio.CancelScope(shield=True): - # self.cancel() was called so kill this - # msg loop and break out into - # ``_async_main()`` - kwargs['chan'] = chan - log.cancel( - f'Remote request to cancel task\n' - f'remote actor: {chan.uid}\n' - f'task: {cid}' - ) - try: - await _invoke( - self, - cid, - chan, - func, - kwargs, - is_rpc=False, - ) - except BaseException: - log.exception("failed to cancel task?") - - continue - else: - # complain to client about restricted modules - try: - func = self._get_rpc_func(ns, funcname) - except (ModuleNotExposed, AttributeError) as err: - err_msg = pack_error(err) - err_msg['cid'] = cid - await chan.send(err_msg) - continue - - # spin up a task for the requested function - log.runtime(f"Spawning task for {func}") - assert self._service_n - try: - cs = await self._service_n.start( - partial(_invoke, self, cid, chan, func, kwargs), - name=funcname, - ) - except (RuntimeError, trio.MultiError): - # avoid reporting a benign race condition - # during actor runtime teardown. - nursery_cancelled_before_task = True - break - - # never allow cancelling cancel requests (results in - # deadlock and other weird behaviour) - # if func != self.cancel: - if isinstance(cs, Exception): - log.warning( - f"Task for RPC func {func} failed with" - f"{cs}") - else: - # mark that we have ongoing rpc tasks - self._ongoing_rpc_tasks = trio.Event() - log.runtime(f"RPC func is {func}") - # store cancel scope such that the rpc task can be - # cancelled gracefully if requested - self._rpc_tasks[(chan, cid)] = ( - cs, func, trio.Event()) - - log.runtime( - f"Waiting on next msg for {chan} from {chan.uid}") - - # end of async for, channel disconnect vis - # ``trio.EndOfChannel`` - log.runtime( - f"{chan} for {chan.uid} disconnected, cancelling tasks" - ) - await self.cancel_rpc_tasks(chan) - - except ( - TransportClosed, - ): - # channels "breaking" (for TCP streams by EOF or 104 - # connection-reset) is ok since we don't have a teardown - # handshake for them (yet) and instead we simply bail out of - # the message loop and expect the teardown sequence to clean - # up. - log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') - - # transport **was** disconnected - return True - - except (Exception, trio.MultiError) as err: - if nursery_cancelled_before_task: - sn = self._service_n - assert sn and sn.cancel_scope.cancel_called - log.cancel( - f'Service nursery cancelled before it handled {funcname}' - ) - else: - # ship any "internal" exception (i.e. 
one from internal - # machinery not from an rpc task) to parent - log.exception("Actor errored:") - if self._parent_chan: - await try_ship_error_to_parent(self._parent_chan, err) - - # if this is the `MainProcess` we expect the error broadcasting - # above to trigger an error at consuming portal "checkpoints" - raise - - finally: - # msg debugging for when he machinery is brokey - log.runtime( - f"Exiting msg loop for {chan} from {chan.uid} " - f"with last msg:\n{msg}") - - # transport **was not** disconnected - return False - async def _from_parent( self, parent_addr: Optional[tuple[str, int]], @@ -1264,7 +1043,8 @@ class Actor: if self._parent_chan: await root_nursery.start( partial( - self._process_messages, + process_messages, + self, self._parent_chan, shield=True, ) @@ -1566,6 +1346,228 @@ class Actor: return self._infected_aio +async def process_messages( + actor: Actor, + chan: Channel, + shield: bool = False, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + +) -> bool: + ''' + Process messages for the channel async-RPC style. + + Receive multiplexed RPC requests and deliver responses over ``chan``. + + ''' + # TODO: once https://github.com/python-trio/trio/issues/467 gets + # worked out we'll likely want to use that! + msg = None + nursery_cancelled_before_task: bool = False + + log.runtime(f"Entering msg loop for {chan} from {chan.uid}") + try: + with trio.CancelScope(shield=shield) as loop_cs: + # this internal scope allows for keeping this message + # loop running despite the current task having been + # cancelled (eg. `open_portal()` may call this method from + # a locally spawned task) and recieve this scope using + # ``scope = Nursery.start()`` + task_status.started(loop_cs) + async for msg in chan: + + if msg is None: # loop terminate sentinel + + log.cancel( + f"Channel to {chan.uid} terminated?\n" + "Cancelling all associated tasks..") + + for (channel, cid) in actor._rpc_tasks.copy(): + if channel is chan: + await actor._cancel_task(cid, channel) + + log.runtime( + f"Msg loop signalled to terminate for" + f" {chan} from {chan.uid}") + + break + + log.transport( # type: ignore + f"Received msg {msg} from {chan.uid}") + + cid = msg.get('cid') + if cid: + # deliver response to local caller/waiter + await actor._push_result(chan, cid, msg) + + log.runtime( + f"Waiting on next msg for {chan} from {chan.uid}") + continue + + # process command request + try: + ns, funcname, kwargs, actorid, cid = msg['cmd'] + except KeyError: + # This is the non-rpc error case, that is, an + # error **not** raised inside a call to ``_invoke()`` + # (i.e. no cid was provided in the msg - see above). 
+ # Push this error to all local channel consumers + # (normally portals) by marking the channel as errored + assert chan.uid + exc = unpack_error(msg, chan=chan) + chan._exc = exc + raise exc + + log.runtime( + f"Processing request from {actorid}\n" + f"{ns}.{funcname}({kwargs})") + + if ns == 'self': + func = getattr(actor, funcname) + + if funcname == 'cancel': + + # don't start entire actor runtime + # cancellation if this actor is in debug + # mode + pdb_complete = _debug.Lock.local_pdb_complete + if pdb_complete: + await pdb_complete.wait() + + # we immediately start the runtime machinery + # shutdown + with trio.CancelScope(shield=True): + # actor.cancel() was called so kill this + # msg loop and break out into + # ``_async_main()`` + log.cancel( + f"Actor {actor.uid} was remotely cancelled " + f"by {chan.uid}" + ) + await _invoke( + actor, cid, chan, func, kwargs, is_rpc=False + ) + + loop_cs.cancel() + break + + if funcname == '_cancel_task': + + # we immediately start the runtime machinery + # shutdown + with trio.CancelScope(shield=True): + # actor.cancel() was called so kill this + # msg loop and break out into + # ``_async_main()`` + kwargs['chan'] = chan + log.cancel( + f'Remote request to cancel task\n' + f'remote actor: {chan.uid}\n' + f'task: {cid}' + ) + try: + await _invoke( + actor, + cid, + chan, + func, + kwargs, + is_rpc=False, + ) + except BaseException: + log.exception("failed to cancel task?") + + continue + else: + # complain to client about restricted modules + try: + func = actor._get_rpc_func(ns, funcname) + except (ModuleNotExposed, AttributeError) as err: + err_msg = pack_error(err) + err_msg['cid'] = cid + await chan.send(err_msg) + continue + + # spin up a task for the requested function + log.runtime(f"Spawning task for {func}") + assert actor._service_n + try: + cs = await actor._service_n.start( + partial(_invoke, actor, cid, chan, func, kwargs), + name=funcname, + ) + except (RuntimeError, trio.MultiError): + # avoid reporting a benign race condition + # during actor runtime teardown. + nursery_cancelled_before_task = True + break + + # never allow cancelling cancel requests (results in + # deadlock and other weird behaviour) + # if func != actor.cancel: + if isinstance(cs, Exception): + log.warning( + f"Task for RPC func {func} failed with" + f"{cs}") + else: + # mark that we have ongoing rpc tasks + actor._ongoing_rpc_tasks = trio.Event() + log.runtime(f"RPC func is {func}") + # store cancel scope such that the rpc task can be + # cancelled gracefully if requested + actor._rpc_tasks[(chan, cid)] = ( + cs, func, trio.Event()) + + log.runtime( + f"Waiting on next msg for {chan} from {chan.uid}") + + # end of async for, channel disconnect vis + # ``trio.EndOfChannel`` + log.runtime( + f"{chan} for {chan.uid} disconnected, cancelling tasks" + ) + await actor.cancel_rpc_tasks(chan) + + except ( + TransportClosed, + ): + # channels "breaking" (for TCP streams by EOF or 104 + # connection-reset) is ok since we don't have a teardown + # handshake for them (yet) and instead we simply bail out of + # the message loop and expect the teardown sequence to clean + # up. + log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') + + # transport **was** disconnected + return True + + except (Exception, trio.MultiError) as err: + if nursery_cancelled_before_task: + sn = actor._service_n + assert sn and sn.cancel_scope.cancel_called + log.cancel( + f'Service nursery cancelled before it handled {funcname}' + ) + else: + # ship any "internal" exception (i.e. 
one from internal + # machinery not from an rpc task) to parent + log.exception("Actor errored:") + if actor._parent_chan: + await try_ship_error_to_parent(actor._parent_chan, err) + + # if this is the `MainProcess` we expect the error broadcasting + # above to trigger an error at consuming portal "checkpoints" + raise + + finally: + # msg debugging for when he machinery is brokey + log.runtime( + f"Exiting msg loop for {chan} from {chan.uid} " + f"with last msg:\n{msg}") + + # transport **was not** disconnected + return False + + class Arbiter(Actor): ''' A special actor who knows all the other actors and always has diff --git a/tractor/msg.py b/tractor/msg.py index 138718b..9af3ccd 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -24,7 +24,7 @@ Built-in messaging patterns, types, APIs and helpers. # ``pkgutil.resolve_name()`` internally uses # ``importlib.import_module()`` which can be filtered by inserting # a ``MetaPathFinder`` into ``sys.meta_path`` (which we could do before -# entering the ``Actor._process_messages()`` loop). +# entering the ``_runtime.process_messages()`` loop). # - https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645 # - https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules # - https://stackoverflow.com/a/63320902
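
For reference, below is a minimal usage sketch (not part of the patch) showing the calling convention after this change: the message loop is started via the module-level ``process_messages()`` with the servicing ``Actor`` passed explicitly as the first argument, mirroring the ``open_portal()`` and ``_async_main()`` call sites above. The ``serve_channel()`` helper and the ``tractor._ipc`` import path for ``Channel`` are assumptions for illustration only.

    from functools import partial

    import trio

    from tractor._runtime import Actor, process_messages
    from tractor._ipc import Channel  # assumed module path for ``Channel``


    async def serve_channel(  # hypothetical helper, not from this patch
        actor: Actor,
        chan: Channel,
    ) -> None:
        '''
        Run the async-RPC msg loop for ``chan`` as a child task, shielded
        so it keeps servicing requests even if the local task is cancelled.

        '''
        async with trio.open_nursery() as nursery:
            # ``process_messages()`` is now a module-level func; the
            # servicing ``Actor`` is passed explicitly instead of ``self``.
            loop_cs: trio.CancelScope = await nursery.start(
                partial(
                    process_messages,
                    actor,
                    chan,
                    shield=True,
                )
            )
            # ``nursery.start()`` hands back the loop's ``CancelScope``
            # (the value passed to ``task_status.started()``), so the
            # caller can tear the shielded loop down explicitly.

            # ... interact with the remote actor over ``chan`` here ...

            loop_cs.cancel()

Keeping the per-channel bookkeeping (``_rpc_tasks``, ``_push_result()``, etc.) on ``Actor`` while hoisting the loop itself to a module function means any task holding an ``(actor, channel)`` pair can spawn the loop without going through a bound method, which is the switch made at both call sites in the diff.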