forked from goodboy/tractor
Move core actor runtime logging to, well, "runtime"
parent d2f0843041
commit 4d5a5c147a
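The point of the change: chatty internals move off `debug` and onto a dedicated `runtime` level (and the debugger-related message below onto `pdb`), so ordinary debug output stays readable. As a minimal, hedged sketch of how such a custom level can be wired onto a stdlib logger -- the numeric value and the wiring are assumptions, not necessarily what tractor's `log.py` does:

import logging
from functools import partialmethod

# assumed numeric value: between DEBUG (10) and INFO (20)
RUNTIME = 15
logging.addLevelName(RUNTIME, 'RUNTIME')

# expose a `log.runtime(...)` emitter on all Logger instances
logging.Logger.runtime = partialmethod(logging.Logger.log, RUNTIME)

logging.basicConfig(level=RUNTIME)
log = logging.getLogger('tractor')
log.runtime("core runtime event")  # emitted at the new RUNTIME level

With something like that in place, the `log.runtime(...)` calls below only show up when the threshold is set at or under the custom level.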
@@ -1,5 +1,6 @@
 """
 Actor primitives and helpers
+
 """
 from collections import defaultdict
 from functools import partial
@@ -52,7 +53,8 @@ async def _invoke(
 Union[trio.CancelScope, BaseException]
 ] = trio.TASK_STATUS_IGNORED,
 ):
-'''Invoke local func and deliver result(s) over provided channel.
-'''
+'''
+Invoke local func and deliver result(s) over provided channel.
+'''
 __tracebackhide__ = True
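`_invoke` takes trio's `task_status` parameter so it can be launched with `nursery.start()` and hand its cancel scope back to the spawner. A small illustrative sketch of that handoff (names here are made up, not tractor's):

import trio

async def worker(task_status=trio.TASK_STATUS_IGNORED):
    with trio.CancelScope() as cs:
        # hand the scope back to whoever called `nursery.start()`
        task_status.started(cs)
        await trio.sleep_forever()

async def main():
    async with trio.open_nursery() as nursery:
        cs = await nursery.start(worker)  # blocks until .started() fires
        cs.cancel()                       # cancel just that one task

trio.run(main)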
@@ -127,7 +129,7 @@ async def _invoke(
 # to_yield = await coro.asend(to_send)
 await chan.send({'yield': item, 'cid': cid})

-log.debug(f"Finished iterating {coro}")
+log.runtime(f"Finished iterating {coro}")
 # TODO: we should really support a proper
 # `StopAsyncIteration` system here for returning a final
 # value if desired
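For async-generator results, each produced item is framed as a `{'yield': ..., 'cid': ...}` packet tagged with the caller id. Roughly, and only as a sketch (the `chan` here is just anything with an async `send()`):

from typing import Any, AsyncIterator

async def stream_to_chan(
    agen: AsyncIterator[Any],
    chan,          # anything exposing an async `send()` method
    cid: str,
) -> None:
    # drain the generator, framing each item for the wire
    async for item in agen:
        await chan.send({'yield': item, 'cid': cid})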
@@ -354,10 +356,10 @@ class Actor:
 """Wait for a connection back from a spawned actor with a given
 ``uid``.
 """
-log.debug(f"Waiting for peer {uid} to connect")
+log.runtime(f"Waiting for peer {uid} to connect")
 event = self._peer_connected.setdefault(uid, trio.Event())
 await event.wait()
-log.debug(f"{uid} successfully connected back to us")
+log.runtime(f"{uid} successfully connected back to us")
 return event, self._peers[uid][-1]

 def load_modules(self) -> None:
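`wait_for_peer()` is a rendezvous on a per-`uid` `trio.Event`: the waiter creates-or-reuses the event, and the connection handler sets it once that uid dials back. A stripped-down sketch with illustrative names:

import trio

class PeerTable:
    def __init__(self) -> None:
        self._peer_connected: dict[str, trio.Event] = {}

    async def wait_for_peer(self, uid: str) -> None:
        # create-or-reuse the event keyed by the peer's uid
        event = self._peer_connected.setdefault(uid, trio.Event())
        await event.wait()

    def peer_connected(self, uid: str) -> None:
        # called from the connection handler when `uid` connects back
        self._peer_connected.setdefault(uid, trio.Event()).set()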
@@ -381,7 +383,7 @@ class Actor:
 # XXX append the allowed module to the python path which
 # should allow for relative (at least downward) imports.
 sys.path.append(os.path.dirname(filepath))
-log.debug(f"Attempting to import {modpath}@{filepath}")
+log.runtime(f"Attempting to import {modpath}@{filepath}")
 mod = importlib.import_module(modpath)
 self._mods[modpath] = mod
 if modpath == '__main__':
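`load_modules()` appends the module file's directory to `sys.path` before importing, so a freshly spawned interpreter can resolve the same module its parent referenced. A hedged sketch of just that step:

import importlib
import os
import sys
from types import ModuleType

def load_module(modpath: str, filepath: str) -> ModuleType:
    # allow (at least downward-relative) imports from the module's dir
    sys.path.append(os.path.dirname(filepath))
    return importlib.import_module(modpath)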
@@ -450,7 +452,7 @@ class Actor:
 # Instructing connection: this is likely a new channel to
 # a recently spawned actor which we'd like to control via
 # async-rpc calls.
-log.debug(f"Waking channel waiters {event.statistics()}")
+log.runtime(f"Waking channel waiters {event.statistics()}")
 # Alert any task waiting on this connection to come up
 event.set()
@@ -487,19 +489,19 @@ class Actor:
 # await send_chan.aclose()

 # Drop ref to channel so it can be gc-ed and disconnected
-log.debug(f"Releasing channel {chan} from {chan.uid}")
+log.runtime(f"Releasing channel {chan} from {chan.uid}")
 chans = self._peers.get(chan.uid)
 chans.remove(chan)

 if not chans:
-log.debug(f"No more channels for {chan.uid}")
+log.runtime(f"No more channels for {chan.uid}")
 self._peers.pop(chan.uid, None)

-log.debug(f"Peers is {self._peers}")
+log.runtime(f"Peers is {self._peers}")

 if not self._peers: # no more channels connected
 self._no_more_peers.set()
-log.debug("Signalling no more peer channels")
+log.runtime("Signalling no more peer channels")

 # # XXX: is this necessary (GC should do it?)
 if chan.connected():
@@ -508,7 +510,7 @@ class Actor:
 # an error and so we should at least try to terminate
 # the channel from this end gracefully.

-log.debug(f"Disconnecting channel {chan}")
+log.runtime(f"Disconnecting channel {chan}")
 try:
 # send a msg loop terminate sentinel
 await chan.send(None)
@@ -538,12 +540,12 @@ class Actor:
 # if ctx:
 # ctx._error_from_remote_msg(msg)

-# log.debug(f"{send_chan} was terminated at remote end")
+# log.runtime(f"{send_chan} was terminated at remote end")
 # # indicate to consumer that far end has stopped
 # return await send_chan.aclose()

 try:
-log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}")
+log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
 # maintain backpressure
 await send_chan.send(msg)
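`_push_result()` awaits `send_chan.send(msg)`, so a slow local consumer naturally backpressures the message loop. A tiny demonstration with a bounded trio memory channel (the capacity here is arbitrary):

import trio

async def main() -> None:
    # a bounded buffer: sends block once 2 messages are in flight
    send_chan, recv_chan = trio.open_memory_channel(2)

    async def producer() -> None:
        for i in range(5):
            await send_chan.send({'yield': i, 'cid': 'demo'})
        await send_chan.aclose()

    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer)
        async for msg in recv_chan:
            print(msg)

trio.run(main)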
@@ -565,7 +567,7 @@ class Actor:

 ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:

-log.debug(f"Getting result queue for {actorid} cid {cid}")
+log.runtime(f"Getting result queue for {actorid} cid {cid}")
 try:
 send_chan, recv_chan = self._cids2qs[(actorid, cid)]
 except KeyError:
@@ -590,7 +592,7 @@ class Actor:
 cid = str(uuid.uuid4())
 assert chan.uid
 send_chan, recv_chan = self.get_memchans(chan.uid, cid)
-log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
+log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
 await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
 return cid, recv_chan
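Each outbound command gets a fresh `uuid4` call id so several in-flight requests can share one channel and responses still route to the right waiter. An illustrative caller-side helper (not tractor's public API):

import uuid

async def send_cmd(chan, ns: str, func: str, kwargs: dict, uid) -> str:
    # a fresh call id lets multiple in-flight requests share one channel
    cid = str(uuid.uuid4())
    await chan.send({'cmd': (ns, func, kwargs, uid, cid)})
    return cid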
@@ -607,7 +609,7 @@ class Actor:
 # TODO: once https://github.com/python-trio/trio/issues/467 gets
 # worked out we'll likely want to use that!
 msg = None
-log.debug(f"Entering msg loop for {chan} from {chan.uid}")
+log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
 try:
 with trio.CancelScope(shield=shield) as loop_cs:
 # this internal scope allows for keeping this message
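The loop body runs inside `trio.CancelScope(shield=shield)`, which lets message processing keep draining even while an enclosing scope is being cancelled. Shielding in isolation looks like this:

import trio

async def main() -> None:
    with trio.move_on_after(0.1):           # outer cancellation deadline
        with trio.CancelScope(shield=True):
            # shielded work is not interrupted by the outer deadline
            await trio.sleep(0.5)
            print("finished despite the outer cancel")

trio.run(main)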
@@ -620,14 +622,14 @@ class Actor:

 if msg is None: # loop terminate sentinel

-log.debug(
+log.runtime(
 f"Cancelling all tasks for {chan} from {chan.uid}")

 for (channel, cid) in self._rpc_tasks.copy():
 if channel is chan:
 await self._cancel_task(cid, channel)

-log.debug(
+log.runtime(
 f"Msg loop signalled to terminate for"
 f" {chan} from {chan.uid}")
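A bare `None` on the wire is the loop-terminate sentinel: the receiver cancels the tasks tied to that channel and leaves its message loop. Schematically (placeholders only):

async def msg_loop(chan) -> None:
    # `chan` is assumed to be an async iterable of decoded packets
    async for msg in chan:
        if msg is None:  # loop terminate sentinel
            # cancel any tasks still tied to this channel, then bail
            break
        ...  # otherwise dispatch the packet as usual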
@@ -641,7 +643,7 @@ class Actor:
 # deliver response to local caller/waiter
 await self._push_result(chan, cid, msg)

-log.debug(
+log.runtime(
 f"Waiting on next msg for {chan} from {chan.uid}")
 continue
@@ -659,7 +661,7 @@ class Actor:
 chan._exc = exc
 raise exc

-log.debug(
+log.runtime(
 f"Processing request from {actorid}\n"
 f"{ns}.{funcname}({kwargs})")
 if ns == 'self':
@@ -685,7 +687,7 @@ class Actor:
 continue

 # spin up a task for the requested function
-log.debug(f"Spawning task for {func}")
+log.runtime(f"Spawning task for {func}")
 assert self._service_n
 cs = await self._service_n.start(
 partial(_invoke, self, cid, chan, func, kwargs),
@@ -716,11 +718,11 @@ class Actor:
 loop_cs.cancel()
 break

-log.debug(
+log.runtime(
 f"Waiting on next msg for {chan} from {chan.uid}")
 else:
 # channel disconnect
-log.debug(
+log.runtime(
 f"{chan} for {chan.uid} disconnected, cancelling tasks"
 )
 await self.cancel_rpc_tasks(chan)
@@ -733,7 +735,7 @@ class Actor:
 # handshake for them (yet) and instead we simply bail out of
 # the message loop and expect the teardown sequence to clean
 # up.
-log.debug(f'channel from {chan.uid} closed abruptly:\n{chan}')
+log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')

 except (Exception, trio.MultiError) as err:
@@ -749,12 +751,12 @@ class Actor:

 except trio.Cancelled:
 # debugging only
-log.debug(f"Msg loop was cancelled for {chan}")
+log.runtime(f"Msg loop was cancelled for {chan}")
 raise

 finally:
 # msg debugging for when he machinery is brokey
-log.debug(
+log.runtime(
 f"Exiting msg loop for {chan} from {chan.uid} "
 f"with last msg:\n{msg}")
@@ -780,7 +782,7 @@ class Actor:
 # Receive runtime state from our parent
 parent_data: dict[str, Any]
 parent_data = await chan.recv()
-log.debug(
+log.runtime(
 "Received state from parent:\n"
 f"{parent_data}"
 )
@@ -789,7 +791,7 @@ class Actor:
 parent_data.pop('bind_port'),
 )
 rvs = parent_data.pop('_runtime_vars')
-log.debug(f"Runtime vars are: {rvs}")
+log.runtime(f"Runtime vars are: {rvs}")
 rvs['_is_root'] = False
 _state._runtime_vars.update(rvs)
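During startup a child receives its runtime state as the first message from its parent, pops out `_runtime_vars`, flags itself as non-root, and merges the rest locally. A condensed sketch of that handshake step (the function name is illustrative):

from typing import Any

async def absorb_parent_state(chan, runtime_vars: dict) -> dict[str, Any]:
    # the first message from the parent carries our runtime configuration
    parent_data: dict[str, Any] = await chan.recv()
    rvs = parent_data.pop('_runtime_vars')
    rvs['_is_root'] = False        # a child is never the root actor
    runtime_vars.update(rvs)
    return parent_data             # remaining fields: addresses, etc.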
@@ -893,7 +895,7 @@ class Actor:
 _state._runtime_vars['_root_mailbox'] = accept_addr

 # Register with the arbiter if we're told its addr
-log.debug(f"Registering {self} for role `{self.name}`")
+log.runtime(f"Registering {self} for role `{self.name}`")
 assert isinstance(self._arb_addr, tuple)

 async with get_arbiter(*self._arb_addr) as arb_portal:
@@ -991,13 +993,13 @@ class Actor:
 if any(
 chan.connected() for chan in chain(*self._peers.values())
 ):
-log.debug(
+log.runtime(
 f"Waiting for remaining peers {self._peers} to clear")
 with trio.CancelScope(shield=True):
 await self._no_more_peers.wait()
-log.debug("All peer channels are complete")
+log.runtime("All peer channels are complete")

-log.debug("Runtime completed")
+log.runtime("Runtime completed")

 async def _serve_forever(
 self,
@@ -1027,7 +1029,7 @@ class Actor:
 host=accept_host,
 )
 )
-log.debug(
+log.runtime(
 "Started tcp server(s) on"
 f" {[getattr(l, 'socket', 'unknown socket') for l in l]}")
 self._listeners.extend(l)
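The channel server is started via `nursery.start()`, which hands back the live listener objects so their bound sockets can be logged and recorded. A hedged sketch using trio's stock `serve_tcp` helper (the handler and host are illustrative):

from functools import partial
import trio

async def handle_conn(stream: trio.SocketStream) -> None:
    # illustrative per-connection handler
    await stream.aclose()

async def main() -> None:
    async with trio.open_nursery() as nursery:
        listeners = await nursery.start(
            partial(trio.serve_tcp, handle_conn, 0, host='127.0.0.1')
        )
        print([l.socket.getsockname() for l in listeners])
        nursery.cancel_scope.cancel()  # shut the server back down

trio.run(main)

Cancelling the nursery's cancel scope is also how the real server is torn down in the final hunk below.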
@@ -1066,7 +1068,7 @@ class Actor:
 # with the root actor in this tree
 dbcs = _debug._debugger_request_cs
 if dbcs is not None:
-log.debug("Cancelling active debugger request")
+log.pdb("Cancelling active debugger request")
 dbcs.cancel()

 # kill all ongoing tasks
@@ -1110,7 +1112,7 @@ class Actor:
 log.warning(f"{cid} has already completed/terminated?")
 return

-log.debug(
+log.runtime(
 f"Cancelling task:\ncid: {cid}\nfunc: {func}\n"
 f"peer: {chan.uid}\n")
@@ -1122,12 +1124,12 @@ class Actor:
 scope.cancel()

 # wait for _invoke to mark the task complete
-log.debug(
+log.runtime(
 f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n"
 f"peer: {chan.uid}\n")
 await is_complete.wait()

-log.debug(
+log.runtime(
 f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
 f"peer: {chan.uid}\n")
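`_cancel_task()` is cancel-and-confirm: it cancels the stored scope, then waits on a completion event that the task sets on the way out, so the caller knows the task actually unwound. A condensed sketch of that bookkeeping (names are illustrative):

import trio

async def run_tracked(tasks: dict, cid: str, task_status=trio.TASK_STATUS_IGNORED):
    is_complete = trio.Event()
    with trio.CancelScope() as scope:
        tasks[cid] = (scope, is_complete)
        task_status.started(scope)
        try:
            await trio.sleep_forever()  # stand-in for the real RPC work
        finally:
            is_complete.set()           # always mark completion

async def cancel_task(tasks: dict, cid: str) -> None:
    scope, is_complete = tasks[cid]
    scope.cancel()
    await is_complete.wait()            # confirm the task unwound

async def main() -> None:
    tasks: dict = {}
    async with trio.open_nursery() as nursery:
        await nursery.start(run_tracked, tasks, 'cid-1')
        await cancel_task(tasks, 'cid-1')

trio.run(main)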
@@ -1157,7 +1159,7 @@ class Actor:
 preventing any new inbound connections from being established.
 """
 if self._server_n:
-log.debug("Shutting down channel server")
+log.runtime("Shutting down channel server")
 self._server_n.cancel_scope.cancel()

 @property