Compare commits: 2800100b21 ... 7dd72e042d
10 commits
Author | SHA1 | Date
---|---|---
Tyler Goodlet | 7dd72e042d |
Tyler Goodlet | ee8ead4d7a |
Tyler Goodlet | 8a70a52ff9 |
Tyler Goodlet | 70e4458fb0 |
Tyler Goodlet | 70d1c98c10 |
Tyler Goodlet | dade6a4b43 |
Tyler Goodlet | 7b4049198a |
Tyler Goodlet | 931b20cf35 |
Tyler Goodlet | 11c1582c39 |
Tyler Goodlet | d1f347c21f |
@@ -111,4 +111,4 @@ jobs:
         run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager

       - name: Run tests
-        run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs
+        run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs --full-trace
@@ -3,13 +3,14 @@
 |gh_actions|
 |docs|

-``tractor`` is a `structured concurrent`_, multi-processing_ runtime built on trio_.
+``tractor`` is a `structured concurrent`_, multi-processing_ runtime
+built on trio_.

 Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
 our nurseries_ let you spawn new Python processes which each run a ``trio``
 scheduled runtime - a call to ``trio.run()``.

-We believe the system adhere's to the `3 axioms`_ of an "`actor model`_"
+We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
 but likely *does not* look like what *you* probably think an "actor
 model" looks like, and that's *intentional*.
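For readers skimming the compare view, the README paragraph above summarizes the spawning model: an actor nursery starts new Python processes, each running its own ``trio.run()``. A minimal sketch of that flow, assuming tractor's documented ``open_nursery()`` / ``run_in_actor()`` / ``Portal.result()`` API (the ``cube`` worker is a made-up example, not code from this diff):

    import trio
    import tractor


    async def cube(x: int) -> int:
        # runs in a separate actor: its own process plus trio runtime
        return x ** 3


    async def main():
        async with tractor.open_nursery() as n:
            # spawn a subprocess-actor and schedule ``cube`` inside it
            portal = await n.run_in_actor(cube, x=3)
            print(await portal.result())  # -> 27


    if __name__ == '__main__':
        trio.run(main)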
@@ -577,13 +578,13 @@ say hi, please feel free to reach us in our `matrix channel`_. If
 matrix seems too hip, we're also mostly all in the the `trio gitter
 channel`_!

+.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
+.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
+.. _trio: https://github.com/python-trio/trio
+.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
+.. _actor model: https://en.wikipedia.org/wiki/Actor_model
-.. _trio: https://github.com/python-trio/trio
-.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
 .. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
 .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
-.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
 .. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
 .. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
 .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
@@ -81,11 +81,14 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
     'example_script',

     # walk yields: (dirpath, dirnames, filenames)
-    [(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
+    [
+        (p[0], f) for p in os.walk(examples_dir()) for f in p[2]

         if '__' not in f
         and f[0] != '_'
-        and 'debugging' not in p[0]],
+        and 'debugging' not in p[0]
+        and 'integration' not in p[0]
+    ],

     ids=lambda t: t[1],
 )
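For context, the comprehension being reshaped above just collects example scripts for pytest parametrization while skipping private, debugging-related, and (with this change) integration scripts. A stand-alone approximation of the same collection logic, with the repo's ``examples_dir()`` helper replaced by a plain path argument for illustration:

    import os


    def collect_example_scripts(examples_dir: str) -> list:
        # walk yields: (dirpath, dirnames, filenames)
        return [
            (dirpath, fname)
            for dirpath, _dirnames, filenames in os.walk(examples_dir)
            for fname in filenames

            if '__' not in fname            # skip __init__.py and friends
            and fname[0] != '_'             # skip "private" helper scripts
            and 'debugging' not in dirpath  # skip interactive debugger examples
            and 'integration' not in dirpath
        ]


    # pytest's ``ids=lambda t: t[1]`` then labels each case by file name
    print(collect_example_scripts('examples'))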
@@ -611,7 +611,8 @@ class Actor:
             entry = local_nursery._children.get(uid)
             if entry:
                 _, proc, _ = entry
-                log.warning(f'Actor {uid}@{proc} IPC connection broke!?')
+                log.warning(
+                    f'Actor {uid}@{proc} IPC connection broke!?')
                 # if proc.poll() is not None:
                 #     log.error('Actor {uid} proc died and IPC broke?')
@@ -630,6 +631,11 @@ class Actor:
             # Attempt to wait for the far end to close the channel
             # and bail after timeout (2-generals on closure).
+            assert chan.msgstream
+
+            log.runtime(
+                f'Draining lingering msgs from stream {chan.msgstream}'
+            )
+
             async for msg in chan.msgstream.drain():
                 # try to deliver any lingering msgs
                 # before we destroy the channel.
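The drain added above exists because connection shutdown is a two-generals problem: the far end may never confirm closure, so lingering messages are pulled with a bail-out rather than waiting forever. A generic ``trio`` sketch of that pattern, using a memory channel and a timeout instead of tractor's actual ``msgstream.drain()`` API:

    import trio


    async def drain_with_timeout(
        recv: trio.abc.ReceiveChannel,
        timeout: float = 0.5,
    ) -> list:
        # pull any lingering messages but give up after ``timeout``
        # seconds since the sender may never close cleanly.
        drained = []
        with trio.move_on_after(timeout):
            async for msg in recv:
                drained.append(msg)
        return drained


    async def main():
        send, recv = trio.open_memory_channel(8)
        await send.send('lingering msg')
        await send.aclose()  # far end closes -> drain loop ends early
        print(await drain_with_timeout(recv))  # -> ['lingering msg']


    trio.run(main)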
@@ -41,6 +41,7 @@ from .log import get_logger
 from ._discovery import get_root
 from ._state import is_root_process, debug_mode
 from ._exceptions import is_multi_cancelled
+from ._ipc import Channel


 try:
@@ -178,12 +179,6 @@ async def _acquire_debug_lock(

     we_acquired = False

-    if _no_remote_has_tty is None:
-        # mark the tty lock as being in use so that the runtime
-        # can try to avoid clobbering any connection from a child
-        # that's currently relying on it.
-        _no_remote_has_tty = trio.Event()
-
     try:
         log.runtime(
             f"entering lock checkpoint, remote task: {task_name}:{uid}"
@@ -191,6 +186,12 @@ async def _acquire_debug_lock(
         we_acquired = True
         await _debug_lock.acquire()

+        if _no_remote_has_tty is None:
+            # mark the tty lock as being in use so that the runtime
+            # can try to avoid clobbering any connection from a child
+            # that's currently relying on it.
+            _no_remote_has_tty = trio.Event()
+
         _global_actor_in_debug = uid
         log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}")
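Taken together, the two hunks above move the creation of the ``_no_remote_has_tty`` event to after the lock is actually acquired, so a failed or cancelled acquire can no longer leave a stale "tty in use" flag behind. A stripped-down sketch of the resulting acquire-then-publish shape, with plain module globals standing in for tractor's real state:

    from typing import Optional

    import trio

    _debug_lock = trio.Lock()
    _no_remote_has_tty: Optional[trio.Event] = None


    async def acquire_tty_lock(uid: tuple) -> None:
        global _no_remote_has_tty

        we_acquired = False
        try:
            await _debug_lock.acquire()
            we_acquired = True

            # only advertise the tty as "in use" once the lock is held
            if _no_remote_has_tty is None:
                _no_remote_has_tty = trio.Event()

            ...  # hand the debugger/tty over to the task for ``uid``

        finally:
            if we_acquired and _debug_lock.locked():
                _debug_lock.release()

            # if no one else holds the lock, signal the tty is free again
            stats = _debug_lock.statistics()
            if not stats.owner and _no_remote_has_tty is not None:
                _no_remote_has_tty.set()
                _no_remote_has_tty = None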
@@ -208,6 +209,7 @@ async def _acquire_debug_lock(

     finally:
+        # if _global_actor_in_debug == uid:

         if (
             we_acquired
             and _debug_lock.locked()
@@ -224,12 +226,15 @@ async def _acquire_debug_lock(
             not stats.owner
         ):
             log.runtime(f"No more tasks waiting on tty lock! says {uid}")
+            if _no_remote_has_tty is not None:
                 _no_remote_has_tty.set()
                 _no_remote_has_tty = None

         _global_actor_in_debug = None

-        log.runtime(f"TTY lock released, remote task: {task_name}:{uid}")
+        log.runtime(
+            f"TTY lock released, remote task: {task_name}:{uid}"
+        )


 @tractor.context
@@ -244,6 +249,10 @@ async def _hijack_stdin_for_child(
     the pdbpp debugger console can be allocated to a sub-actor for repl
     bossing.

+    NOTE: this task is invoked in the root actor-process of the actor
+    tree. It is meant to be invoked as an rpc-task which should be
+    highly reliable at cleaning out the tty-lock state when complete!
+
     '''
     task_name = trio.lowlevel.current_task().name
@@ -265,9 +274,9 @@ async def _hijack_stdin_for_child(
         with (
             trio.CancelScope(shield=True),
         ):
-            try:
-                lock = None
-                async with _acquire_debug_lock(subactor_uid) as lock:
+            # try:
+            #     lock = None
+            async with _acquire_debug_lock(subactor_uid):  # as lock:

                 # indicate to child that we've locked stdio
                 await ctx.started('Locked')
@@ -279,32 +288,35 @@ async def _hijack_stdin_for_child(
                 async with ctx.open_stream() as stream:
                     assert await stream.receive() == 'pdb_unlock'

-        except (
-            BaseException,
-            # trio.MultiError,
-            # Exception,
-            # trio.BrokenResourceError,
-            # trio.Cancelled,  # by local cancellation
-            # trio.ClosedResourceError,  # by self._rx_chan
-            # ContextCancelled,
-            # ConnectionResetError,
-        ):
-            # XXX: there may be a race with the portal teardown
-            # with the calling actor which we can safely ignore.
-            # The alternative would be sending an ack message
-            # and allowing the client to wait for us to teardown
-            # first?
-            if lock and lock.locked():
-                lock.release()
+        # except (
+        #     BaseException,
+        #     # trio.MultiError,
+        #     # Exception,
+        #     # trio.BrokenResourceError,
+        #     # trio.Cancelled,  # by local cancellation
+        #     # trio.ClosedResourceError,  # by self._rx_chan
+        #     # ContextCancelled,
+        #     # ConnectionResetError,
+        # ):
+        #     # XXX: there may be a race with the portal teardown
+        #     # with the calling actor which we can safely ignore.
+        #     # The alternative would be sending an ack message
+        #     # and allowing the client to wait for us to teardown
+        #     # first?
+        #     if lock and lock.locked():
+        #         try:
+        #             lock.release()
+        #         except RuntimeError:
+        #             log.exception(f"we don't own the tty lock?")

-            # if isinstance(err, trio.Cancelled):
-            raise
+            # # if isinstance(err, trio.Cancelled):
+            #     raise

-        finally:
-            log.runtime(
-                "TTY lock released, remote task:"
-                f"{task_name}:{subactor_uid}"
-            )
+        # finally:
+        #     log.runtime(
+        #         "TTY lock released, remote task:"
+        #         f"{task_name}:{subactor_uid}"
+        #     )

         return "pdb_unlock_complete"
@@ -385,6 +397,10 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
         signal.SIGINT,
         partial(shield_sigint, pdb_obj=pdb),
     )

+    # XXX: These are the important flags mentioned in
+    # https://github.com/python-trio/trio/issues/1155
+    # which resolve the traceback spews to console.
     pdb.allow_kbdint = True
     pdb.nosigint = True
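The hunk above assumes a SIGINT handler was installed with ``partial(shield_sigint, pdb_obj=pdb)`` and can later be undone. A self-contained sketch of that install/undo pairing using only the stdlib ``signal`` module; the handler body here is a placeholder, not tractor's real ``shield_sigint``:

    import signal
    from functools import partial
    from typing import Callable


    def install_sigint_shield(pdb_obj=None) -> Callable[[], None]:

        def shield_sigint(signum, frame, pdb_obj=None):
            # placeholder: a real handler decides whether to swallow the
            # interrupt (debugger active) or cancel the runtime instead.
            print('SIGINT ignored while the debugger holds the tty')

        # swap in the shielding handler, remembering whatever was there
        prev_handler = signal.signal(
            signal.SIGINT,
            partial(shield_sigint, pdb_obj=pdb_obj),
        )

        def undo_sigint() -> None:
            # restore original sigint handler
            signal.signal(signal.SIGINT, prev_handler)

        return undo_sigint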
@@ -453,11 +469,15 @@ async def _breakpoint(
         _local_task_in_debug = task_name

         def child_release_hook():
             # _local_task_in_debug = None
             try:
                 # sometimes the ``trio`` might already be termianated in
                 # which case this call will raise.
                 _local_pdb_complete.set()

             finally:
                 # restore original sigint handler
                 undo_sigint()
+                # should always be cleared in the hijack hook aboved right?
+                # _local_task_in_debug = None

         # assign unlock callback for debugger teardown hooks
         # _pdb_release_hook = _local_pdb_complete.set
@@ -471,11 +491,15 @@ async def _breakpoint(
         # we have to figure out how to avoid having the service nursery
         # cancel on this task start? I *think* this works below?
         # actor._service_n.cancel_scope.shield = shield
+        try:
             with trio.CancelScope(shield=True):
                 await actor._service_n.start(
                     wait_for_parent_stdin_hijack,
                     actor.uid,
                 )
+        except RuntimeError:
+            child_release_hook()
+            raise

     elif is_root_process():
@@ -524,8 +548,12 @@ async def _breakpoint(

             _global_actor_in_debug = None
             _local_task_in_debug = None
-            _local_pdb_complete.set()
+
+            try:
+                # sometimes the ``trio`` might already be termianated in
+                # which case this call will raise.
+                _local_pdb_complete.set()
+            finally:
+                # restore original sigint handler
+                undo_sigint()
@@ -581,20 +609,58 @@ def shield_sigint(
     __tracebackhide__ = True

     global _local_task_in_debug, _global_actor_in_debug
-    in_debug = _global_actor_in_debug
+    uid_in_debug = _global_actor_in_debug

     actor = tractor.current_actor()

+    def do_cancel():
+        # If we haven't tried to cancel the runtime then do that instead
+        # of raising a KBI (which may non-gracefully destroy
+        # a ``trio.run()``).
+        if not actor._cancel_called:
+            actor.cancel_soon()
+
+        # If the runtime is already cancelled it likely means the user
+        # hit ctrl-c again because teardown didn't full take place in
+        # which case we do the "hard" raising of a local KBI.
+        else:
+            raise KeyboardInterrupt
+
+    any_connected = False
+
+    if uid_in_debug is not None:
+        # try to see if the supposed (sub)actor in debug still
+        # has an active connection to *this* actor, and if not
+        # it's likely they aren't using the TTY lock / debugger
+        # and we should propagate SIGINT normally.
+        chans = actor._peers.get(tuple(uid_in_debug))
+        if chans:
+            any_connected = any(chan.connected() for chan in chans)
+            if not any_connected:
+                log.warning(
+                    'A global actor reported to be in debug '
+                    'but no connection exists for this child:\n'
+                    f'{uid_in_debug}\n'
+                    'Allowing SIGINT propagation..'
+                )
+                return do_cancel()
+
     # root actor branch that reports whether or not a child
     # has locked debugger.
     if (
         is_root_process()
-        and in_debug
+        and uid_in_debug is not None
+
+        # XXX: only if there is an existing connection to the
+        # (sub-)actor in debug do we ignore SIGINT in this
+        # parent! Otherwise we may hang waiting for an actor
+        # which has already terminated to unlock.
+        and any_connected
     ):
-        name = in_debug[0]
+        name = uid_in_debug[0]
         if name != 'root':
             log.pdb(
-                f"Ignoring SIGINT while child in debug mode: `{in_debug}`"
+                f"Ignoring SIGINT while child in debug mode: `{uid_in_debug}`"
             )

         else:
@@ -606,6 +672,16 @@ def shield_sigint(
     elif (
         not is_root_process()
     ):
+        chan: Channel = actor._parent_chan
+        if not chan or not chan.connected():
+            log.warning(
+                'A global actor reported to be in debug '
+                'but no connection exists for its parent:\n'
+                f'{uid_in_debug}\n'
+                'Allowing SIGINT propagation..'
+            )
+            return do_cancel()
+
         task = _local_task_in_debug
         if task:
             log.pdb(
@@ -621,20 +697,6 @@ def shield_sigint(
                 "Ignoring SIGINT since debug mode is enabled"
             )

-    # noone has the debugger so raise KBI
-    else:
-        # If we haven't tried to cancel the runtime then do that instead
-        # of raising a KBI (which may non-gracefully destroy
-        # a ``trio.run()``).
-        if not actor._cancel_called:
-            actor.cancel_soon()
-
-        # If the runtime is already cancelled it likely means the user
-        # hit ctrl-c again because teardown didn't full take place in
-        # which case we do the "hard" raising of a local KBI.
-        else:
-            raise KeyboardInterrupt
-
     # maybe redraw/print last REPL output to console
     if pdb_obj:
@@ -648,8 +710,12 @@ def shield_sigint(
         # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
         # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py

+        try:
             pdb_obj.do_longlist(None)
             print(pdb_obj.prompt, end='', flush=True)
+        except AttributeError:
+            log.exception('pdbpp longlist failed...')
+            raise KeyboardInterrupt


 def _set_trace(
@@ -667,7 +733,7 @@ def _set_trace(
     # start 2 levels up in user code
     frame: FrameType = sys._getframe()
     if frame:
-        frame = frame.f_back.f_back  # type: ignore
+        frame = frame.f_back  # type: ignore

     if pdb and actor is not None:
         log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
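The one-line change above walks one fewer frame before handing control to the REPL. For readers unfamiliar with ``sys._getframe()``, each ``.f_back`` hop moves one caller up the stack, so dropping a ``.f_back`` starts pdb one level closer to where ``_set_trace()`` was called. A tiny illustration with arbitrary function names:

    import sys


    def inner():
        frame = sys._getframe()              # the ``inner`` frame itself
        print(frame.f_code.co_name)          # -> 'inner'
        print(frame.f_back.f_code.co_name)   # -> 'outer' (one level up)


    def outer():
        inner()


    outer()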
@@ -511,8 +511,8 @@ class Portal:
             if ctx.chan.connected():
                 log.info(
                     'Waiting on final context-task result for\n'
-                    f'task:{cid}\n'
-                    f'actor:{uid}'
+                    f'task: {cid}\n'
+                    f'actor: {uid}'
                 )
                 result = await ctx.result()
@@ -601,7 +601,14 @@ class Context:

         finally:
             if self._portal:
-                self._portal._streams.remove(rchan)
+                try:
+                    self._portal._streams.remove(stream)
+                except KeyError:
+                    log.warning(
+                        f'Stream was already destroyed?\n'
+                        f'actor: {self.chan.uid}\n'
+                        f'ctx id: {self.cid}'
+                    )

     async def result(self) -> Any:
         '''
@@ -271,7 +271,12 @@ def _run_asyncio_task(
             task.exception()
         except BaseException as terr:
             task_err = terr

+            if isinstance(terr, CancelledError):
+                log.cancel(f'`asyncio` task cancelled: {task.get_name()}')
+            else:
+                log.exception(f'`asyncio` task: {task.get_name()} errored')
+
             assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'

         if aio_err is not None:
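The added branch separates a cancelled ``asyncio`` task from one that crashed, so cancellation is logged at a quieter level (``log.cancel`` is tractor's custom log level). The same distinction expressed with the plain ``asyncio.Task`` API and stdlib logging, as a rough stand-alone sketch:

    import asyncio
    import logging

    log = logging.getLogger('aio-bridge')


    def report_task_outcome(task: asyncio.Task) -> None:
        # cancelled tasks are expected during teardown; only a genuine
        # exception warrants a full traceback.
        if task.cancelled():
            log.warning('`asyncio` task cancelled: %s', task.get_name())
            return

        err = task.exception()
        if err is not None:
            log.error('`asyncio` task %s errored', task.get_name(), exc_info=err)


    async def main():
        task = asyncio.create_task(asyncio.sleep(10), name='sleeper')
        task.add_done_callback(report_task_outcome)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        await asyncio.sleep(0)  # let the done-callback run


    if __name__ == '__main__':
        asyncio.run(main())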