Compare commits
10 Commits
2800100b21
...
7dd72e042d
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 7dd72e042d | |
Tyler Goodlet | ee8ead4d7a | |
Tyler Goodlet | 8a70a52ff9 | |
Tyler Goodlet | 70e4458fb0 | |
Tyler Goodlet | 70d1c98c10 | |
Tyler Goodlet | dade6a4b43 | |
Tyler Goodlet | 7b4049198a | |
Tyler Goodlet | 931b20cf35 | |
Tyler Goodlet | 11c1582c39 | |
Tyler Goodlet | d1f347c21f |
|
@ -111,4 +111,4 @@ jobs:
|
||||||
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
|
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
|
||||||
|
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs
|
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs --full-trace
|
||||||
|
|
|
@ -3,13 +3,14 @@
|
||||||
|gh_actions|
|
|gh_actions|
|
||||||
|docs|
|
|docs|
|
||||||
|
|
||||||
``tractor`` is a `structured concurrent`_, multi-processing_ runtime built on trio_.
|
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
|
||||||
|
built on trio_.
|
||||||
|
|
||||||
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
|
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
|
||||||
our nurseries_ let you spawn new Python processes which each run a ``trio``
|
our nurseries_ let you spawn new Python processes which each run a ``trio``
|
||||||
scheduled runtime - a call to ``trio.run()``.
|
scheduled runtime - a call to ``trio.run()``.
|
||||||
|
|
||||||
We believe the system adhere's to the `3 axioms`_ of an "`actor model`_"
|
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
|
||||||
but likely *does not* look like what *you* probably think an "actor
|
but likely *does not* look like what *you* probably think an "actor
|
||||||
model" looks like, and that's *intentional*.
|
model" looks like, and that's *intentional*.
|
||||||
|
|
||||||
|
@ -577,13 +578,13 @@ say hi, please feel free to reach us in our `matrix channel`_. If
|
||||||
matrix seems too hip, we're also mostly all in the the `trio gitter
|
matrix seems too hip, we're also mostly all in the the `trio gitter
|
||||||
channel`_!
|
channel`_!
|
||||||
|
|
||||||
|
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
|
||||||
|
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
|
||||||
|
.. _trio: https://github.com/python-trio/trio
|
||||||
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
|
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
|
||||||
.. _actor model: https://en.wikipedia.org/wiki/Actor_model
|
.. _actor model: https://en.wikipedia.org/wiki/Actor_model
|
||||||
.. _trio: https://github.com/python-trio/trio
|
|
||||||
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
|
|
||||||
.. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
|
.. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
|
||||||
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
|
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
|
||||||
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
|
|
||||||
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
|
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
|
||||||
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
|
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
|
||||||
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
|
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
|
||||||
|
|
|
@ -81,11 +81,14 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
|
||||||
'example_script',
|
'example_script',
|
||||||
|
|
||||||
# walk yields: (dirpath, dirnames, filenames)
|
# walk yields: (dirpath, dirnames, filenames)
|
||||||
[(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
|
[
|
||||||
|
(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
|
||||||
|
|
||||||
if '__' not in f
|
if '__' not in f
|
||||||
and f[0] != '_'
|
and f[0] != '_'
|
||||||
and 'debugging' not in p[0]],
|
and 'debugging' not in p[0]
|
||||||
|
and 'integration' not in p[0]
|
||||||
|
],
|
||||||
|
|
||||||
ids=lambda t: t[1],
|
ids=lambda t: t[1],
|
||||||
)
|
)
|
||||||
|
|
|
@ -611,7 +611,8 @@ class Actor:
|
||||||
entry = local_nursery._children.get(uid)
|
entry = local_nursery._children.get(uid)
|
||||||
if entry:
|
if entry:
|
||||||
_, proc, _ = entry
|
_, proc, _ = entry
|
||||||
log.warning(f'Actor {uid}@{proc} IPC connection broke!?')
|
log.warning(
|
||||||
|
f'Actor {uid}@{proc} IPC connection broke!?')
|
||||||
# if proc.poll() is not None:
|
# if proc.poll() is not None:
|
||||||
# log.error('Actor {uid} proc died and IPC broke?')
|
# log.error('Actor {uid} proc died and IPC broke?')
|
||||||
|
|
||||||
|
@ -630,6 +631,11 @@ class Actor:
|
||||||
# Attempt to wait for the far end to close the channel
|
# Attempt to wait for the far end to close the channel
|
||||||
# and bail after timeout (2-generals on closure).
|
# and bail after timeout (2-generals on closure).
|
||||||
assert chan.msgstream
|
assert chan.msgstream
|
||||||
|
|
||||||
|
log.runtime(
|
||||||
|
f'Draining lingering msgs from stream {chan.msgstream}'
|
||||||
|
)
|
||||||
|
|
||||||
async for msg in chan.msgstream.drain():
|
async for msg in chan.msgstream.drain():
|
||||||
# try to deliver any lingering msgs
|
# try to deliver any lingering msgs
|
||||||
# before we destroy the channel.
|
# before we destroy the channel.
|
||||||
|
|
|
@ -41,6 +41,7 @@ from .log import get_logger
|
||||||
from ._discovery import get_root
|
from ._discovery import get_root
|
||||||
from ._state import is_root_process, debug_mode
|
from ._state import is_root_process, debug_mode
|
||||||
from ._exceptions import is_multi_cancelled
|
from ._exceptions import is_multi_cancelled
|
||||||
|
from ._ipc import Channel
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -178,12 +179,6 @@ async def _acquire_debug_lock(
|
||||||
|
|
||||||
we_acquired = False
|
we_acquired = False
|
||||||
|
|
||||||
if _no_remote_has_tty is None:
|
|
||||||
# mark the tty lock as being in use so that the runtime
|
|
||||||
# can try to avoid clobbering any connection from a child
|
|
||||||
# that's currently relying on it.
|
|
||||||
_no_remote_has_tty = trio.Event()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"entering lock checkpoint, remote task: {task_name}:{uid}"
|
f"entering lock checkpoint, remote task: {task_name}:{uid}"
|
||||||
|
@ -191,6 +186,12 @@ async def _acquire_debug_lock(
|
||||||
we_acquired = True
|
we_acquired = True
|
||||||
await _debug_lock.acquire()
|
await _debug_lock.acquire()
|
||||||
|
|
||||||
|
if _no_remote_has_tty is None:
|
||||||
|
# mark the tty lock as being in use so that the runtime
|
||||||
|
# can try to avoid clobbering any connection from a child
|
||||||
|
# that's currently relying on it.
|
||||||
|
_no_remote_has_tty = trio.Event()
|
||||||
|
|
||||||
_global_actor_in_debug = uid
|
_global_actor_in_debug = uid
|
||||||
log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}")
|
log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}")
|
||||||
|
|
||||||
|
@ -208,6 +209,7 @@ async def _acquire_debug_lock(
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# if _global_actor_in_debug == uid:
|
# if _global_actor_in_debug == uid:
|
||||||
|
|
||||||
if (
|
if (
|
||||||
we_acquired
|
we_acquired
|
||||||
and _debug_lock.locked()
|
and _debug_lock.locked()
|
||||||
|
@ -224,12 +226,15 @@ async def _acquire_debug_lock(
|
||||||
not stats.owner
|
not stats.owner
|
||||||
):
|
):
|
||||||
log.runtime(f"No more tasks waiting on tty lock! says {uid}")
|
log.runtime(f"No more tasks waiting on tty lock! says {uid}")
|
||||||
_no_remote_has_tty.set()
|
if _no_remote_has_tty is not None:
|
||||||
_no_remote_has_tty = None
|
_no_remote_has_tty.set()
|
||||||
|
_no_remote_has_tty = None
|
||||||
|
|
||||||
_global_actor_in_debug = None
|
_global_actor_in_debug = None
|
||||||
|
|
||||||
log.runtime(f"TTY lock released, remote task: {task_name}:{uid}")
|
log.runtime(
|
||||||
|
f"TTY lock released, remote task: {task_name}:{uid}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -244,6 +249,10 @@ async def _hijack_stdin_for_child(
|
||||||
the pdbpp debugger console can be allocated to a sub-actor for repl
|
the pdbpp debugger console can be allocated to a sub-actor for repl
|
||||||
bossing.
|
bossing.
|
||||||
|
|
||||||
|
NOTE: this task is invoked in the root actor-process of the actor
|
||||||
|
tree. It is meant to be invoked as an rpc-task which should be
|
||||||
|
highly reliable at cleaning out the tty-lock state when complete!
|
||||||
|
|
||||||
'''
|
'''
|
||||||
task_name = trio.lowlevel.current_task().name
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
|
@ -265,47 +274,50 @@ async def _hijack_stdin_for_child(
|
||||||
with (
|
with (
|
||||||
trio.CancelScope(shield=True),
|
trio.CancelScope(shield=True),
|
||||||
):
|
):
|
||||||
try:
|
# try:
|
||||||
lock = None
|
# lock = None
|
||||||
async with _acquire_debug_lock(subactor_uid) as lock:
|
async with _acquire_debug_lock(subactor_uid): # as lock:
|
||||||
|
|
||||||
# indicate to child that we've locked stdio
|
# indicate to child that we've locked stdio
|
||||||
await ctx.started('Locked')
|
await ctx.started('Locked')
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Actor {subactor_uid} acquired stdin hijack lock"
|
f"Actor {subactor_uid} acquired stdin hijack lock"
|
||||||
)
|
|
||||||
|
|
||||||
# wait for unlock pdb by child
|
|
||||||
async with ctx.open_stream() as stream:
|
|
||||||
assert await stream.receive() == 'pdb_unlock'
|
|
||||||
|
|
||||||
except (
|
|
||||||
BaseException,
|
|
||||||
# trio.MultiError,
|
|
||||||
# Exception,
|
|
||||||
# trio.BrokenResourceError,
|
|
||||||
# trio.Cancelled, # by local cancellation
|
|
||||||
# trio.ClosedResourceError, # by self._rx_chan
|
|
||||||
# ContextCancelled,
|
|
||||||
# ConnectionResetError,
|
|
||||||
):
|
|
||||||
# XXX: there may be a race with the portal teardown
|
|
||||||
# with the calling actor which we can safely ignore.
|
|
||||||
# The alternative would be sending an ack message
|
|
||||||
# and allowing the client to wait for us to teardown
|
|
||||||
# first?
|
|
||||||
if lock and lock.locked():
|
|
||||||
lock.release()
|
|
||||||
|
|
||||||
# if isinstance(err, trio.Cancelled):
|
|
||||||
raise
|
|
||||||
|
|
||||||
finally:
|
|
||||||
log.runtime(
|
|
||||||
"TTY lock released, remote task:"
|
|
||||||
f"{task_name}:{subactor_uid}"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# wait for unlock pdb by child
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
assert await stream.receive() == 'pdb_unlock'
|
||||||
|
|
||||||
|
# except (
|
||||||
|
# BaseException,
|
||||||
|
# # trio.MultiError,
|
||||||
|
# # Exception,
|
||||||
|
# # trio.BrokenResourceError,
|
||||||
|
# # trio.Cancelled, # by local cancellation
|
||||||
|
# # trio.ClosedResourceError, # by self._rx_chan
|
||||||
|
# # ContextCancelled,
|
||||||
|
# # ConnectionResetError,
|
||||||
|
# ):
|
||||||
|
# # XXX: there may be a race with the portal teardown
|
||||||
|
# # with the calling actor which we can safely ignore.
|
||||||
|
# # The alternative would be sending an ack message
|
||||||
|
# # and allowing the client to wait for us to teardown
|
||||||
|
# # first?
|
||||||
|
# if lock and lock.locked():
|
||||||
|
# try:
|
||||||
|
# lock.release()
|
||||||
|
# except RuntimeError:
|
||||||
|
# log.exception(f"we don't own the tty lock?")
|
||||||
|
|
||||||
|
# # if isinstance(err, trio.Cancelled):
|
||||||
|
# raise
|
||||||
|
|
||||||
|
# finally:
|
||||||
|
# log.runtime(
|
||||||
|
# "TTY lock released, remote task:"
|
||||||
|
# f"{task_name}:{subactor_uid}"
|
||||||
|
# )
|
||||||
|
|
||||||
return "pdb_unlock_complete"
|
return "pdb_unlock_complete"
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
@ -385,6 +397,10 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
|
||||||
signal.SIGINT,
|
signal.SIGINT,
|
||||||
partial(shield_sigint, pdb_obj=pdb),
|
partial(shield_sigint, pdb_obj=pdb),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# XXX: These are the important flags mentioned in
|
||||||
|
# https://github.com/python-trio/trio/issues/1155
|
||||||
|
# which resolve the traceback spews to console.
|
||||||
pdb.allow_kbdint = True
|
pdb.allow_kbdint = True
|
||||||
pdb.nosigint = True
|
pdb.nosigint = True
|
||||||
|
|
||||||
|
@ -453,11 +469,15 @@ async def _breakpoint(
|
||||||
_local_task_in_debug = task_name
|
_local_task_in_debug = task_name
|
||||||
|
|
||||||
def child_release_hook():
|
def child_release_hook():
|
||||||
# _local_task_in_debug = None
|
try:
|
||||||
_local_pdb_complete.set()
|
# sometimes the ``trio`` might already be termianated in
|
||||||
|
# which case this call will raise.
|
||||||
# restore original sigint handler
|
_local_pdb_complete.set()
|
||||||
undo_sigint()
|
finally:
|
||||||
|
# restore original sigint handler
|
||||||
|
undo_sigint()
|
||||||
|
# should always be cleared in the hijack hook aboved right?
|
||||||
|
# _local_task_in_debug = None
|
||||||
|
|
||||||
# assign unlock callback for debugger teardown hooks
|
# assign unlock callback for debugger teardown hooks
|
||||||
# _pdb_release_hook = _local_pdb_complete.set
|
# _pdb_release_hook = _local_pdb_complete.set
|
||||||
|
@ -471,11 +491,15 @@ async def _breakpoint(
|
||||||
# we have to figure out how to avoid having the service nursery
|
# we have to figure out how to avoid having the service nursery
|
||||||
# cancel on this task start? I *think* this works below?
|
# cancel on this task start? I *think* this works below?
|
||||||
# actor._service_n.cancel_scope.shield = shield
|
# actor._service_n.cancel_scope.shield = shield
|
||||||
with trio.CancelScope(shield=True):
|
try:
|
||||||
await actor._service_n.start(
|
with trio.CancelScope(shield=True):
|
||||||
wait_for_parent_stdin_hijack,
|
await actor._service_n.start(
|
||||||
actor.uid,
|
wait_for_parent_stdin_hijack,
|
||||||
)
|
actor.uid,
|
||||||
|
)
|
||||||
|
except RuntimeError:
|
||||||
|
child_release_hook()
|
||||||
|
raise
|
||||||
|
|
||||||
elif is_root_process():
|
elif is_root_process():
|
||||||
|
|
||||||
|
@ -524,10 +548,14 @@ async def _breakpoint(
|
||||||
|
|
||||||
_global_actor_in_debug = None
|
_global_actor_in_debug = None
|
||||||
_local_task_in_debug = None
|
_local_task_in_debug = None
|
||||||
_local_pdb_complete.set()
|
|
||||||
|
|
||||||
# restore original sigint handler
|
try:
|
||||||
undo_sigint()
|
# sometimes the ``trio`` might already be termianated in
|
||||||
|
# which case this call will raise.
|
||||||
|
_local_pdb_complete.set()
|
||||||
|
finally:
|
||||||
|
# restore original sigint handler
|
||||||
|
undo_sigint()
|
||||||
|
|
||||||
_pdb_release_hook = teardown
|
_pdb_release_hook = teardown
|
||||||
|
|
||||||
|
@ -581,20 +609,58 @@ def shield_sigint(
|
||||||
__tracebackhide__ = True
|
__tracebackhide__ = True
|
||||||
|
|
||||||
global _local_task_in_debug, _global_actor_in_debug
|
global _local_task_in_debug, _global_actor_in_debug
|
||||||
in_debug = _global_actor_in_debug
|
uid_in_debug = _global_actor_in_debug
|
||||||
|
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
|
|
||||||
|
def do_cancel():
|
||||||
|
# If we haven't tried to cancel the runtime then do that instead
|
||||||
|
# of raising a KBI (which may non-gracefully destroy
|
||||||
|
# a ``trio.run()``).
|
||||||
|
if not actor._cancel_called:
|
||||||
|
actor.cancel_soon()
|
||||||
|
|
||||||
|
# If the runtime is already cancelled it likely means the user
|
||||||
|
# hit ctrl-c again because teardown didn't full take place in
|
||||||
|
# which case we do the "hard" raising of a local KBI.
|
||||||
|
else:
|
||||||
|
raise KeyboardInterrupt
|
||||||
|
|
||||||
|
any_connected = False
|
||||||
|
|
||||||
|
if uid_in_debug is not None:
|
||||||
|
# try to see if the supposed (sub)actor in debug still
|
||||||
|
# has an active connection to *this* actor, and if not
|
||||||
|
# it's likely they aren't using the TTY lock / debugger
|
||||||
|
# and we should propagate SIGINT normally.
|
||||||
|
chans = actor._peers.get(tuple(uid_in_debug))
|
||||||
|
if chans:
|
||||||
|
any_connected = any(chan.connected() for chan in chans)
|
||||||
|
if not any_connected:
|
||||||
|
log.warning(
|
||||||
|
'A global actor reported to be in debug '
|
||||||
|
'but no connection exists for this child:\n'
|
||||||
|
f'{uid_in_debug}\n'
|
||||||
|
'Allowing SIGINT propagation..'
|
||||||
|
)
|
||||||
|
return do_cancel()
|
||||||
|
|
||||||
# root actor branch that reports whether or not a child
|
# root actor branch that reports whether or not a child
|
||||||
# has locked debugger.
|
# has locked debugger.
|
||||||
if (
|
if (
|
||||||
is_root_process()
|
is_root_process()
|
||||||
and in_debug
|
and uid_in_debug is not None
|
||||||
|
|
||||||
|
# XXX: only if there is an existing connection to the
|
||||||
|
# (sub-)actor in debug do we ignore SIGINT in this
|
||||||
|
# parent! Otherwise we may hang waiting for an actor
|
||||||
|
# which has already terminated to unlock.
|
||||||
|
and any_connected
|
||||||
):
|
):
|
||||||
name = in_debug[0]
|
name = uid_in_debug[0]
|
||||||
if name != 'root':
|
if name != 'root':
|
||||||
log.pdb(
|
log.pdb(
|
||||||
f"Ignoring SIGINT while child in debug mode: `{in_debug}`"
|
f"Ignoring SIGINT while child in debug mode: `{uid_in_debug}`"
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -606,6 +672,16 @@ def shield_sigint(
|
||||||
elif (
|
elif (
|
||||||
not is_root_process()
|
not is_root_process()
|
||||||
):
|
):
|
||||||
|
chan: Channel = actor._parent_chan
|
||||||
|
if not chan or not chan.connected():
|
||||||
|
log.warning(
|
||||||
|
'A global actor reported to be in debug '
|
||||||
|
'but no connection exists for its parent:\n'
|
||||||
|
f'{uid_in_debug}\n'
|
||||||
|
'Allowing SIGINT propagation..'
|
||||||
|
)
|
||||||
|
return do_cancel()
|
||||||
|
|
||||||
task = _local_task_in_debug
|
task = _local_task_in_debug
|
||||||
if task:
|
if task:
|
||||||
log.pdb(
|
log.pdb(
|
||||||
|
@ -621,20 +697,6 @@ def shield_sigint(
|
||||||
"Ignoring SIGINT since debug mode is enabled"
|
"Ignoring SIGINT since debug mode is enabled"
|
||||||
)
|
)
|
||||||
|
|
||||||
# noone has the debugger so raise KBI
|
|
||||||
else:
|
|
||||||
# If we haven't tried to cancel the runtime then do that instead
|
|
||||||
# of raising a KBI (which may non-gracefully destroy
|
|
||||||
# a ``trio.run()``).
|
|
||||||
if not actor._cancel_called:
|
|
||||||
actor.cancel_soon()
|
|
||||||
|
|
||||||
# If the runtime is already cancelled it likely means the user
|
|
||||||
# hit ctrl-c again because teardown didn't full take place in
|
|
||||||
# which case we do the "hard" raising of a local KBI.
|
|
||||||
else:
|
|
||||||
raise KeyboardInterrupt
|
|
||||||
|
|
||||||
# maybe redraw/print last REPL output to console
|
# maybe redraw/print last REPL output to console
|
||||||
if pdb_obj:
|
if pdb_obj:
|
||||||
|
|
||||||
|
@ -648,8 +710,12 @@ def shield_sigint(
|
||||||
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
|
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
|
||||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
|
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
|
||||||
|
|
||||||
pdb_obj.do_longlist(None)
|
try:
|
||||||
print(pdb_obj.prompt, end='', flush=True)
|
pdb_obj.do_longlist(None)
|
||||||
|
print(pdb_obj.prompt, end='', flush=True)
|
||||||
|
except AttributeError:
|
||||||
|
log.exception('pdbpp longlist failed...')
|
||||||
|
raise KeyboardInterrupt
|
||||||
|
|
||||||
|
|
||||||
def _set_trace(
|
def _set_trace(
|
||||||
|
@ -667,7 +733,7 @@ def _set_trace(
|
||||||
# start 2 levels up in user code
|
# start 2 levels up in user code
|
||||||
frame: FrameType = sys._getframe()
|
frame: FrameType = sys._getframe()
|
||||||
if frame:
|
if frame:
|
||||||
frame = frame.f_back.f_back # type: ignore
|
frame = frame.f_back # type: ignore
|
||||||
|
|
||||||
if pdb and actor is not None:
|
if pdb and actor is not None:
|
||||||
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
|
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
|
||||||
|
|
|
@ -511,8 +511,8 @@ class Portal:
|
||||||
if ctx.chan.connected():
|
if ctx.chan.connected():
|
||||||
log.info(
|
log.info(
|
||||||
'Waiting on final context-task result for\n'
|
'Waiting on final context-task result for\n'
|
||||||
f'task:{cid}\n'
|
f'task: {cid}\n'
|
||||||
f'actor:{uid}'
|
f'actor: {uid}'
|
||||||
)
|
)
|
||||||
result = await ctx.result()
|
result = await ctx.result()
|
||||||
|
|
||||||
|
|
|
@ -601,7 +601,14 @@ class Context:
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if self._portal:
|
if self._portal:
|
||||||
self._portal._streams.remove(rchan)
|
try:
|
||||||
|
self._portal._streams.remove(stream)
|
||||||
|
except KeyError:
|
||||||
|
log.warning(
|
||||||
|
f'Stream was already destroyed?\n'
|
||||||
|
f'actor: {self.chan.uid}\n'
|
||||||
|
f'ctx id: {self.cid}'
|
||||||
|
)
|
||||||
|
|
||||||
async def result(self) -> Any:
|
async def result(self) -> Any:
|
||||||
'''
|
'''
|
||||||
|
|
|
@ -271,7 +271,12 @@ def _run_asyncio_task(
|
||||||
task.exception()
|
task.exception()
|
||||||
except BaseException as terr:
|
except BaseException as terr:
|
||||||
task_err = terr
|
task_err = terr
|
||||||
log.exception(f'`asyncio` task: {task.get_name()} errored')
|
|
||||||
|
if isinstance(terr, CancelledError):
|
||||||
|
log.cancel(f'`asyncio` task cancelled: {task.get_name()}')
|
||||||
|
else:
|
||||||
|
log.exception(f'`asyncio` task: {task.get_name()} errored')
|
||||||
|
|
||||||
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
|
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
|
||||||
|
|
||||||
if aio_err is not None:
|
if aio_err is not None:
|
||||||
|
|
Loading…
Reference in New Issue