Compare commits
4 Commits
29db08b370
...
be2f4f306e
Author | SHA1 | Date |
---|---|---|
|
be2f4f306e | |
|
65ae2dc67c | |
|
4be499fb1f | |
|
7317bb269c |
|
@ -0,0 +1,360 @@
|
||||||
|
# tractor: structured concurrent "actors".
|
||||||
|
# Copyright 2018-eternity Tyler Goodlet.
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
'''
|
||||||
|
A shm-thread-as-generator-fn-ctx **prototype** in anticipation of
|
||||||
|
free-threading (aka GIL-less threads) in py 3.13+
|
||||||
|
|
||||||
|
Bo
|
||||||
|
|
||||||
|
Main rationale,
|
||||||
|
- binding a bg-thread to a "suspendable fn scope" means avoiding any
|
||||||
|
locking around shm-data-structures and, except for a single
|
||||||
|
`threading.Condition` (or better) for thread-context-switching,
|
||||||
|
enables a "pure-ish" style semantic for inter-thread
|
||||||
|
value-passing-IO between a "parent" and (bg) "child" shm-thread
|
||||||
|
where the only (allowed) data-flow would be via immutable values in
|
||||||
|
a "coroutine-style" using,
|
||||||
|
|
||||||
|
- parent-side:
|
||||||
|
|_ `callee_sent = gen.send(caller_sent)`
|
||||||
|
|
||||||
|
- child-side:
|
||||||
|
|_ `caller_sent = yield callee_sent`
|
||||||
|
|
||||||
|
Related (official) reading,
|
||||||
|
- https://docs.python.org/3/glossary.html#term-free-threading
|
||||||
|
- https://peps.python.org/pep-0703/
|
||||||
|
|_https://peps.python.org/pep-0703/#optimistic-avoiding-locking-in-dict-and-list-accesses
|
||||||
|
|
||||||
|
|
||||||
|
'''
|
||||||
|
from contextlib import (
|
||||||
|
contextmanager as cm,
|
||||||
|
)
|
||||||
|
import inspect
|
||||||
|
# from functools import partial
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from typing import (
|
||||||
|
Any,
|
||||||
|
Generator,
|
||||||
|
)
|
||||||
|
|
||||||
|
import tractor
|
||||||
|
import trio
|
||||||
|
|
||||||
|
|
||||||
|
log = tractor.log.get_console_log(
|
||||||
|
'info',
|
||||||
|
# ^ XXX causes latency with seed>=1e3
|
||||||
|
# 'warning',
|
||||||
|
)
|
||||||
|
_seed: int = int(1e3)
|
||||||
|
|
||||||
|
|
||||||
|
def thread_gen(seed: int):
|
||||||
|
thr = threading.current_thread()
|
||||||
|
log.info(
|
||||||
|
f'thr: {thr.name} @ {thr.ident}\n'
|
||||||
|
f' |_{thr!r}\n'
|
||||||
|
f'\n'
|
||||||
|
f'IN `thread_gen(seed={seed})`\n'
|
||||||
|
f'\n'
|
||||||
|
f'Starting range()-loop\n'
|
||||||
|
)
|
||||||
|
for i in range(seed):
|
||||||
|
log.info(
|
||||||
|
f'yielding i={i}\n'
|
||||||
|
)
|
||||||
|
from_main = yield i
|
||||||
|
log.info(
|
||||||
|
f'(from_main := {from_main}) = yield (i:={i})\n'
|
||||||
|
)
|
||||||
|
# time.sleep(0.0001)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO, how would we get the equiv from a pub trio-API?
|
||||||
|
# -[ ] what about an inter-thread channel much like we have for
|
||||||
|
# `to_asyncio` & guest mode??
|
||||||
|
#
|
||||||
|
# async def spawn_bg_thread_running_gen(fn):
|
||||||
|
# log.info('running trio.to_thread.run_sync()')
|
||||||
|
# await trio.to_thread.run_sync(
|
||||||
|
# partial(
|
||||||
|
# run_gen_in_thread,
|
||||||
|
# fn=fn,
|
||||||
|
# seed=_seed,
|
||||||
|
# )
|
||||||
|
# )
|
||||||
|
|
||||||
|
|
||||||
|
# ?TODO? once correct, wrap this as a @deco-API?
|
||||||
|
# -[ ] @generator_thread or similar?
|
||||||
|
#
|
||||||
|
def run_gen_in_thread(
|
||||||
|
cond: threading.Condition,
|
||||||
|
gen: Generator,
|
||||||
|
# ^NOTE, already closure-bound-in tgt generator-fn-instance which
|
||||||
|
# will be yielded to in the bg-thread!
|
||||||
|
):
|
||||||
|
thr: threading.Thread = threading.current_thread()
|
||||||
|
log.info(
|
||||||
|
f'thr: {thr.name} @ {thr.ident}\n'
|
||||||
|
f' |_{thr!r}\n'
|
||||||
|
f'\n'
|
||||||
|
f'IN `run_gen_in_thread(gen={gen})`\n'
|
||||||
|
f'\n'
|
||||||
|
f'entering gen blocking: {gen!r}\n'
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
log.runtime('locking cond..')
|
||||||
|
with cond:
|
||||||
|
log.runtime('LOCKED cond..')
|
||||||
|
first_yielded = gen.send(None)
|
||||||
|
assert cond.to_yield is None
|
||||||
|
cond.to_yield = first_yielded
|
||||||
|
log.runtime('notifying cond..')
|
||||||
|
cond.notify()
|
||||||
|
log.runtime('waiting cond..')
|
||||||
|
cond.wait()
|
||||||
|
|
||||||
|
while (to_send := cond.to_send) is not None:
|
||||||
|
try:
|
||||||
|
yielded = gen.send(to_send)
|
||||||
|
except StopIteration as siter:
|
||||||
|
# TODO, check for return value?
|
||||||
|
# if (ret := siter.value):
|
||||||
|
# cond.to_return = ret
|
||||||
|
assert siter
|
||||||
|
log.exception(f'{gen} exited')
|
||||||
|
raise
|
||||||
|
|
||||||
|
cond.to_yield = yielded
|
||||||
|
log.runtime('LOOP notifying cond..')
|
||||||
|
cond.notify()
|
||||||
|
log.runtime('LOOP waiting cond..')
|
||||||
|
cond.wait()
|
||||||
|
|
||||||
|
# out = (yield from gen)
|
||||||
|
log.runtime('RELEASE-ing cond..')
|
||||||
|
|
||||||
|
# with cond block-end
|
||||||
|
log.runtime('RELEASE-ed cond..')
|
||||||
|
|
||||||
|
except BaseException:
|
||||||
|
log.exception(f'exited gen: {gen!r}\n')
|
||||||
|
raise
|
||||||
|
|
||||||
|
finally:
|
||||||
|
log.warning(
|
||||||
|
'Exiting bg thread!\n'
|
||||||
|
)
|
||||||
|
# TODO! better then this null setting naivety!
|
||||||
|
# -[ ] maybe an Unresolved or similar like for our `Context`?
|
||||||
|
#
|
||||||
|
# apply sentinel
|
||||||
|
cond.to_yield = None
|
||||||
|
with cond:
|
||||||
|
cond.notify_all()
|
||||||
|
|
||||||
|
@cm
|
||||||
|
def start_in_bg_thread(
|
||||||
|
gen: Generator,
|
||||||
|
|
||||||
|
# ?TODO?, is this useful to pass startup-ctx to the thread?
|
||||||
|
name: str|None = None,
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> tuple[
|
||||||
|
threading.Thread,
|
||||||
|
Generator,
|
||||||
|
Any,
|
||||||
|
]:
|
||||||
|
if not inspect.isgenerator(gen):
|
||||||
|
raise ValueError(
|
||||||
|
f'You must pass a `gen: Generator` instance\n'
|
||||||
|
f'gen={gen!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# ?TODO? wrap this stuff into some kinda
|
||||||
|
# single-entry-inter-thread mem-chan?
|
||||||
|
#
|
||||||
|
cond = threading.Condition()
|
||||||
|
cond.to_send = None
|
||||||
|
cond.to_yield = None
|
||||||
|
cond.to_return = None
|
||||||
|
|
||||||
|
thr = threading.Thread(
|
||||||
|
target=run_gen_in_thread,
|
||||||
|
# args=(), # ?TODO, useful?
|
||||||
|
kwargs={
|
||||||
|
'cond': cond,
|
||||||
|
'gen': gen,
|
||||||
|
} | kwargs,
|
||||||
|
name=name or gen.__name__,
|
||||||
|
)
|
||||||
|
log.info(
|
||||||
|
f'starting bg thread\n'
|
||||||
|
f'>(\n'
|
||||||
|
f'|_{thr!r}\n'
|
||||||
|
)
|
||||||
|
thr.start()
|
||||||
|
|
||||||
|
# TODO, Event or cond.wait() here to sync!?
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
try:
|
||||||
|
log.info(f'locking cond {cond}..')
|
||||||
|
with cond:
|
||||||
|
log.runtime(f'LOCKED cond {cond}..')
|
||||||
|
first_yielded = cond.to_yield
|
||||||
|
log.runtime(f'cond.to_yield: {first_yielded}')
|
||||||
|
|
||||||
|
# delegator shim generator which proxies values from
|
||||||
|
# caller to callee-in-bg-thread
|
||||||
|
def wrapper():
|
||||||
|
|
||||||
|
# !?TODO, minimize # of yields during startup?
|
||||||
|
# -[ ] we can do i in <=1 manual yield pre while-loop no?
|
||||||
|
#
|
||||||
|
first_sent = yield first_yielded
|
||||||
|
cond.to_send = first_sent
|
||||||
|
|
||||||
|
# !TODO, exactly why we need a conditional-emit-sys!
|
||||||
|
log.runtime(
|
||||||
|
f'cond.notify()\n'
|
||||||
|
f'cond.to_send={cond.to_send!r}\n'
|
||||||
|
f'cond.to_yield={cond.to_yield!r}\n'
|
||||||
|
)
|
||||||
|
cond.notify()
|
||||||
|
log.runtime(
|
||||||
|
f'cond.wait()\n'
|
||||||
|
f'cond.to_send={cond.to_send!r}\n'
|
||||||
|
f'cond.to_yield={cond.to_yield!r}\n'
|
||||||
|
)
|
||||||
|
cond.wait()
|
||||||
|
|
||||||
|
to_yield = cond.to_yield
|
||||||
|
log.runtime(
|
||||||
|
f'yielding to caller\n'
|
||||||
|
f'cond.to_send={cond.to_send!r}\n'
|
||||||
|
f'cond.to_yield={cond.to_yield!r}\n'
|
||||||
|
)
|
||||||
|
to_send = yield to_yield
|
||||||
|
log.runtime(
|
||||||
|
f'post-yield to caller\n'
|
||||||
|
f'to_send={to_send!r}\n'
|
||||||
|
f'to_yield={to_yield!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# !TODO, proper sentinel-to-break type-condition!
|
||||||
|
while to_send is not None:
|
||||||
|
cond.to_send = to_send
|
||||||
|
log.runtime(
|
||||||
|
f'cond.nofity()\n'
|
||||||
|
f'cond.to_send={cond.to_send!r}\n'
|
||||||
|
f'cond.to_yield={cond.to_yield!r}\n'
|
||||||
|
)
|
||||||
|
cond.notify()
|
||||||
|
if cond.to_yield is None:
|
||||||
|
log.runtime(
|
||||||
|
'BREAKING from wrapper-LOOP!\n'
|
||||||
|
)
|
||||||
|
break
|
||||||
|
return
|
||||||
|
|
||||||
|
log.runtime(
|
||||||
|
f'cond.wait()\n'
|
||||||
|
f'cond.to_send={cond.to_send!r}\n'
|
||||||
|
f'cond.to_yield={cond.to_yield!r}\n'
|
||||||
|
)
|
||||||
|
cond.wait()
|
||||||
|
|
||||||
|
log.runtime(
|
||||||
|
f'yielding to caller\n'
|
||||||
|
f'cond.to_send={cond.to_send!r}\n'
|
||||||
|
f'cond.to_yield={cond.to_yield!r}\n'
|
||||||
|
)
|
||||||
|
to_yield = cond.to_yield
|
||||||
|
to_send = yield to_yield
|
||||||
|
log.runtime(
|
||||||
|
f'post-yield to caller\n'
|
||||||
|
f'to_send={to_send!r}\n'
|
||||||
|
f'to_yield={to_yield!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
log.info('creating wrapper..')
|
||||||
|
wrapper_gen = wrapper()
|
||||||
|
log.info(f'first .send(None): {wrapper_gen}\n')
|
||||||
|
first_yielded = wrapper_gen.send(None)
|
||||||
|
log.info(f'first yielded: {first_yielded}\n')
|
||||||
|
|
||||||
|
yield (
|
||||||
|
thr,
|
||||||
|
wrapper_gen,
|
||||||
|
first_yielded,
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
thr.join()
|
||||||
|
log.info(f'bg thread joined: {thr!r}')
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with trio.open_nursery() as tn:
|
||||||
|
assert tn
|
||||||
|
|
||||||
|
with (
|
||||||
|
start_in_bg_thread(
|
||||||
|
gen=(
|
||||||
|
_gen:=thread_gen(
|
||||||
|
seed=_seed,
|
||||||
|
)
|
||||||
|
),
|
||||||
|
) as (
|
||||||
|
thr,
|
||||||
|
wrapped_gen,
|
||||||
|
first,
|
||||||
|
),
|
||||||
|
):
|
||||||
|
assert (
|
||||||
|
_gen is not wrapped_gen
|
||||||
|
and
|
||||||
|
wrapped_gen is not None
|
||||||
|
)
|
||||||
|
log.info(
|
||||||
|
'Entering wrapped_gen loop\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# NOTE, like our `Context.started` value
|
||||||
|
assert first == 0
|
||||||
|
|
||||||
|
# !TODO, proper sentinel-to-break type-condition!
|
||||||
|
yielded = first
|
||||||
|
while yielded is not None:
|
||||||
|
|
||||||
|
# XXX, compute callers new value to send to bg-thread
|
||||||
|
to_send = yielded * yielded
|
||||||
|
|
||||||
|
# send to bg-thread
|
||||||
|
yielded = wrapped_gen.send(to_send)
|
||||||
|
log.info(
|
||||||
|
f'(yielded:={yielded!r}) = wrapped_gen.send((to_send:={to_send!r})'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
trio.run(main)
|
|
@ -116,9 +116,11 @@ def test_shield_pause(
|
||||||
child.pid,
|
child.pid,
|
||||||
signal.SIGINT,
|
signal.SIGINT,
|
||||||
)
|
)
|
||||||
|
from tractor._supervise import _shutdown_msg
|
||||||
expect(
|
expect(
|
||||||
child,
|
child,
|
||||||
'Shutting down actor runtime',
|
# 'Shutting down actor runtime',
|
||||||
|
_shutdown_msg,
|
||||||
timeout=6,
|
timeout=6,
|
||||||
)
|
)
|
||||||
assert_before(
|
assert_before(
|
||||||
|
|
|
@ -118,6 +118,10 @@ class Portal:
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def chan(self) -> Channel:
|
def chan(self) -> Channel:
|
||||||
|
'''
|
||||||
|
Ref to this ctx's underlying `tractor.ipc.Channel`.
|
||||||
|
|
||||||
|
'''
|
||||||
return self._chan
|
return self._chan
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -177,10 +181,17 @@ class Portal:
|
||||||
|
|
||||||
# not expecting a "main" result
|
# not expecting a "main" result
|
||||||
if self._expect_result_ctx is None:
|
if self._expect_result_ctx is None:
|
||||||
|
peer_id: str = f'{self.channel.aid.reprol()!r}'
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Portal for {self.channel.aid} not expecting a final"
|
f'Portal to peer {peer_id} will not deliver a final result?\n'
|
||||||
" result?\nresult() should only be called if subactor"
|
f'\n'
|
||||||
" was spawned with `ActorNursery.run_in_actor()`")
|
f'Context.result() can only be called by the parent of '
|
||||||
|
f'a sub-actor when it was spawned with '
|
||||||
|
f'`ActorNursery.run_in_actor()`'
|
||||||
|
f'\n'
|
||||||
|
f'Further this `ActorNursery`-method-API will deprecated in the'
|
||||||
|
f'near fututre!\n'
|
||||||
|
)
|
||||||
return NoResult
|
return NoResult
|
||||||
|
|
||||||
# expecting a "main" result
|
# expecting a "main" result
|
||||||
|
@ -213,6 +224,7 @@ class Portal:
|
||||||
typname: str = type(self).__name__
|
typname: str = type(self).__name__
|
||||||
log.warning(
|
log.warning(
|
||||||
f'`{typname}.result()` is DEPRECATED!\n'
|
f'`{typname}.result()` is DEPRECATED!\n'
|
||||||
|
f'\n'
|
||||||
f'Use `{typname}.wait_for_result()` instead!\n'
|
f'Use `{typname}.wait_for_result()` instead!\n'
|
||||||
)
|
)
|
||||||
return await self.wait_for_result(
|
return await self.wait_for_result(
|
||||||
|
@ -224,8 +236,10 @@ class Portal:
|
||||||
# terminate all locally running async generator
|
# terminate all locally running async generator
|
||||||
# IPC calls
|
# IPC calls
|
||||||
if self._streams:
|
if self._streams:
|
||||||
log.cancel(
|
peer_id: str = f'{self.channel.aid.reprol()!r}'
|
||||||
f"Cancelling all streams with {self.channel.aid}")
|
report: str = (
|
||||||
|
f'Cancelling all msg-streams with {peer_id}\n'
|
||||||
|
)
|
||||||
for stream in self._streams.copy():
|
for stream in self._streams.copy():
|
||||||
try:
|
try:
|
||||||
await stream.aclose()
|
await stream.aclose()
|
||||||
|
@ -234,10 +248,18 @@ class Portal:
|
||||||
# (unless of course at some point down the road we
|
# (unless of course at some point down the road we
|
||||||
# won't expect this to always be the case or need to
|
# won't expect this to always be the case or need to
|
||||||
# detect it for respawning purposes?)
|
# detect it for respawning purposes?)
|
||||||
log.debug(f"{stream} was already closed.")
|
report += (
|
||||||
|
f'->) {stream!r} already closed\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
log.cancel(report)
|
||||||
|
|
||||||
async def aclose(self):
|
async def aclose(self):
|
||||||
log.debug(f"Closing {self}")
|
log.debug(
|
||||||
|
f'Closing portal\n'
|
||||||
|
f'>}}\n'
|
||||||
|
f'|_{self}\n'
|
||||||
|
)
|
||||||
# TODO: once we move to implementing our own `ReceiveChannel`
|
# TODO: once we move to implementing our own `ReceiveChannel`
|
||||||
# (including remote task cancellation inside its `.aclose()`)
|
# (including remote task cancellation inside its `.aclose()`)
|
||||||
# we'll need to .aclose all those channels here
|
# we'll need to .aclose all those channels here
|
||||||
|
@ -263,19 +285,18 @@ class Portal:
|
||||||
__runtimeframe__: int = 1 # noqa
|
__runtimeframe__: int = 1 # noqa
|
||||||
|
|
||||||
chan: Channel = self.channel
|
chan: Channel = self.channel
|
||||||
|
peer_id: str = f'{self.channel.aid.reprol()!r}'
|
||||||
if not chan.connected():
|
if not chan.connected():
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'This channel is already closed, skipping cancel request..'
|
'Peer {peer_id} is already disconnected\n'
|
||||||
|
'-> skipping cancel request..\n'
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
reminfo: str = (
|
|
||||||
f'c)=> {self.channel.aid}\n'
|
|
||||||
f' |_{chan}\n'
|
|
||||||
)
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Requesting actor-runtime cancel for peer\n\n'
|
f'Sending actor-runtime-cancel-req to peer\n'
|
||||||
f'{reminfo}'
|
f'\n'
|
||||||
|
f'c)=> {peer_id}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX the one spot we set it?
|
# XXX the one spot we set it?
|
||||||
|
@ -300,8 +321,9 @@ class Portal:
|
||||||
# may timeout and we never get an ack (obvi racy)
|
# may timeout and we never get an ack (obvi racy)
|
||||||
# but that doesn't mean it wasn't cancelled.
|
# but that doesn't mean it wasn't cancelled.
|
||||||
log.debug(
|
log.debug(
|
||||||
'May have failed to cancel peer?\n'
|
f'May have failed to cancel peer?\n'
|
||||||
f'{reminfo}'
|
f'\n'
|
||||||
|
f'c)=?> {peer_id}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# if we get here some weird cancellation case happened
|
# if we get here some weird cancellation case happened
|
||||||
|
@ -319,22 +341,22 @@ class Portal:
|
||||||
|
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
) as tpt_err:
|
) as tpt_err:
|
||||||
report: str = (
|
ipc_borked_report: str = (
|
||||||
f'IPC chan for actor already closed or broken?\n\n'
|
f'IPC for actor already closed/broken?\n\n'
|
||||||
f'{self.channel.aid}\n'
|
f'\n'
|
||||||
f' |_{self.channel}\n'
|
f'c)=x> {peer_id}\n'
|
||||||
)
|
)
|
||||||
match tpt_err:
|
match tpt_err:
|
||||||
case TransportClosed():
|
case TransportClosed():
|
||||||
log.debug(report)
|
log.debug(ipc_borked_report)
|
||||||
case _:
|
case _:
|
||||||
report += (
|
ipc_borked_report += (
|
||||||
f'\n'
|
f'\n'
|
||||||
f'Unhandled low-level transport-closed/error during\n'
|
f'Unhandled low-level transport-closed/error during\n'
|
||||||
f'Portal.cancel_actor()` request?\n'
|
f'Portal.cancel_actor()` request?\n'
|
||||||
f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
|
f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
|
||||||
)
|
)
|
||||||
log.warning(report)
|
log.warning(ipc_borked_report)
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -491,10 +513,13 @@ class Portal:
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError as cre:
|
||||||
# if the far end terminates before we send a cancel the
|
# if the far end terminates before we send a cancel the
|
||||||
# underlying transport-channel may already be closed.
|
# underlying transport-channel may already be closed.
|
||||||
log.cancel(f'Context {ctx} was already closed?')
|
log.cancel(
|
||||||
|
f'Context.cancel() -> {cre!r}\n'
|
||||||
|
f'cid: {ctx.cid!r} already closed?\n'
|
||||||
|
)
|
||||||
|
|
||||||
# XXX: should this always be done?
|
# XXX: should this always be done?
|
||||||
# await recv_chan.aclose()
|
# await recv_chan.aclose()
|
||||||
|
|
|
@ -34,9 +34,9 @@ from typing import (
|
||||||
import trio
|
import trio
|
||||||
from trio import TaskStatus
|
from trio import TaskStatus
|
||||||
|
|
||||||
from .devx.debug import (
|
from .devx import (
|
||||||
maybe_wait_for_debugger,
|
debug,
|
||||||
acquire_debug_lock,
|
pformat as _pformat
|
||||||
)
|
)
|
||||||
from tractor._state import (
|
from tractor._state import (
|
||||||
current_actor,
|
current_actor,
|
||||||
|
@ -51,14 +51,17 @@ from tractor._portal import Portal
|
||||||
from tractor._runtime import Actor
|
from tractor._runtime import Actor
|
||||||
from tractor._entry import _mp_main
|
from tractor._entry import _mp_main
|
||||||
from tractor._exceptions import ActorFailure
|
from tractor._exceptions import ActorFailure
|
||||||
from tractor.msg.types import (
|
from tractor.msg import (
|
||||||
Aid,
|
types as msgtypes,
|
||||||
SpawnSpec,
|
pretty_struct,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ipc import IPCServer
|
from ipc import (
|
||||||
|
_server,
|
||||||
|
Channel,
|
||||||
|
)
|
||||||
from ._supervise import ActorNursery
|
from ._supervise import ActorNursery
|
||||||
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
|
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
|
||||||
|
|
||||||
|
@ -328,20 +331,21 @@ async def soft_kill(
|
||||||
see `.hard_kill()`).
|
see `.hard_kill()`).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
peer_aid: Aid = portal.channel.aid
|
chan: Channel = portal.channel
|
||||||
|
peer_aid: msgtypes.Aid = chan.aid
|
||||||
try:
|
try:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Soft killing sub-actor via portal request\n'
|
f'Soft killing sub-actor via portal request\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f'(c=> {peer_aid}\n'
|
f'c)=> {peer_aid.reprol()}@[{chan.maddr}]\n'
|
||||||
f' |_{proc}\n'
|
f' |_{proc}\n'
|
||||||
)
|
)
|
||||||
# wait on sub-proc to signal termination
|
# wait on sub-proc to signal termination
|
||||||
await wait_func(proc)
|
await wait_func(proc)
|
||||||
|
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await maybe_wait_for_debugger(
|
await debug.maybe_wait_for_debugger(
|
||||||
child_in_debug=_runtime_vars.get(
|
child_in_debug=_runtime_vars.get(
|
||||||
'_debug_mode', False
|
'_debug_mode', False
|
||||||
),
|
),
|
||||||
|
@ -465,7 +469,7 @@ async def trio_proc(
|
||||||
"--uid",
|
"--uid",
|
||||||
# TODO, how to pass this over "wire" encodings like
|
# TODO, how to pass this over "wire" encodings like
|
||||||
# cmdline args?
|
# cmdline args?
|
||||||
# -[ ] maybe we can add an `Aid.min_tuple()` ?
|
# -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
|
||||||
str(subactor.uid),
|
str(subactor.uid),
|
||||||
# Address the child must connect to on startup
|
# Address the child must connect to on startup
|
||||||
"--parent_addr",
|
"--parent_addr",
|
||||||
|
@ -483,13 +487,14 @@ async def trio_proc(
|
||||||
|
|
||||||
cancelled_during_spawn: bool = False
|
cancelled_during_spawn: bool = False
|
||||||
proc: trio.Process|None = None
|
proc: trio.Process|None = None
|
||||||
ipc_server: IPCServer = actor_nursery._actor.ipc_server
|
ipc_server: _server.Server = actor_nursery._actor.ipc_server
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
|
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Started new child\n'
|
f'Started new child subproc\n'
|
||||||
f'|_{proc}\n'
|
f'(>\n'
|
||||||
|
f' |_{proc}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# wait for actor to spawn and connect back to us
|
# wait for actor to spawn and connect back to us
|
||||||
|
@ -507,10 +512,10 @@ async def trio_proc(
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
# don't clobber an ongoing pdb
|
# don't clobber an ongoing pdb
|
||||||
if is_root_process():
|
if is_root_process():
|
||||||
await maybe_wait_for_debugger()
|
await debug.maybe_wait_for_debugger()
|
||||||
|
|
||||||
elif proc is not None:
|
elif proc is not None:
|
||||||
async with acquire_debug_lock(subactor.uid):
|
async with debug.acquire_debug_lock(subactor.uid):
|
||||||
# soft wait on the proc to terminate
|
# soft wait on the proc to terminate
|
||||||
with trio.move_on_after(0.5):
|
with trio.move_on_after(0.5):
|
||||||
await proc.wait()
|
await proc.wait()
|
||||||
|
@ -528,14 +533,19 @@ async def trio_proc(
|
||||||
|
|
||||||
# send a "spawning specification" which configures the
|
# send a "spawning specification" which configures the
|
||||||
# initial runtime state of the child.
|
# initial runtime state of the child.
|
||||||
sspec = SpawnSpec(
|
sspec = msgtypes.SpawnSpec(
|
||||||
_parent_main_data=subactor._parent_main_data,
|
_parent_main_data=subactor._parent_main_data,
|
||||||
enable_modules=subactor.enable_modules,
|
enable_modules=subactor.enable_modules,
|
||||||
reg_addrs=subactor.reg_addrs,
|
reg_addrs=subactor.reg_addrs,
|
||||||
bind_addrs=bind_addrs,
|
bind_addrs=bind_addrs,
|
||||||
_runtime_vars=_runtime_vars,
|
_runtime_vars=_runtime_vars,
|
||||||
)
|
)
|
||||||
log.runtime(f'Sending spawn spec: {str(sspec)}')
|
log.runtime(
|
||||||
|
f'Sending spawn spec to child\n'
|
||||||
|
f'{{}}=> {chan.aid.reprol()!r}\n'
|
||||||
|
f'\n'
|
||||||
|
f'{pretty_struct.pformat(sspec)}\n'
|
||||||
|
)
|
||||||
await chan.send(sspec)
|
await chan.send(sspec)
|
||||||
|
|
||||||
# track subactor in current nursery
|
# track subactor in current nursery
|
||||||
|
@ -563,7 +573,7 @@ async def trio_proc(
|
||||||
# condition.
|
# condition.
|
||||||
await soft_kill(
|
await soft_kill(
|
||||||
proc,
|
proc,
|
||||||
trio.Process.wait,
|
trio.Process.wait, # XXX, uses `pidfd_open()` below.
|
||||||
portal
|
portal
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -571,8 +581,7 @@ async def trio_proc(
|
||||||
# tandem if not done already
|
# tandem if not done already
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Cancelling portal result reaper task\n'
|
'Cancelling portal result reaper task\n'
|
||||||
f'>c)\n'
|
f'c)> {subactor.aid.reprol()!r}\n'
|
||||||
f' |_{subactor.uid}\n'
|
|
||||||
)
|
)
|
||||||
nursery.cancel_scope.cancel()
|
nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
|
@ -581,21 +590,24 @@ async def trio_proc(
|
||||||
# allowed! Do this **after** cancellation/teardown to avoid
|
# allowed! Do this **after** cancellation/teardown to avoid
|
||||||
# killing the process too early.
|
# killing the process too early.
|
||||||
if proc:
|
if proc:
|
||||||
|
reap_repr: str = _pformat.nest_from_op(
|
||||||
|
input_op='>x)',
|
||||||
|
text=subactor.pformat(),
|
||||||
|
)
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Hard reap sequence starting for subactor\n'
|
f'Hard reap sequence starting for subactor\n'
|
||||||
f'>x)\n'
|
f'{reap_repr}'
|
||||||
f' |_{subactor}@{subactor.uid}\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
# don't clobber an ongoing pdb
|
# don't clobber an ongoing pdb
|
||||||
if cancelled_during_spawn:
|
if cancelled_during_spawn:
|
||||||
# Try again to avoid TTY clobbering.
|
# Try again to avoid TTY clobbering.
|
||||||
async with acquire_debug_lock(subactor.uid):
|
async with debug.acquire_debug_lock(subactor.uid):
|
||||||
with trio.move_on_after(0.5):
|
with trio.move_on_after(0.5):
|
||||||
await proc.wait()
|
await proc.wait()
|
||||||
|
|
||||||
await maybe_wait_for_debugger(
|
await debug.maybe_wait_for_debugger(
|
||||||
child_in_debug=_runtime_vars.get(
|
child_in_debug=_runtime_vars.get(
|
||||||
'_debug_mode', False
|
'_debug_mode', False
|
||||||
),
|
),
|
||||||
|
@ -624,7 +636,7 @@ async def trio_proc(
|
||||||
# acquire the lock and get notified of who has it,
|
# acquire the lock and get notified of who has it,
|
||||||
# check that uid against our known children?
|
# check that uid against our known children?
|
||||||
# this_uid: tuple[str, str] = current_actor().uid
|
# this_uid: tuple[str, str] = current_actor().uid
|
||||||
# await acquire_debug_lock(this_uid)
|
# await debug.acquire_debug_lock(this_uid)
|
||||||
|
|
||||||
if proc.poll() is None:
|
if proc.poll() is None:
|
||||||
log.cancel(f"Attempting to hard kill {proc}")
|
log.cancel(f"Attempting to hard kill {proc}")
|
||||||
|
@ -727,7 +739,7 @@ async def mp_proc(
|
||||||
|
|
||||||
log.runtime(f"Started {proc}")
|
log.runtime(f"Started {proc}")
|
||||||
|
|
||||||
ipc_server: IPCServer = actor_nursery._actor.ipc_server
|
ipc_server: _server.Server = actor_nursery._actor.ipc_server
|
||||||
try:
|
try:
|
||||||
# wait for actor to spawn and connect back to us
|
# wait for actor to spawn and connect back to us
|
||||||
# channel should have handshake completed by the
|
# channel should have handshake completed by the
|
||||||
|
|
Loading…
Reference in New Issue