Compare commits
11 Commits
2bb33da9c8
...
48b6db5c68
Author | SHA1 | Date |
---|---|---|
|
48b6db5c68 | |
|
029888cee8 | |
|
223d885e22 | |
|
a1f091882e | |
|
e587f0da23 | |
|
5138224625 | |
|
ad72cd629f | |
|
533e69baaf | |
|
fbc9325184 | |
|
3cd222959a | |
|
2ea703cc75 |
|
@ -0,0 +1,4 @@
|
|||
'''
|
||||
`tractor.ipc` subsystem(s)/unit testing suites.
|
||||
|
||||
'''
|
|
@ -0,0 +1,72 @@
|
|||
'''
|
||||
High-level `.ipc._server` unit tests.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
from tractor import (
|
||||
devx,
|
||||
ipc,
|
||||
log,
|
||||
)
|
||||
from tractor._testing.addr import (
|
||||
get_rando_addr,
|
||||
)
|
||||
# TODO, use/check-roundtripping with some of these wrapper types?
|
||||
#
|
||||
# from .._addr import Address
|
||||
# from ._chan import Channel
|
||||
# from ._transport import MsgTransport
|
||||
# from ._uds import UDSAddress
|
||||
# from ._tcp import TCPAddress
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'_tpt_proto',
|
||||
['uds', 'tcp']
|
||||
)
|
||||
def test_basic_ipc_server(
|
||||
_tpt_proto: str,
|
||||
debug_mode: bool,
|
||||
loglevel: str,
|
||||
):
|
||||
|
||||
# so we see the socket-listener reporting on console
|
||||
log.get_console_log("INFO")
|
||||
|
||||
rando_addr: tuple = get_rando_addr(
|
||||
tpt_proto=_tpt_proto,
|
||||
)
|
||||
async def main():
|
||||
async with ipc._server.open_ipc_server() as server:
|
||||
|
||||
assert (
|
||||
server._parent_tn
|
||||
and
|
||||
server._parent_tn is server._stream_handler_tn
|
||||
)
|
||||
assert server._no_more_peers.is_set()
|
||||
|
||||
eps: list[ipc.IPCEndpoint] = await server.listen_on(
|
||||
accept_addrs=[rando_addr],
|
||||
stream_handler_nursery=None,
|
||||
)
|
||||
assert (
|
||||
len(eps) == 1
|
||||
and
|
||||
(ep := eps[0])._listener
|
||||
and
|
||||
not ep.peer_tpts
|
||||
)
|
||||
|
||||
server._parent_tn.cancel_scope.cancel()
|
||||
|
||||
# !TODO! actually make a bg-task connection from a client
|
||||
# using `ipc._chan._connect_chan()`
|
||||
|
||||
with devx.maybe_open_crash_handler(
|
||||
pdb=debug_mode,
|
||||
):
|
||||
trio.run(main)
|
|
@ -582,8 +582,7 @@ async def open_portal(
|
|||
msg_loop_cs = await tn.start(
|
||||
partial(
|
||||
_rpc.process_messages,
|
||||
actor,
|
||||
channel,
|
||||
chan=channel,
|
||||
# if the local task is cancelled we want to keep
|
||||
# the msg loop running until our block ends
|
||||
shield=True,
|
||||
|
|
|
@ -869,7 +869,6 @@ async def try_ship_error_to_remote(
|
|||
|
||||
|
||||
async def process_messages(
|
||||
actor: Actor,
|
||||
chan: Channel,
|
||||
shield: bool = False,
|
||||
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
|
@ -907,6 +906,7 @@ async def process_messages(
|
|||
(as utilized inside `Portal.cancel_actor()` ).
|
||||
|
||||
'''
|
||||
actor: Actor = _state.current_actor()
|
||||
assert actor._service_n # runtime state sanity
|
||||
|
||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||
|
|
|
@ -1262,6 +1262,10 @@ async def async_main(
|
|||
the actor's "runtime" and all thus all ongoing RPC tasks.
|
||||
|
||||
'''
|
||||
# XXX NOTE, `_state._current_actor` **must** be set prior to
|
||||
# calling this core runtime entrypoint!
|
||||
assert actor is _state.current_actor()
|
||||
|
||||
actor._task: trio.Task = trio.lowlevel.current_task()
|
||||
|
||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||
|
@ -1321,7 +1325,6 @@ async def async_main(
|
|||
) as service_nursery,
|
||||
|
||||
_server.open_ipc_server(
|
||||
actor=actor,
|
||||
parent_tn=service_nursery,
|
||||
stream_handler_tn=service_nursery,
|
||||
) as ipc_server,
|
||||
|
@ -1375,7 +1378,6 @@ async def async_main(
|
|||
'Booting IPC server'
|
||||
)
|
||||
eps: list = await ipc_server.listen_on(
|
||||
actor=actor,
|
||||
accept_addrs=accept_addrs,
|
||||
stream_handler_nursery=service_nursery,
|
||||
)
|
||||
|
@ -1460,8 +1462,7 @@ async def async_main(
|
|||
await root_nursery.start(
|
||||
partial(
|
||||
_rpc.process_messages,
|
||||
actor,
|
||||
actor._parent_chan,
|
||||
chan=actor._parent_chan,
|
||||
shield=True,
|
||||
)
|
||||
)
|
||||
|
|
|
@ -72,11 +72,223 @@ if TYPE_CHECKING:
|
|||
log = log.get_logger(__name__)
|
||||
|
||||
|
||||
async def maybe_wait_on_canced_subs(
|
||||
uid: tuple[str, str],
|
||||
chan: Channel,
|
||||
disconnected: bool,
|
||||
|
||||
actor: Actor|None = None,
|
||||
chan_drain_timeout: float = 0.5,
|
||||
an_exit_timeout: float = 0.5,
|
||||
|
||||
) -> ActorNursery|None:
|
||||
'''
|
||||
When a process-local actor-nursery is found for the given actor
|
||||
`uid` (i.e. that peer is **also** a subactor of this parent), we
|
||||
attempt to (with timeouts) wait on,
|
||||
|
||||
- all IPC msgs to drain on the (common) `Channel` such that all
|
||||
local `Context`-parent-tasks can also gracefully collect
|
||||
`ContextCancelled` msgs from their respective remote children
|
||||
vs. a `chan_drain_timeout`.
|
||||
|
||||
- the actor-nursery to cancel-n-join all its supervised children
|
||||
(processes) *gracefully* vs. a `an_exit_timeout` and thus also
|
||||
detect cases where the IPC transport connection broke but
|
||||
a sub-process is detected as still alive (a case that happens
|
||||
when the subactor is still in an active debugger REPL session).
|
||||
|
||||
If the timeout expires in either case we ofc report with warning.
|
||||
|
||||
'''
|
||||
actor = actor or _state.current_actor()
|
||||
|
||||
# XXX running outside actor-runtime usage,
|
||||
# - unit testing
|
||||
# - possibly manual usage (eventually) ?
|
||||
if not actor:
|
||||
return None
|
||||
|
||||
local_nursery: (
|
||||
ActorNursery|None
|
||||
) = actor._actoruid2nursery.get(uid)
|
||||
|
||||
# This is set in `Portal.cancel_actor()`. So if
|
||||
# the peer was cancelled we try to wait for them
|
||||
# to tear down their side of the connection before
|
||||
# moving on with closing our own side.
|
||||
if (
|
||||
local_nursery
|
||||
and (
|
||||
actor._cancel_called
|
||||
or
|
||||
chan._cancel_called
|
||||
)
|
||||
#
|
||||
# ^-TODO-^ along with this is there another condition
|
||||
# that we should filter with to avoid entering this
|
||||
# waiting block needlessly?
|
||||
# -[ ] maybe `and local_nursery.cancelled` and/or
|
||||
# only if the `._children` table is empty or has
|
||||
# only `Portal`s with .chan._cancel_called ==
|
||||
# True` as per what we had below; the MAIN DIFF
|
||||
# BEING that just bc one `Portal.cancel_actor()`
|
||||
# was called, doesn't mean the whole actor-nurse
|
||||
# is gonna exit any time soon right!?
|
||||
#
|
||||
# or
|
||||
# all(chan._cancel_called for chan in chans)
|
||||
|
||||
):
|
||||
log.cancel(
|
||||
'Waiting on cancel request to peer..\n'
|
||||
f'c)=>\n'
|
||||
f' |_{chan.uid}\n'
|
||||
)
|
||||
|
||||
# XXX: this is a soft wait on the channel (and its
|
||||
# underlying transport protocol) to close from the
|
||||
# remote peer side since we presume that any channel
|
||||
# which is mapped to a sub-actor (i.e. it's managed
|
||||
# by local actor-nursery) has a message that is sent
|
||||
# to the peer likely by this actor (which may be in
|
||||
# a shutdown sequence due to cancellation) when the
|
||||
# local runtime here is now cancelled while
|
||||
# (presumably) in the middle of msg loop processing.
|
||||
chan_info: str = (
|
||||
f'{chan.uid}\n'
|
||||
f'|_{chan}\n'
|
||||
f' |_{chan.transport}\n\n'
|
||||
)
|
||||
with trio.move_on_after(chan_drain_timeout) as drain_cs:
|
||||
drain_cs.shield = True
|
||||
|
||||
# attempt to wait for the far end to close the
|
||||
# channel and bail after timeout (a 2-generals
|
||||
# problem on closure).
|
||||
assert chan.transport
|
||||
async for msg in chan.transport.drain():
|
||||
|
||||
# try to deliver any lingering msgs
|
||||
# before we destroy the channel.
|
||||
# This accomplishes deterministic
|
||||
# ``Portal.cancel_actor()`` cancellation by
|
||||
# making sure any RPC response to that call is
|
||||
# delivered the local calling task.
|
||||
# TODO: factor this into a helper?
|
||||
log.warning(
|
||||
'Draining msg from disconnected peer\n'
|
||||
f'{chan_info}'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
# cid: str|None = msg.get('cid')
|
||||
cid: str|None = msg.cid
|
||||
if cid:
|
||||
# deliver response to local caller/waiter
|
||||
await actor._deliver_ctx_payload(
|
||||
chan,
|
||||
cid,
|
||||
msg,
|
||||
)
|
||||
if drain_cs.cancelled_caught:
|
||||
log.warning(
|
||||
'Timed out waiting on IPC transport channel to drain?\n'
|
||||
f'{chan_info}'
|
||||
)
|
||||
|
||||
# XXX NOTE XXX when no explicit call to
|
||||
# `open_root_actor()` was made by the application
|
||||
# (normally we implicitly make that call inside
|
||||
# the first `.open_nursery()` in root-actor
|
||||
# user/app code), we can assume that either we
|
||||
# are NOT the root actor or are root but the
|
||||
# runtime was started manually. and thus DO have
|
||||
# to wait for the nursery-enterer to exit before
|
||||
# shutting down the local runtime to avoid
|
||||
# clobbering any ongoing subactor
|
||||
# teardown/debugging/graceful-cancel.
|
||||
#
|
||||
# see matching note inside `._supervise.open_nursery()`
|
||||
#
|
||||
# TODO: should we have a separate cs + timeout
|
||||
# block here?
|
||||
if (
|
||||
# XXX SO either,
|
||||
# - not root OR,
|
||||
# - is root but `open_root_actor()` was
|
||||
# entered manually (in which case we do
|
||||
# the equiv wait there using the
|
||||
# `devx.debug` sub-sys APIs).
|
||||
not local_nursery._implicit_runtime_started
|
||||
):
|
||||
log.runtime(
|
||||
'Waiting on local actor nursery to exit..\n'
|
||||
f'|_{local_nursery}\n'
|
||||
)
|
||||
with trio.move_on_after(an_exit_timeout) as an_exit_cs:
|
||||
an_exit_cs.shield = True
|
||||
await local_nursery.exited.wait()
|
||||
|
||||
# TODO: currently this is always triggering for every
|
||||
# sub-daemon spawned from the `piker.services._mngr`?
|
||||
# -[ ] how do we ensure that the IPC is supposed to
|
||||
# be long lived and isn't just a register?
|
||||
# |_ in the register case how can we signal that the
|
||||
# ephemeral msg loop was intentional?
|
||||
if (
|
||||
# not local_nursery._implicit_runtime_started
|
||||
# and
|
||||
an_exit_cs.cancelled_caught
|
||||
):
|
||||
report: str = (
|
||||
'Timed out waiting on local actor-nursery to exit?\n'
|
||||
f'c)>\n'
|
||||
f' |_{local_nursery}\n'
|
||||
)
|
||||
if children := local_nursery._children:
|
||||
# indent from above local-nurse repr
|
||||
report += (
|
||||
f' |_{pformat(children)}\n'
|
||||
)
|
||||
|
||||
log.warning(report)
|
||||
|
||||
if disconnected:
|
||||
# if the transport died and this actor is still
|
||||
# registered within a local nursery, we report
|
||||
# that the IPC layer may have failed
|
||||
# unexpectedly since it may be the cause of
|
||||
# other downstream errors.
|
||||
entry: tuple|None = local_nursery._children.get(uid)
|
||||
if entry:
|
||||
proc: trio.Process
|
||||
_, proc, _ = entry
|
||||
|
||||
if (
|
||||
(poll := getattr(proc, 'poll', None))
|
||||
and
|
||||
poll() is None # proc still alive
|
||||
):
|
||||
# TODO: change log level based on
|
||||
# detecting whether chan was created for
|
||||
# ephemeral `.register_actor()` request!
|
||||
# -[ ] also, that should be avoidable by
|
||||
# re-using any existing chan from the
|
||||
# `._discovery.get_registry()` call as
|
||||
# well..
|
||||
log.runtime(
|
||||
f'Peer IPC broke but subproc is alive?\n\n'
|
||||
|
||||
f'<=x {chan.uid}@{chan.raddr}\n'
|
||||
f' |_{proc}\n'
|
||||
)
|
||||
|
||||
return local_nursery
|
||||
|
||||
# TODO multi-tpt support with per-proto peer tracking?
|
||||
#
|
||||
# -[x] maybe change to mod-func and rename for implied
|
||||
# multi-transport semantics?
|
||||
#
|
||||
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
|
||||
# so that we can query per tpt all peer contact infos?
|
||||
# |_[ ] possibly provide a global viewing via a
|
||||
|
@ -87,7 +299,6 @@ async def handle_stream_from_peer(
|
|||
|
||||
*,
|
||||
server: IPCServer,
|
||||
actor: Actor,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -119,9 +330,10 @@ async def handle_stream_from_peer(
|
|||
|
||||
# initial handshake with peer phase
|
||||
try:
|
||||
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
||||
aid=actor.aid,
|
||||
)
|
||||
if actor := _state.current_actor():
|
||||
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
||||
aid=actor.aid,
|
||||
)
|
||||
except (
|
||||
TransportClosed,
|
||||
# ^XXX NOTE, the above wraps `trio` exc types raised
|
||||
|
@ -222,8 +434,7 @@ async def handle_stream_from_peer(
|
|||
disconnected,
|
||||
last_msg,
|
||||
) = await _rpc.process_messages(
|
||||
actor,
|
||||
chan,
|
||||
chan=chan,
|
||||
)
|
||||
except trio.Cancelled:
|
||||
log.cancel(
|
||||
|
@ -234,179 +445,16 @@ async def handle_stream_from_peer(
|
|||
raise
|
||||
|
||||
finally:
|
||||
local_nursery: (
|
||||
ActorNursery|None
|
||||
) = actor._actoruid2nursery.get(uid)
|
||||
|
||||
# This is set in ``Portal.cancel_actor()``. So if
|
||||
# the peer was cancelled we try to wait for them
|
||||
# to tear down their side of the connection before
|
||||
# moving on with closing our own side.
|
||||
if (
|
||||
local_nursery
|
||||
and (
|
||||
actor._cancel_called
|
||||
or
|
||||
chan._cancel_called
|
||||
)
|
||||
#
|
||||
# ^-TODO-^ along with this is there another condition
|
||||
# that we should filter with to avoid entering this
|
||||
# waiting block needlessly?
|
||||
# -[ ] maybe `and local_nursery.cancelled` and/or
|
||||
# only if the `._children` table is empty or has
|
||||
# only `Portal`s with .chan._cancel_called ==
|
||||
# True` as per what we had below; the MAIN DIFF
|
||||
# BEING that just bc one `Portal.cancel_actor()`
|
||||
# was called, doesn't mean the whole actor-nurse
|
||||
# is gonna exit any time soon right!?
|
||||
#
|
||||
# or
|
||||
# all(chan._cancel_called for chan in chans)
|
||||
|
||||
):
|
||||
log.cancel(
|
||||
'Waiting on cancel request to peer..\n'
|
||||
f'c)=>\n'
|
||||
f' |_{chan.uid}\n'
|
||||
)
|
||||
|
||||
# XXX: this is a soft wait on the channel (and its
|
||||
# underlying transport protocol) to close from the
|
||||
# remote peer side since we presume that any channel
|
||||
# which is mapped to a sub-actor (i.e. it's managed
|
||||
# by local actor-nursery) has a message that is sent
|
||||
# to the peer likely by this actor (which may be in
|
||||
# a shutdown sequence due to cancellation) when the
|
||||
# local runtime here is now cancelled while
|
||||
# (presumably) in the middle of msg loop processing.
|
||||
chan_info: str = (
|
||||
f'{chan.uid}\n'
|
||||
f'|_{chan}\n'
|
||||
f' |_{chan.transport}\n\n'
|
||||
)
|
||||
with trio.move_on_after(0.5) as drain_cs:
|
||||
drain_cs.shield = True
|
||||
|
||||
# attempt to wait for the far end to close the
|
||||
# channel and bail after timeout (a 2-generals
|
||||
# problem on closure).
|
||||
assert chan.transport
|
||||
async for msg in chan.transport.drain():
|
||||
|
||||
# try to deliver any lingering msgs
|
||||
# before we destroy the channel.
|
||||
# This accomplishes deterministic
|
||||
# ``Portal.cancel_actor()`` cancellation by
|
||||
# making sure any RPC response to that call is
|
||||
# delivered the local calling task.
|
||||
# TODO: factor this into a helper?
|
||||
log.warning(
|
||||
'Draining msg from disconnected peer\n'
|
||||
f'{chan_info}'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
# cid: str|None = msg.get('cid')
|
||||
cid: str|None = msg.cid
|
||||
if cid:
|
||||
# deliver response to local caller/waiter
|
||||
await actor._deliver_ctx_payload(
|
||||
chan,
|
||||
cid,
|
||||
msg,
|
||||
)
|
||||
if drain_cs.cancelled_caught:
|
||||
log.warning(
|
||||
'Timed out waiting on IPC transport channel to drain?\n'
|
||||
f'{chan_info}'
|
||||
)
|
||||
|
||||
# XXX NOTE XXX when no explicit call to
|
||||
# `open_root_actor()` was made by the application
|
||||
# (normally we implicitly make that call inside
|
||||
# the first `.open_nursery()` in root-actor
|
||||
# user/app code), we can assume that either we
|
||||
# are NOT the root actor or are root but the
|
||||
# runtime was started manually. and thus DO have
|
||||
# to wait for the nursery-enterer to exit before
|
||||
# shutting down the local runtime to avoid
|
||||
# clobbering any ongoing subactor
|
||||
# teardown/debugging/graceful-cancel.
|
||||
#
|
||||
# see matching note inside `._supervise.open_nursery()`
|
||||
#
|
||||
# TODO: should we have a separate cs + timeout
|
||||
# block here?
|
||||
if (
|
||||
# XXX SO either,
|
||||
# - not root OR,
|
||||
# - is root but `open_root_actor()` was
|
||||
# entered manually (in which case we do
|
||||
# the equiv wait there using the
|
||||
# `devx._debug` sub-sys APIs).
|
||||
not local_nursery._implicit_runtime_started
|
||||
):
|
||||
log.runtime(
|
||||
'Waiting on local actor nursery to exit..\n'
|
||||
f'|_{local_nursery}\n'
|
||||
)
|
||||
with trio.move_on_after(0.5) as an_exit_cs:
|
||||
an_exit_cs.shield = True
|
||||
await local_nursery.exited.wait()
|
||||
|
||||
# TODO: currently this is always triggering for every
|
||||
# sub-daemon spawned from the `piker.services._mngr`?
|
||||
# -[ ] how do we ensure that the IPC is supposed to
|
||||
# be long lived and isn't just a register?
|
||||
# |_ in the register case how can we signal that the
|
||||
# ephemeral msg loop was intentional?
|
||||
if (
|
||||
# not local_nursery._implicit_runtime_started
|
||||
# and
|
||||
an_exit_cs.cancelled_caught
|
||||
):
|
||||
report: str = (
|
||||
'Timed out waiting on local actor-nursery to exit?\n'
|
||||
f'c)>\n'
|
||||
f' |_{local_nursery}\n'
|
||||
)
|
||||
if children := local_nursery._children:
|
||||
# indent from above local-nurse repr
|
||||
report += (
|
||||
f' |_{pformat(children)}\n'
|
||||
)
|
||||
|
||||
log.warning(report)
|
||||
|
||||
if disconnected:
|
||||
# if the transport died and this actor is still
|
||||
# registered within a local nursery, we report
|
||||
# that the IPC layer may have failed
|
||||
# unexpectedly since it may be the cause of
|
||||
# other downstream errors.
|
||||
entry: tuple|None = local_nursery._children.get(uid)
|
||||
if entry:
|
||||
proc: trio.Process
|
||||
_, proc, _ = entry
|
||||
|
||||
if (
|
||||
(poll := getattr(proc, 'poll', None))
|
||||
and
|
||||
poll() is None # proc still alive
|
||||
):
|
||||
# TODO: change log level based on
|
||||
# detecting whether chan was created for
|
||||
# ephemeral `.register_actor()` request!
|
||||
# -[ ] also, that should be avoidable by
|
||||
# re-using any existing chan from the
|
||||
# `._discovery.get_registry()` call as
|
||||
# well..
|
||||
log.runtime(
|
||||
f'Peer IPC broke but subproc is alive?\n\n'
|
||||
|
||||
f'<=x {chan.uid}@{chan.raddr}\n'
|
||||
f' |_{proc}\n'
|
||||
)
|
||||
# check if there are subs which we should gracefully join at
|
||||
# both the inter-actor-task and subprocess levels to
|
||||
# gracefully remote cancel and later disconnect (particularly
|
||||
# for permitting subs engaged in active debug-REPL sessions).
|
||||
local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
|
||||
uid=uid,
|
||||
chan=chan,
|
||||
disconnected=disconnected,
|
||||
)
|
||||
|
||||
# ``Channel`` teardown and closure sequence
|
||||
# drop ref to channel so it can be gc-ed and disconnected
|
||||
|
@ -467,11 +515,11 @@ async def handle_stream_from_peer(
|
|||
# from broken debug TTY locking due to
|
||||
# msg-spec races on application using RunVar...
|
||||
if (
|
||||
local_nursery
|
||||
and
|
||||
(ctx_in_debug := pdb_lock.ctx_in_debug)
|
||||
and
|
||||
(pdb_user_uid := ctx_in_debug.chan.uid)
|
||||
and
|
||||
local_nursery
|
||||
):
|
||||
entry: tuple|None = local_nursery._children.get(
|
||||
tuple(pdb_user_uid)
|
||||
|
@ -804,7 +852,6 @@ class IPCServer(Struct):
|
|||
async def listen_on(
|
||||
self,
|
||||
*,
|
||||
actor: Actor,
|
||||
accept_addrs: list[tuple[str, int|str]]|None = None,
|
||||
stream_handler_nursery: Nursery|None = None,
|
||||
) -> list[IPCEndpoint]:
|
||||
|
@ -837,20 +884,19 @@ class IPCServer(Struct):
|
|||
f'{self}\n'
|
||||
)
|
||||
|
||||
log.info(
|
||||
log.runtime(
|
||||
f'Binding to endpoints for,\n'
|
||||
f'{accept_addrs}\n'
|
||||
)
|
||||
eps: list[IPCEndpoint] = await self._parent_tn.start(
|
||||
partial(
|
||||
_serve_ipc_eps,
|
||||
actor=actor,
|
||||
server=self,
|
||||
stream_handler_tn=stream_handler_nursery,
|
||||
listen_addrs=accept_addrs,
|
||||
)
|
||||
)
|
||||
log.info(
|
||||
log.runtime(
|
||||
f'Started IPC endpoints\n'
|
||||
f'{eps}\n'
|
||||
)
|
||||
|
@ -873,7 +919,6 @@ class IPCServer(Struct):
|
|||
|
||||
async def _serve_ipc_eps(
|
||||
*,
|
||||
actor: Actor,
|
||||
server: IPCServer,
|
||||
stream_handler_tn: Nursery,
|
||||
listen_addrs: list[tuple[str, int|str]],
|
||||
|
@ -907,12 +952,13 @@ async def _serve_ipc_eps(
|
|||
stream_handler_tn=stream_handler_tn,
|
||||
)
|
||||
try:
|
||||
log.info(
|
||||
log.runtime(
|
||||
f'Starting new endpoint listener\n'
|
||||
f'{ep}\n'
|
||||
)
|
||||
listener: trio.abc.Listener = await ep.start_listener()
|
||||
assert listener is ep._listener
|
||||
# actor = _state.current_actor()
|
||||
# if actor.is_registry:
|
||||
# import pdbp; pdbp.set_trace()
|
||||
|
||||
|
@ -937,7 +983,6 @@ async def _serve_ipc_eps(
|
|||
handler=partial(
|
||||
handle_stream_from_peer,
|
||||
server=server,
|
||||
actor=actor,
|
||||
),
|
||||
listeners=listeners,
|
||||
|
||||
|
@ -948,13 +993,13 @@ async def _serve_ipc_eps(
|
|||
)
|
||||
)
|
||||
# TODO, wow make this message better! XD
|
||||
log.info(
|
||||
log.runtime(
|
||||
'Started server(s)\n'
|
||||
+
|
||||
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
||||
)
|
||||
|
||||
log.info(
|
||||
log.runtime(
|
||||
f'Started IPC endpoints\n'
|
||||
f'{eps}\n'
|
||||
)
|
||||
|
@ -970,6 +1015,7 @@ async def _serve_ipc_eps(
|
|||
ep.close_listener()
|
||||
server._endpoints.remove(ep)
|
||||
|
||||
# actor = _state.current_actor()
|
||||
# if actor.is_arbiter:
|
||||
# import pdbp; pdbp.set_trace()
|
||||
|
||||
|
@ -980,7 +1026,6 @@ async def _serve_ipc_eps(
|
|||
|
||||
@acm
|
||||
async def open_ipc_server(
|
||||
actor: Actor,
|
||||
parent_tn: Nursery|None = None,
|
||||
stream_handler_tn: Nursery|None = None,
|
||||
|
||||
|
|
|
@ -127,6 +127,11 @@ async def start_listener(
|
|||
Start a TCP socket listener on the given `TCPAddress`.
|
||||
|
||||
'''
|
||||
log.info(
|
||||
f'Attempting to bind TCP socket\n'
|
||||
f'>[\n'
|
||||
f'|_{addr}\n'
|
||||
)
|
||||
# ?TODO, maybe we should just change the lower-level call this is
|
||||
# using internall per-listener?
|
||||
listeners: list[SocketListener] = await open_tcp_listeners(
|
||||
|
@ -140,6 +145,12 @@ async def start_listener(
|
|||
assert len(listeners) == 1
|
||||
listener = listeners[0]
|
||||
host, port = listener.socket.getsockname()[:2]
|
||||
|
||||
log.info(
|
||||
f'Listening on TCP socket\n'
|
||||
f'[>\n'
|
||||
f' |_{addr}\n'
|
||||
)
|
||||
return listener
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue