Make `process_messages()` a mod func

we_bein_all_matchy
Tyler Goodlet 2022-08-03 15:14:36 -04:00
parent d4084b2032
commit a3a5bc267e
3 changed files with 229 additions and 225 deletions

View File

@ -611,9 +611,11 @@ async def open_portal(
msg_loop_cs: Optional[trio.CancelScope] = None
if start_msg_loop:
from ._runtime import process_messages
msg_loop_cs = await nursery.start(
partial(
actor._process_messages,
process_messages,
actor,
channel,
# if the local task is cancelled we want to keep
# the msg loop running until our block ends

View File

@ -614,7 +614,7 @@ class Actor:
# Begin channel management - respond to remote requests and
# process received reponses.
try:
disconnected = await self._process_messages(chan)
disconnected = await process_messages(self, chan)
except (
trio.Cancelled,
@ -884,227 +884,6 @@ class Actor:
ctx._remote_func_type = functype
return ctx
async def _process_messages(
self,
chan: Channel,
shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> bool:
'''
Process messages for the channel async-RPC style.
Receive multiplexed RPC requests and deliver responses over ``chan``.
'''
# TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that!
msg = None
nursery_cancelled_before_task: bool = False
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
try:
with trio.CancelScope(shield=shield) as loop_cs:
# this internal scope allows for keeping this message
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
# a locally spawned task) and recieve this scope using
# ``scope = Nursery.start()``
task_status.started(loop_cs)
async for msg in chan:
if msg is None: # loop terminate sentinel
log.cancel(
f"Channel to {chan.uid} terminated?\n"
"Cancelling all associated tasks..")
for (channel, cid) in self._rpc_tasks.copy():
if channel is chan:
await self._cancel_task(cid, channel)
log.runtime(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
break
log.transport( # type: ignore
f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid')
if cid:
# deliver response to local caller/waiter
await self._push_result(chan, cid, msg)
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
continue
# process command request
try:
ns, funcname, kwargs, actorid, cid = msg['cmd']
except KeyError:
# This is the non-rpc error case, that is, an
# error **not** raised inside a call to ``_invoke()``
# (i.e. no cid was provided in the msg - see above).
# Push this error to all local channel consumers
# (normally portals) by marking the channel as errored
assert chan.uid
exc = unpack_error(msg, chan=chan)
chan._exc = exc
raise exc
log.runtime(
f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
func = getattr(self, funcname)
if funcname == 'cancel':
# don't start entire actor runtime
# cancellation if this actor is in debug
# mode
pdb_complete = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# we immediately start the runtime machinery
# shutdown
with trio.CancelScope(shield=True):
# self.cancel() was called so kill this
# msg loop and break out into
# ``_async_main()``
log.cancel(
f"Actor {self.uid} was remotely cancelled "
f"by {chan.uid}"
)
await _invoke(
self, cid, chan, func, kwargs, is_rpc=False
)
loop_cs.cancel()
break
if funcname == '_cancel_task':
# we immediately start the runtime machinery
# shutdown
with trio.CancelScope(shield=True):
# self.cancel() was called so kill this
# msg loop and break out into
# ``_async_main()``
kwargs['chan'] = chan
log.cancel(
f'Remote request to cancel task\n'
f'remote actor: {chan.uid}\n'
f'task: {cid}'
)
try:
await _invoke(
self,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
except BaseException:
log.exception("failed to cancel task?")
continue
else:
# complain to client about restricted modules
try:
func = self._get_rpc_func(ns, funcname)
except (ModuleNotExposed, AttributeError) as err:
err_msg = pack_error(err)
err_msg['cid'] = cid
await chan.send(err_msg)
continue
# spin up a task for the requested function
log.runtime(f"Spawning task for {func}")
assert self._service_n
try:
cs = await self._service_n.start(
partial(_invoke, self, cid, chan, func, kwargs),
name=funcname,
)
except (RuntimeError, trio.MultiError):
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task = True
break
# never allow cancelling cancel requests (results in
# deadlock and other weird behaviour)
# if func != self.cancel:
if isinstance(cs, Exception):
log.warning(
f"Task for RPC func {func} failed with"
f"{cs}")
else:
# mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
self._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event())
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
# end of async for, channel disconnect vis
# ``trio.EndOfChannel``
log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks"
)
await self.cancel_rpc_tasks(chan)
except (
TransportClosed,
):
# channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up.
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
# transport **was** disconnected
return True
except (Exception, trio.MultiError) as err:
if nursery_cancelled_before_task:
sn = self._service_n
assert sn and sn.cancel_scope.cancel_called
log.cancel(
f'Service nursery cancelled before it handled {funcname}'
)
else:
# ship any "internal" exception (i.e. one from internal
# machinery not from an rpc task) to parent
log.exception("Actor errored:")
if self._parent_chan:
await try_ship_error_to_parent(self._parent_chan, err)
# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
raise
finally:
# msg debugging for when he machinery is brokey
log.runtime(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
# transport **was not** disconnected
return False
async def _from_parent(
self,
parent_addr: Optional[tuple[str, int]],
@ -1264,7 +1043,8 @@ class Actor:
if self._parent_chan:
await root_nursery.start(
partial(
self._process_messages,
process_messages,
self,
self._parent_chan,
shield=True,
)
@ -1566,6 +1346,228 @@ class Actor:
return self._infected_aio
async def process_messages(
actor: Actor,
chan: Channel,
shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> bool:
'''
Process messages for the channel async-RPC style.
Receive multiplexed RPC requests and deliver responses over ``chan``.
'''
# TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that!
msg = None
nursery_cancelled_before_task: bool = False
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
try:
with trio.CancelScope(shield=shield) as loop_cs:
# this internal scope allows for keeping this message
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
# a locally spawned task) and recieve this scope using
# ``scope = Nursery.start()``
task_status.started(loop_cs)
async for msg in chan:
if msg is None: # loop terminate sentinel
log.cancel(
f"Channel to {chan.uid} terminated?\n"
"Cancelling all associated tasks..")
for (channel, cid) in actor._rpc_tasks.copy():
if channel is chan:
await actor._cancel_task(cid, channel)
log.runtime(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
break
log.transport( # type: ignore
f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid')
if cid:
# deliver response to local caller/waiter
await actor._push_result(chan, cid, msg)
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
continue
# process command request
try:
ns, funcname, kwargs, actorid, cid = msg['cmd']
except KeyError:
# This is the non-rpc error case, that is, an
# error **not** raised inside a call to ``_invoke()``
# (i.e. no cid was provided in the msg - see above).
# Push this error to all local channel consumers
# (normally portals) by marking the channel as errored
assert chan.uid
exc = unpack_error(msg, chan=chan)
chan._exc = exc
raise exc
log.runtime(
f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
func = getattr(actor, funcname)
if funcname == 'cancel':
# don't start entire actor runtime
# cancellation if this actor is in debug
# mode
pdb_complete = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# we immediately start the runtime machinery
# shutdown
with trio.CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``_async_main()``
log.cancel(
f"Actor {actor.uid} was remotely cancelled "
f"by {chan.uid}"
)
await _invoke(
actor, cid, chan, func, kwargs, is_rpc=False
)
loop_cs.cancel()
break
if funcname == '_cancel_task':
# we immediately start the runtime machinery
# shutdown
with trio.CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``_async_main()``
kwargs['chan'] = chan
log.cancel(
f'Remote request to cancel task\n'
f'remote actor: {chan.uid}\n'
f'task: {cid}'
)
try:
await _invoke(
actor,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
except BaseException:
log.exception("failed to cancel task?")
continue
else:
# complain to client about restricted modules
try:
func = actor._get_rpc_func(ns, funcname)
except (ModuleNotExposed, AttributeError) as err:
err_msg = pack_error(err)
err_msg['cid'] = cid
await chan.send(err_msg)
continue
# spin up a task for the requested function
log.runtime(f"Spawning task for {func}")
assert actor._service_n
try:
cs = await actor._service_n.start(
partial(_invoke, actor, cid, chan, func, kwargs),
name=funcname,
)
except (RuntimeError, trio.MultiError):
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task = True
break
# never allow cancelling cancel requests (results in
# deadlock and other weird behaviour)
# if func != actor.cancel:
if isinstance(cs, Exception):
log.warning(
f"Task for RPC func {func} failed with"
f"{cs}")
else:
# mark that we have ongoing rpc tasks
actor._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
actor._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event())
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
# end of async for, channel disconnect vis
# ``trio.EndOfChannel``
log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks"
)
await actor.cancel_rpc_tasks(chan)
except (
TransportClosed,
):
# channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up.
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
# transport **was** disconnected
return True
except (Exception, trio.MultiError) as err:
if nursery_cancelled_before_task:
sn = actor._service_n
assert sn and sn.cancel_scope.cancel_called
log.cancel(
f'Service nursery cancelled before it handled {funcname}'
)
else:
# ship any "internal" exception (i.e. one from internal
# machinery not from an rpc task) to parent
log.exception("Actor errored:")
if actor._parent_chan:
await try_ship_error_to_parent(actor._parent_chan, err)
# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
raise
finally:
# msg debugging for when he machinery is brokey
log.runtime(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
# transport **was not** disconnected
return False
class Arbiter(Actor):
'''
A special actor who knows all the other actors and always has

View File

@ -24,7 +24,7 @@ Built-in messaging patterns, types, APIs and helpers.
# ``pkgutil.resolve_name()`` internally uses
# ``importlib.import_module()`` which can be filtered by inserting
# a ``MetaPathFinder`` into ``sys.meta_path`` (which we could do before
# entering the ``Actor._process_messages()`` loop).
# entering the ``_runtime.process_messages()`` loop).
# - https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645
# - https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules
# - https://stackoverflow.com/a/63320902