Compare commits

..

No commits in common. "7dd72e042d8f7811a22a264457c6b0accfefa863" and "2800100b2148f7139edcc80afb501edca1f48dd3" have entirely different histories.

9 changed files with 94 additions and 182 deletions

View File

@ -111,4 +111,4 @@ jobs:
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs --full-trace
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs

View File

@ -3,14 +3,13 @@
|gh_actions|
|docs|
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_.
``tractor`` is a `structured concurrent`_, multi-processing_ runtime built on trio_.
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
our nurseries_ let you spawn new Python processes which each run a ``trio``
scheduled runtime - a call to ``trio.run()``.
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
We believe the system adhere's to the `3 axioms`_ of an "`actor model`_"
but likely *does not* look like what *you* probably think an "actor
model" looks like, and that's *intentional*.
@ -578,13 +577,13 @@ say hi, please feel free to reach us in our `matrix channel`_. If
matrix seems too hip, we're also mostly all in the the `trio gitter
channel`_!
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
.. _trio: https://github.com/python-trio/trio
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
.. _actor model: https://en.wikipedia.org/wiki/Actor_model
.. _trio: https://github.com/python-trio/trio
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
.. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s

View File

@ -81,14 +81,11 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
'example_script',
# walk yields: (dirpath, dirnames, filenames)
[
(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
[(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
if '__' not in f
and f[0] != '_'
and 'debugging' not in p[0]
and 'integration' not in p[0]
],
and 'debugging' not in p[0]],
ids=lambda t: t[1],
)

View File

@ -611,8 +611,7 @@ class Actor:
entry = local_nursery._children.get(uid)
if entry:
_, proc, _ = entry
log.warning(
f'Actor {uid}@{proc} IPC connection broke!?')
log.warning(f'Actor {uid}@{proc} IPC connection broke!?')
# if proc.poll() is not None:
# log.error('Actor {uid} proc died and IPC broke?')
@ -631,11 +630,6 @@ class Actor:
# Attempt to wait for the far end to close the channel
# and bail after timeout (2-generals on closure).
assert chan.msgstream
log.runtime(
f'Draining lingering msgs from stream {chan.msgstream}'
)
async for msg in chan.msgstream.drain():
# try to deliver any lingering msgs
# before we destroy the channel.

View File

@ -41,7 +41,6 @@ from .log import get_logger
from ._discovery import get_root
from ._state import is_root_process, debug_mode
from ._exceptions import is_multi_cancelled
from ._ipc import Channel
try:
@ -179,6 +178,12 @@ async def _acquire_debug_lock(
we_acquired = False
if _no_remote_has_tty is None:
# mark the tty lock as being in use so that the runtime
# can try to avoid clobbering any connection from a child
# that's currently relying on it.
_no_remote_has_tty = trio.Event()
try:
log.runtime(
f"entering lock checkpoint, remote task: {task_name}:{uid}"
@ -186,12 +191,6 @@ async def _acquire_debug_lock(
we_acquired = True
await _debug_lock.acquire()
if _no_remote_has_tty is None:
# mark the tty lock as being in use so that the runtime
# can try to avoid clobbering any connection from a child
# that's currently relying on it.
_no_remote_has_tty = trio.Event()
_global_actor_in_debug = uid
log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}")
@ -209,7 +208,6 @@ async def _acquire_debug_lock(
finally:
# if _global_actor_in_debug == uid:
if (
we_acquired
and _debug_lock.locked()
@ -226,15 +224,12 @@ async def _acquire_debug_lock(
not stats.owner
):
log.runtime(f"No more tasks waiting on tty lock! says {uid}")
if _no_remote_has_tty is not None:
_no_remote_has_tty.set()
_no_remote_has_tty = None
_global_actor_in_debug = None
log.runtime(
f"TTY lock released, remote task: {task_name}:{uid}"
)
log.runtime(f"TTY lock released, remote task: {task_name}:{uid}")
@tractor.context
@ -249,10 +244,6 @@ async def _hijack_stdin_for_child(
the pdbpp debugger console can be allocated to a sub-actor for repl
bossing.
NOTE: this task is invoked in the root actor-process of the actor
tree. It is meant to be invoked as an rpc-task which should be
highly reliable at cleaning out the tty-lock state when complete!
'''
task_name = trio.lowlevel.current_task().name
@ -274,9 +265,9 @@ async def _hijack_stdin_for_child(
with (
trio.CancelScope(shield=True),
):
# try:
# lock = None
async with _acquire_debug_lock(subactor_uid): # as lock:
try:
lock = None
async with _acquire_debug_lock(subactor_uid) as lock:
# indicate to child that we've locked stdio
await ctx.started('Locked')
@ -288,35 +279,32 @@ async def _hijack_stdin_for_child(
async with ctx.open_stream() as stream:
assert await stream.receive() == 'pdb_unlock'
# except (
# BaseException,
# # trio.MultiError,
# # Exception,
# # trio.BrokenResourceError,
# # trio.Cancelled, # by local cancellation
# # trio.ClosedResourceError, # by self._rx_chan
# # ContextCancelled,
# # ConnectionResetError,
# ):
# # XXX: there may be a race with the portal teardown
# # with the calling actor which we can safely ignore.
# # The alternative would be sending an ack message
# # and allowing the client to wait for us to teardown
# # first?
# if lock and lock.locked():
# try:
# lock.release()
# except RuntimeError:
# log.exception(f"we don't own the tty lock?")
except (
BaseException,
# trio.MultiError,
# Exception,
# trio.BrokenResourceError,
# trio.Cancelled, # by local cancellation
# trio.ClosedResourceError, # by self._rx_chan
# ContextCancelled,
# ConnectionResetError,
):
# XXX: there may be a race with the portal teardown
# with the calling actor which we can safely ignore.
# The alternative would be sending an ack message
# and allowing the client to wait for us to teardown
# first?
if lock and lock.locked():
lock.release()
# # if isinstance(err, trio.Cancelled):
# raise
# if isinstance(err, trio.Cancelled):
raise
# finally:
# log.runtime(
# "TTY lock released, remote task:"
# f"{task_name}:{subactor_uid}"
# )
finally:
log.runtime(
"TTY lock released, remote task:"
f"{task_name}:{subactor_uid}"
)
return "pdb_unlock_complete"
@ -397,10 +385,6 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
signal.SIGINT,
partial(shield_sigint, pdb_obj=pdb),
)
# XXX: These are the important flags mentioned in
# https://github.com/python-trio/trio/issues/1155
# which resolve the traceback spews to console.
pdb.allow_kbdint = True
pdb.nosigint = True
@ -469,15 +453,11 @@ async def _breakpoint(
_local_task_in_debug = task_name
def child_release_hook():
try:
# sometimes the ``trio`` might already be termianated in
# which case this call will raise.
# _local_task_in_debug = None
_local_pdb_complete.set()
finally:
# restore original sigint handler
undo_sigint()
# should always be cleared in the hijack hook aboved right?
# _local_task_in_debug = None
# assign unlock callback for debugger teardown hooks
# _pdb_release_hook = _local_pdb_complete.set
@ -491,15 +471,11 @@ async def _breakpoint(
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below?
# actor._service_n.cancel_scope.shield = shield
try:
with trio.CancelScope(shield=True):
await actor._service_n.start(
wait_for_parent_stdin_hijack,
actor.uid,
)
except RuntimeError:
child_release_hook()
raise
elif is_root_process():
@ -548,12 +524,8 @@ async def _breakpoint(
_global_actor_in_debug = None
_local_task_in_debug = None
try:
# sometimes the ``trio`` might already be termianated in
# which case this call will raise.
_local_pdb_complete.set()
finally:
# restore original sigint handler
undo_sigint()
@ -609,58 +581,20 @@ def shield_sigint(
__tracebackhide__ = True
global _local_task_in_debug, _global_actor_in_debug
uid_in_debug = _global_actor_in_debug
in_debug = _global_actor_in_debug
actor = tractor.current_actor()
def do_cancel():
# If we haven't tried to cancel the runtime then do that instead
# of raising a KBI (which may non-gracefully destroy
# a ``trio.run()``).
if not actor._cancel_called:
actor.cancel_soon()
# If the runtime is already cancelled it likely means the user
# hit ctrl-c again because teardown didn't full take place in
# which case we do the "hard" raising of a local KBI.
else:
raise KeyboardInterrupt
any_connected = False
if uid_in_debug is not None:
# try to see if the supposed (sub)actor in debug still
# has an active connection to *this* actor, and if not
# it's likely they aren't using the TTY lock / debugger
# and we should propagate SIGINT normally.
chans = actor._peers.get(tuple(uid_in_debug))
if chans:
any_connected = any(chan.connected() for chan in chans)
if not any_connected:
log.warning(
'A global actor reported to be in debug '
'but no connection exists for this child:\n'
f'{uid_in_debug}\n'
'Allowing SIGINT propagation..'
)
return do_cancel()
# root actor branch that reports whether or not a child
# has locked debugger.
if (
is_root_process()
and uid_in_debug is not None
# XXX: only if there is an existing connection to the
# (sub-)actor in debug do we ignore SIGINT in this
# parent! Otherwise we may hang waiting for an actor
# which has already terminated to unlock.
and any_connected
and in_debug
):
name = uid_in_debug[0]
name = in_debug[0]
if name != 'root':
log.pdb(
f"Ignoring SIGINT while child in debug mode: `{uid_in_debug}`"
f"Ignoring SIGINT while child in debug mode: `{in_debug}`"
)
else:
@ -672,16 +606,6 @@ def shield_sigint(
elif (
not is_root_process()
):
chan: Channel = actor._parent_chan
if not chan or not chan.connected():
log.warning(
'A global actor reported to be in debug '
'but no connection exists for its parent:\n'
f'{uid_in_debug}\n'
'Allowing SIGINT propagation..'
)
return do_cancel()
task = _local_task_in_debug
if task:
log.pdb(
@ -697,6 +621,20 @@ def shield_sigint(
"Ignoring SIGINT since debug mode is enabled"
)
# noone has the debugger so raise KBI
else:
# If we haven't tried to cancel the runtime then do that instead
# of raising a KBI (which may non-gracefully destroy
# a ``trio.run()``).
if not actor._cancel_called:
actor.cancel_soon()
# If the runtime is already cancelled it likely means the user
# hit ctrl-c again because teardown didn't full take place in
# which case we do the "hard" raising of a local KBI.
else:
raise KeyboardInterrupt
# maybe redraw/print last REPL output to console
if pdb_obj:
@ -710,12 +648,8 @@ def shield_sigint(
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
try:
pdb_obj.do_longlist(None)
print(pdb_obj.prompt, end='', flush=True)
except AttributeError:
log.exception('pdbpp longlist failed...')
raise KeyboardInterrupt
def _set_trace(
@ -733,7 +667,7 @@ def _set_trace(
# start 2 levels up in user code
frame: FrameType = sys._getframe()
if frame:
frame = frame.f_back # type: ignore
frame = frame.f_back.f_back # type: ignore
if pdb and actor is not None:
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")

View File

@ -601,14 +601,7 @@ class Context:
finally:
if self._portal:
try:
self._portal._streams.remove(stream)
except KeyError:
log.warning(
f'Stream was already destroyed?\n'
f'actor: {self.chan.uid}\n'
f'ctx id: {self.cid}'
)
self._portal._streams.remove(rchan)
async def result(self) -> Any:
'''

View File

@ -271,12 +271,7 @@ def _run_asyncio_task(
task.exception()
except BaseException as terr:
task_err = terr
if isinstance(terr, CancelledError):
log.cancel(f'`asyncio` task cancelled: {task.get_name()}')
else:
log.exception(f'`asyncio` task: {task.get_name()} errored')
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
if aio_err is not None: