Compare commits

..

No commits in common. "dereg_on_oserror" and "master" have entirely different histories.

30 changed files with 907 additions and 3132 deletions

View File

@ -3,8 +3,8 @@
|gh_actions|
|docs|
``tractor`` is a `structured concurrent`_, (optionally
distributed_) multi-processing_ runtime built on trio_.
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_.
Fundamentally, ``tractor`` gives you parallelism via
``trio``-"*actors*": independent Python processes (aka
@ -17,20 +17,11 @@ protocol" constructed on top of multiple Pythons each running a ``trio``
scheduled runtime - a call to ``trio.run()``.
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
but likely **does not** look like what **you** probably *think* an "actor
model" looks like, and that's **intentional**.
but likely *does not* look like what *you* probably think an "actor
model" looks like, and that's *intentional*.
Where do i start!?
------------------
The first step to grok ``tractor`` is to get an intermediate
knowledge of ``trio`` and **structured concurrency** B)
Some great places to start are,
- the seminal `blog post`_
- obviously the `trio docs`_
- wikipedia's nascent SC_ page
- the fancy diagrams @ libdill-docs_
The first step to grok ``tractor`` is to get the basics of ``trio`` down.
A great place to start is the `trio docs`_ and this `blog post`_.
Features
@ -602,7 +593,6 @@ matrix seems too hip, we're also mostly all in the the `trio gitter
channel`_!
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _distributed: https://en.wikipedia.org/wiki/Distributed_computing
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
.. _trio: https://github.com/python-trio/trio
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
@ -621,9 +611,8 @@ channel`_!
.. _trio docs: https://trio.readthedocs.io/en/latest/
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _SC: https://en.wikipedia.org/wiki/Structured_concurrency
.. _libdill-docs: https://sustrik.github.io/libdill/structured-concurrency.html
.. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
.. _async generators: https://www.python.org/dev/peps/pep-0525/
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel

View File

@ -1,117 +0,0 @@
import asyncio
import trio
import tractor
from tractor import to_asyncio
async def aio_sleep_forever():
await asyncio.sleep(float('inf'))
async def bp_then_error(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
raise_after_bp: bool = True,
) -> None:
# sync with ``trio``-side (caller) task
to_trio.send_nowait('start')
# NOTE: what happens here inside the hook needs some refinement..
# => seems like it's still `._debug._set_trace()` but
# we set `Lock.local_task_in_debug = 'sync'`, we probably want
# some further, at least, meta-data about the task/actoq in debug
# in terms of making it clear it's asyncio mucking about.
breakpoint()
# short checkpoint / delay
await asyncio.sleep(0.5)
if raise_after_bp:
raise ValueError('blah')
# TODO: test case with this so that it gets cancelled?
else:
# XXX NOTE: this is required in order to get the SIGINT-ignored
# hang case documented in the module script section!
await aio_sleep_forever()
@tractor.context
async def trio_ctx(
ctx: tractor.Context,
bp_before_started: bool = False,
):
# this will block until the ``asyncio`` task sends a "first"
# message, see first line in above func.
async with (
to_asyncio.open_channel_from(
bp_then_error,
raise_after_bp=not bp_before_started,
) as (first, chan),
trio.open_nursery() as n,
):
assert first == 'start'
if bp_before_started:
await tractor.breakpoint()
await ctx.started(first)
n.start_soon(
to_asyncio.run_task,
aio_sleep_forever,
)
await trio.sleep_forever()
async def main(
bps_all_over: bool = False,
) -> None:
async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_daemon',
enable_modules=[__name__],
infect_asyncio=True,
debug_mode=True,
loglevel='cancel',
)
async with p.open_context(
trio_ctx,
bp_before_started=bps_all_over,
) as (ctx, first):
assert first == 'start'
if bps_all_over:
await tractor.breakpoint()
# await trio.sleep_forever()
await ctx.cancel()
assert 0
# TODO: case where we cancel from trio-side while asyncio task
# has debugger lock?
# await p.cancel_actor()
if __name__ == '__main__':
# works fine B)
trio.run(main)
# will hang and ignores SIGINT !!
# NOTE: you'll need to send a SIGQUIT (via ctl-\) to kill it
# manually..
# trio.run(main, True)

View File

@ -6,4 +6,3 @@ mypy
trio_typing
pexpect
towncrier
numpy

View File

@ -41,9 +41,6 @@ setup(
],
install_requires=[
# discovery subsys
'bidict',
# trio related
# proper range spec:
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5

View File

@ -219,8 +219,7 @@ def daemon(
arb_addr: tuple[str, int],
):
'''
Run a daemon actor as a "remote registrar" and/or plain ol
separate actor (service) tree.
Run a daemon actor as a "remote arbiter".
'''
if loglevel in ('trace', 'debug'):

View File

@ -49,7 +49,7 @@ async def worker(
await ctx.started()
async with ctx.open_stream(
allow_overruns=True,
backpressure=True,
) as stream:
# TODO: this with the below assert causes a hang bug?

View File

@ -13,10 +13,7 @@ from typing import Optional
import pytest
import trio
import tractor
from tractor._exceptions import (
StreamOverrun,
ContextCancelled,
)
from tractor._exceptions import StreamOverrun
from conftest import tractor_test
@ -94,10 +91,7 @@ async def not_started_but_stream_opened(
@pytest.mark.parametrize(
'target',
[
too_many_starteds,
not_started_but_stream_opened,
],
[too_many_starteds, not_started_but_stream_opened],
ids='misuse_type={}'.format,
)
def test_started_misuse(target):
@ -234,88 +228,6 @@ def test_simple_context(
trio.run(main)
@pytest.mark.parametrize(
'callee_returns_early',
[True, False],
ids=lambda item: f'callee_returns_early={item}'
)
@pytest.mark.parametrize(
'cancel_method',
['ctx', 'portal'],
ids=lambda item: f'cancel_method={item}'
)
@pytest.mark.parametrize(
'chk_ctx_result_before_exit',
[True, False],
ids=lambda item: f'chk_ctx_result_before_exit={item}'
)
def test_caller_cancels(
cancel_method: str,
chk_ctx_result_before_exit: bool,
callee_returns_early: bool,
):
'''
Verify that when the opening side of a context (aka the caller)
cancels that context, the ctx does not raise a cancelled when
either calling `.result()` or on context exit.
'''
async def check_canceller(
ctx: tractor.Context,
) -> None:
# should not raise yet return the remote
# context cancelled error.
res = await ctx.result()
if callee_returns_early:
assert res == 'yo'
else:
err = res
assert isinstance(err, ContextCancelled)
assert (
tuple(err.canceller)
==
tractor.current_actor().uid
)
async def main():
async with tractor.open_nursery() as nursery:
portal = await nursery.start_actor(
'simple_context',
enable_modules=[__name__],
)
timeout = 0.5 if not callee_returns_early else 2
with trio.fail_after(timeout):
async with portal.open_context(
simple_setup_teardown,
data=10,
block_forever=not callee_returns_early,
) as (ctx, sent):
if callee_returns_early:
# ensure we block long enough before sending
# a cancel such that the callee has already
# returned it's result.
await trio.sleep(0.5)
if cancel_method == 'ctx':
await ctx.cancel()
else:
await portal.cancel_actor()
if chk_ctx_result_before_exit:
await check_canceller(ctx)
if not chk_ctx_result_before_exit:
await check_canceller(ctx)
if cancel_method != 'portal':
await portal.cancel_actor()
trio.run(main)
# basic stream terminations:
# - callee context closes without using stream
# - caller context closes without using stream
@ -594,6 +506,7 @@ async def test_callee_cancels_before_started():
cancel_self,
) as (ctx, sent):
async with ctx.open_stream():
await trio.sleep_forever()
# raises a special cancel signal
@ -646,6 +559,7 @@ async def keep_sending_from_callee(
'overrun_by',
[
('caller', 1, never_open_stream),
('cancel_caller_during_overrun', 1, never_open_stream),
('callee', 0, keep_sending_from_callee),
],
ids='overrun_condition={}'.format,
@ -675,13 +589,14 @@ def test_one_end_stream_not_opened(overrun_by):
if 'caller' in overrunner:
async with ctx.open_stream() as stream:
# itersend +1 msg more then the buffer size
# to cause the most basic overrun.
for i in range(buf_size):
print(f'sending {i}')
await stream.send(i)
if 'cancel' in overrunner:
# without this we block waiting on the child side
await ctx.cancel()
else:
# expect overrun error to be relayed back
# and this sleep interrupted
@ -695,9 +610,7 @@ def test_one_end_stream_not_opened(overrun_by):
# 2 overrun cases and the no overrun case (which pushes right up to
# the msg limit)
if (
overrunner == 'caller'
):
if overrunner == 'caller' or 'cance' in overrunner:
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
@ -721,102 +634,40 @@ async def echo_back_sequence(
ctx: tractor.Context,
seq: list[int],
wait_for_cancel: bool,
allow_overruns_side: str,
be_slow: bool = False,
msg_buffer_size: int = 1,
msg_buffer_size: Optional[int] = None,
) -> None:
'''
Send endlessly on the calleee stream using a small buffer size
setting on the contex to simulate backlogging that would normally
cause overruns.
Send endlessly on the calleee stream.
'''
# NOTE: ensure that if the caller is expecting to cancel this task
# that we stay echoing much longer then they are so we don't
# return early instead of receive the cancel msg.
total_batches: int = 1000 if wait_for_cancel else 6
await ctx.started()
# await tractor.breakpoint()
async with ctx.open_stream(
msg_buffer_size=msg_buffer_size,
# literally the point of this test XD
allow_overruns=(allow_overruns_side in {'child', 'both'}),
) as stream:
# ensure mem chan settings are correct
assert (
ctx._send_chan._state.max_buffer_size
==
msg_buffer_size
)
seq = list(seq) # bleh, msgpack sometimes ain't decoded right
for _ in range(total_batches):
seq = list(seq) # bleh, `msgpack`...
count = 0
while count < 3:
batch = []
async for msg in stream:
batch.append(msg)
if batch == seq:
break
if be_slow:
await trio.sleep(0.05)
print('callee waiting on next')
for msg in batch:
print(f'callee sending {msg}')
await stream.send(msg)
print(
'EXITING CALLEEE:\n'
f'{ctx.cancel_called_remote}'
)
count += 1
return 'yo'
@pytest.mark.parametrize(
# aka the side that will / should raise
# and overrun under normal conditions.
'allow_overruns_side',
['parent', 'child', 'none', 'both'],
ids=lambda item: f'allow_overruns_side={item}'
)
@pytest.mark.parametrize(
# aka the side that will / should raise
# and overrun under normal conditions.
'slow_side',
['parent', 'child'],
ids=lambda item: f'slow_side={item}'
)
@pytest.mark.parametrize(
'cancel_ctx',
[True, False],
ids=lambda item: f'cancel_ctx={item}'
)
def test_maybe_allow_overruns_stream(
cancel_ctx: bool,
slow_side: str,
allow_overruns_side: str,
loglevel: str,
):
def test_stream_backpressure():
'''
Demonstrate small overruns of each task back and forth
on a stream not raising any errors by default by setting
the ``allow_overruns=True``.
The original idea here was to show that if you set the feeder mem
chan to a size smaller then the # of msgs sent you could could not
get a `StreamOverrun` crash plus maybe get all the msgs that were
sent. The problem with the "real backpressure" case is that due to
the current arch it can result in the msg loop being blocked and thus
blocking cancellation - which is like super bad. So instead this test
had to be adjusted to more or less just "not send overrun errors" so
as to handle the case where the sender just moreso cares about not getting
errored out when it send to fast..
on a stream not raising any errors by default.
'''
async def main():
@ -824,105 +675,39 @@ def test_maybe_allow_overruns_stream(
portal = await n.start_actor(
'callee_sends_forever',
enable_modules=[__name__],
loglevel=loglevel,
# debug_mode=True,
)
seq = list(range(10))
seq = list(range(3))
async with portal.open_context(
echo_back_sequence,
seq=seq,
wait_for_cancel=cancel_ctx,
be_slow=(slow_side == 'child'),
allow_overruns_side=allow_overruns_side,
msg_buffer_size=1,
) as (ctx, sent):
assert sent is None
async with ctx.open_stream(
msg_buffer_size=1 if slow_side == 'parent' else None,
allow_overruns=(allow_overruns_side in {'parent', 'both'}),
) as stream:
total_batches: int = 2
for _ in range(total_batches):
async with ctx.open_stream(msg_buffer_size=1) as stream:
count = 0
while count < 3:
for msg in seq:
# print(f'root tx {msg}')
print(f'caller sending {msg}')
await stream.send(msg)
if slow_side == 'parent':
# NOTE: we make the parent slightly
# slower, when it is slow, to make sure
# that in the overruns everywhere case
await trio.sleep(0.16)
await trio.sleep(0.1)
batch = []
async for msg in stream:
print(f'root rx {msg}')
batch.append(msg)
if batch == seq:
break
if cancel_ctx:
# cancel the remote task
print('sending root side cancel')
await ctx.cancel()
count += 1
res = await ctx.result()
if cancel_ctx:
assert isinstance(res, ContextCancelled)
assert tuple(res.canceller) == tractor.current_actor().uid
else:
print(f'RX ROOT SIDE RESULT {res}')
assert res == 'yo'
# here the context should return
assert await ctx.result() == 'yo'
# cancel the daemon
await portal.cancel_actor()
if (
allow_overruns_side == 'both'
or slow_side == allow_overruns_side
):
trio.run(main)
elif (
slow_side != allow_overruns_side
):
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
err = excinfo.value
if (
allow_overruns_side == 'none'
):
# depends on timing is is racy which side will
# overrun first :sadkitty:
# NOTE: i tried to isolate to a deterministic case here
# based on timeing, but i was kinda wasted, and i don't
# think it's sane to catch them..
assert err.type in (
tractor.RemoteActorError,
StreamOverrun,
)
elif (
slow_side == 'child'
):
assert err.type == StreamOverrun
elif slow_side == 'parent':
assert err.type == tractor.RemoteActorError
assert 'StreamOverrun' in err.msgdata['tb_str']
else:
# if this hits the logic blocks from above are not
# exhaustive..
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
@tractor.context
async def sleep_forever(
@ -952,18 +737,18 @@ async def attach_to_sleep_forever():
finally:
# XXX: previously this would trigger local
# ``ContextCancelled`` to be received and raised in the
# local context overriding any local error due to logic
# inside ``_invoke()`` which checked for an error set on
# ``Context._error`` and raised it in a cancellation
# scenario.
# ------
# The problem is you can have a remote cancellation that
# is part of a local error and we shouldn't raise
# ``ContextCancelled`` **iff** we **were not** the side
# of the context to initiate it, i.e.
# local context overriding any local error due to
# logic inside ``_invoke()`` which checked for
# an error set on ``Context._error`` and raised it in
# under a cancellation scenario.
# The problem is you can have a remote cancellation
# that is part of a local error and we shouldn't raise
# ``ContextCancelled`` **iff** we weren't the side of
# the context to initiate it, i.e.
# ``Context._cancel_called`` should **NOT** have been
# set. The special logic to handle this case is now
# inside ``Context._maybe_raise_from_remote_msg()`` XD
# inside ``Context._may_raise_from_remote_msg()`` XD
await peer_ctx.cancel()
@ -984,10 +769,9 @@ async def error_before_started(
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
'''
Verify that an error raised in a remote context which itself opens
another remote context, which it cancels, does not ovverride the
original error that caused the cancellation of the secondardy
context.
Verify that an error raised in a remote context which itself opens another
remote context, which it cancels, does not ovverride the original error that
caused the cancellation of the secondardy context.
'''
async def main():

View File

@ -1,7 +1,6 @@
'''
Discovery subsystem via a "registrar" actor scenarios.
'''
"""
Actor "discovery" testing
"""
import os
import signal
import platform
@ -128,10 +127,7 @@ async def unpack_reg(actor_or_portal):
else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
return {
tuple(key.split('.')): val
for key, val in msg.items()
}
return {tuple(key.split('.')): val for key, val in msg.items()}
async def spawn_and_check_registry(
@ -287,22 +283,18 @@ async def close_chans_before_nursery(
async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor(
name='consumer1',
enable_modules=[__name__],
)
name='consumer1', enable_modules=[__name__])
portal2 = await tn.start_actor(
'consumer2',
enable_modules=[__name__],
)
'consumer2', enable_modules=[__name__])
async with (
portal1.open_stream_from(
# TODO: compact this back as was in last commit once
# 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(
stream_forever
) as agen1,
portal2.open_stream_from(
) as agen1:
async with portal2.open_stream_from(
stream_forever
) as agen2,
):
) as agen2:
async with trio.open_nursery() as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
@ -339,12 +331,10 @@ def test_close_channel_explicit(
use_signal,
arb_addr,
):
'''
Verify that closing a stream explicitly and killing the actor's
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
'''
"""
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
@ -357,18 +347,16 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_registrar(
def test_close_channel_explicit_remote_arbiter(
daemon,
start_method,
use_signal,
arb_addr,
):
'''
Verify that closing a stream explicitly and killing the actor's
"""Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
'''
"""
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
@ -378,51 +366,3 @@ def test_close_channel_explicit_remote_registrar(
remote_arbiter=True,
),
)
@tractor.context
async def kill_transport(
ctx: tractor.Context,
) -> None:
await ctx.started()
actor: tractor.Actor = tractor.current_actor()
actor.cancel_server()
await trio.sleep_forever()
# @pytest.mark.parametrize('use_signal', [False, True])
def test_stale_entry_is_deleted(
daemon,
start_method,
arb_addr,
):
'''
Ensure that when a stale entry is detected in the registrar's table
that the `find_actor()` API takes care of deleting the stale entry
and not delivering a bad portal.
'''
async def main():
name: str = 'transport_fails_actor'
regport: tractor.Portal
tn: tractor.ActorNursery
async with (
tractor.open_nursery() as tn,
tractor.get_registrar(*arb_addr) as regport,
):
ptl: tractor.Portal = await tn.start_actor(
name,
enable_modules=[__name__],
)
async with ptl.open_context(
kill_transport,
) as (first, ctx):
async with tractor.find_actor(name) as maybe_portal:
assert maybe_portal is None
await ptl.cancel_actor()
trio.run(main)

View File

@ -15,7 +15,6 @@ import tractor
from tractor import (
to_asyncio,
RemoteActorError,
ContextCancelled,
)
from tractor.trionics import BroadcastReceiver
@ -225,23 +224,14 @@ def test_context_spawns_aio_task_that_errors(
await trio.sleep_forever()
return await ctx.result()
if parent_cancels:
# bc the parent made the cancel request,
# the error is not raised locally but instead
# the context is exited silently
res = trio.run(main)
assert isinstance(res, ContextCancelled)
assert 'root' in res.canceller[0]
else:
expect = RemoteActorError
with pytest.raises(expect) as excinfo:
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
err = excinfo.value
assert isinstance(err, expect)
assert isinstance(err, RemoteActorError)
if parent_cancels:
assert err.type == trio.Cancelled
else:
assert err.type == AssertionError

View File

@ -1,167 +0,0 @@
"""
Shared mem primitives and APIs.
"""
import uuid
# import numpy
import pytest
import trio
import tractor
from tractor._shm import (
open_shm_list,
attach_shm_list,
)
@tractor.context
async def child_attach_shml_alot(
ctx: tractor.Context,
shm_key: str,
) -> None:
await ctx.started(shm_key)
# now try to attach a boatload of times in a loop..
for _ in range(1000):
shml = attach_shm_list(
key=shm_key,
readonly=False,
)
assert shml.shm.name == shm_key
await trio.sleep(0.001)
def test_child_attaches_alot():
async def main():
async with tractor.open_nursery() as an:
# allocate writeable list in parent
key = f'shml_{uuid.uuid4()}'
shml = open_shm_list(
key=key,
)
portal = await an.start_actor(
'shm_attacher',
enable_modules=[__name__],
)
async with (
portal.open_context(
child_attach_shml_alot,
shm_key=shml.key,
) as (ctx, start_val),
):
assert start_val == key
await ctx.result()
await portal.cancel_actor()
trio.run(main)
@tractor.context
async def child_read_shm_list(
ctx: tractor.Context,
shm_key: str,
use_str: bool,
frame_size: int,
) -> None:
# attach in child
shml = attach_shm_list(
key=shm_key,
# dtype=str if use_str else float,
)
await ctx.started(shml.key)
async with ctx.open_stream() as stream:
async for i in stream:
print(f'(child): reading shm list index: {i}')
if use_str:
expect = str(float(i))
else:
expect = float(i)
if frame_size == 1:
val = shml[i]
assert expect == val
print(f'(child): reading value: {val}')
else:
frame = shml[i - frame_size:i]
print(f'(child): reading frame: {frame}')
@pytest.mark.parametrize(
'use_str',
[False, True],
ids=lambda i: f'use_str_values={i}',
)
@pytest.mark.parametrize(
'frame_size',
[1, 2**6, 2**10],
ids=lambda i: f'frame_size={i}',
)
def test_parent_writer_child_reader(
use_str: bool,
frame_size: int,
):
async def main():
async with tractor.open_nursery(
# debug_mode=True,
) as an:
portal = await an.start_actor(
'shm_reader',
enable_modules=[__name__],
debug_mode=True,
)
# allocate writeable list in parent
key = 'shm_list'
seq_size = int(2 * 2 ** 10)
shml = open_shm_list(
key=key,
size=seq_size,
dtype=str if use_str else float,
readonly=False,
)
async with (
portal.open_context(
child_read_shm_list,
shm_key=key,
use_str=use_str,
frame_size=frame_size,
) as (ctx, sent),
ctx.open_stream() as stream,
):
assert sent == key
for i in range(seq_size):
val = float(i)
if use_str:
val = str(val)
# print(f'(parent): writing {val}')
shml[i] = val
# only on frame fills do we
# signal to the child that a frame's
# worth is ready.
if (i % frame_size) == 0:
print(f'(parent): signalling frame full on {val}')
await stream.send(i)
else:
print(f'(parent): signalling final frame on {val}')
await stream.send(i)
await portal.cancel_actor()
trio.run(main)

View File

@ -86,7 +86,7 @@ async def open_sequence_streamer(
) as (ctx, first):
assert first is None
async with ctx.open_stream(allow_overruns=True) as stream:
async with ctx.open_stream(backpressure=True) as stream:
yield stream
await portal.cancel_actor()
@ -413,8 +413,8 @@ def test_ensure_slow_consumers_lag_out(
seq = brx._state.subs[brx.key]
assert seq == len(brx._state.queue) - 1
# all no_overruns entries in the underlying
# channel should have been copied into the bcaster
# all backpressured entries in the underlying
# channel should have been copied into the caster
# queue trailing-window
async for i in rx:
print(f'bped: {i}')

View File

@ -15,23 +15,21 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
tractor: structured concurrent ``trio``-"actors".
tractor: structured concurrent "actors".
"""
from exceptiongroup import BaseExceptionGroup
from ._clustering import open_actor_cluster
from ._context import (
Context,
context,
)
from ._ipc import Channel
from ._streaming import (
Context,
MsgStream,
stream,
context,
)
from ._discovery import (
get_arbiter,
get_registrar,
find_actor,
wait_for_actor,
query_actor,
@ -48,8 +46,6 @@ from ._exceptions import (
)
from ._debug import (
breakpoint,
pause,
pause_from_sync,
post_mortem,
)
from . import msg
@ -57,36 +53,31 @@ from ._root import (
run_daemon,
open_root_actor,
)
from ._ipc import Channel
from ._portal import Portal
from ._runtime import Actor
__all__ = [
'Actor',
'BaseExceptionGroup',
'Channel',
'Context',
'ContextCancelled',
'ModuleNotExposed',
'MsgStream',
'BaseExceptionGroup',
'Portal',
'RemoteActorError',
'breakpoint',
'context',
'current_actor',
'find_actor',
'query_actor',
'get_arbiter',
'get_registrar',
'is_root_process',
'msg',
'open_actor_cluster',
'open_nursery',
'open_root_actor',
'pause',
'post_mortem',
'pause_from_sync',
'query_actor',
'run_daemon',
'stream',

View File

@ -1,778 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
The fundamental cross process SC abstraction: an inter-actor,
cancel-scope linked task "context".
A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built
into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors.
'''
from __future__ import annotations
from collections import deque
from contextlib import asynccontextmanager as acm
from dataclasses import (
dataclass,
field,
)
from functools import partial
import inspect
from pprint import pformat
from typing import (
Any,
Callable,
AsyncGenerator,
TYPE_CHECKING,
)
import warnings
import trio
from ._exceptions import (
unpack_error,
pack_error,
ContextCancelled,
StreamOverrun,
)
from .log import get_logger
from ._ipc import Channel
from ._streaming import MsgStream
from ._state import current_actor
if TYPE_CHECKING:
from ._portal import Portal
log = get_logger(__name__)
@dataclass
class Context:
'''
An inter-actor, ``trio``-task communication context.
NB: This class should never be instatiated directly, it is delivered
by either,
- runtime machinery to a remotely started task or,
- by entering ``Portal.open_context()``.
and is always constructed using ``mkt_context()``.
Allows maintaining task or protocol specific state between
2 communicating, parallel executing actor tasks. A unique context is
allocated on each side of any task RPC-linked msg dialog, for
every request to a remote actor from a portal. On the "callee"
side a context is always allocated inside ``._runtime._invoke()``.
A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel, it can also open task
oriented message streams, and acts more or less as an IPC aware
inter-actor-task ``trio.CancelScope``.
'''
chan: Channel
cid: str
# these are the "feeder" channels for delivering
# message values to the local task from the runtime
# msg processing loop.
_recv_chan: trio.MemoryReceiveChannel
_send_chan: trio.MemorySendChannel
_remote_func_type: str | None = None
# only set on the caller side
_portal: Portal | None = None # type: ignore # noqa
_result: Any | int = None
_remote_error: BaseException | None = None
# cancellation state
_cancel_called: bool = False
_cancelled_remote: tuple | None = None
_cancel_msg: str | None = None
_scope: trio.CancelScope | None = None
_enter_debugger_on_cancel: bool = True
@property
def cancel_called(self) -> bool:
'''
Records whether cancellation has been requested for this context
by either an explicit call to ``.cancel()`` or an implicit call
due to an error caught inside the ``Portal.open_context()``
block.
'''
return self._cancel_called
@property
def cancel_called_remote(self) -> tuple[str, str] | None:
'''
``Actor.uid`` of the remote actor who's task was cancelled
causing this side of the context to also be cancelled.
'''
remote_uid = self._cancelled_remote
if remote_uid:
return tuple(remote_uid)
@property
def cancelled_caught(self) -> bool:
return self._scope.cancelled_caught
# init and streaming state
_started_called: bool = False
_started_received: bool = False
_stream_opened: bool = False
# overrun handling machinery
# NOTE: none of this provides "backpressure" to the remote
# task, only an ability to not lose messages when the local
# task is configured to NOT transmit ``StreamOverrun``s back
# to the other side.
_overflow_q: deque[dict] = field(
default_factory=partial(
deque,
maxlen=616,
)
)
_scope_nursery: trio.Nursery | None = None
_in_overrun: bool = False
_allow_overruns: bool = False
async def send_yield(
self,
data: Any,
) -> None:
warnings.warn(
"`Context.send_yield()` is now deprecated. "
"Use ``MessageStream.send()``. ",
DeprecationWarning,
stacklevel=2,
)
await self.chan.send({'yield': data, 'cid': self.cid})
async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid})
async def _maybe_cancel_and_set_remote_error(
self,
error_msg: dict[str, Any],
) -> None:
'''
(Maybe) unpack and raise a msg error into the local scope
nursery for this context.
Acts as a form of "relay" for a remote error raised
in the corresponding remote callee task.
'''
# If this is an error message from a context opened by
# ``Portal.open_context()`` we want to interrupt any ongoing
# (child) tasks within that context to be notified of the remote
# error relayed here.
#
# The reason we may want to raise the remote error immediately
# is that there is no guarantee the associated local task(s)
# will attempt to read from any locally opened stream any time
# soon.
#
# NOTE: this only applies when
# ``Portal.open_context()`` has been called since it is assumed
# (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing.
error = unpack_error(
error_msg,
self.chan,
)
# XXX: set the remote side's error so that after we cancel
# whatever task is the opener of this context it can raise
# that error as the reason.
self._remote_error = error
# always record the remote actor's uid since its cancellation
# state is directly linked to ours (the local one).
self._cancelled_remote = self.chan.uid
if (
isinstance(error, ContextCancelled)
):
log.cancel(
'Remote task-context sucessfully cancelled for '
f'{self.chan.uid}:{self.cid}'
)
if self._cancel_called:
# from ._debug import breakpoint
# await breakpoint()
# this is an expected cancel request response message
# and we **don't need to raise it** in local cancel
# scope since it will potentially override a real error.
return
else:
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{error_msg["error"]["tb_str"]}'
)
# TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect
# the cancellation, then lookup the error that was set?
# YES! this is way better and simpler!
if (
self._scope
):
# from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked()
# self._cancelled_remote = self.chan.uid
self._scope.cancel()
# NOTE: this usage actually works here B)
# from ._debug import breakpoint
# await breakpoint()
# XXX: this will break early callee results sending
# since when `.result()` is finally called, this
# chan will be closed..
# if self._recv_chan:
# await self._recv_chan.aclose()
async def cancel(
self,
msg: str | None = None,
timeout: float = 0.616,
# timeout: float = 1000,
) -> None:
'''
Cancel this inter-actor-task context.
Request that the far side cancel it's current linked context,
Timeout quickly in an attempt to sidestep 2-generals...
'''
side = 'caller' if self._portal else 'callee'
if msg:
assert side == 'callee', 'Only callee side can provide cancel msg'
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True
# await _debug.breakpoint()
# breakpoint()
if side == 'caller':
if not self._portal:
raise RuntimeError(
"No portal found, this is likely a callee side context"
)
cid = self.cid
with trio.move_on_after(timeout) as cs:
cs.shield = True
log.cancel(
f"Cancelling stream {cid} to "
f"{self._portal.channel.uid}")
# NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns(
'self',
'_cancel_task',
cid=cid,
)
# print("EXITING CANCEL CALL")
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
# cancelled in the case where the connection is broken or
# some other network error occurred.
# if not self._portal.channel.connected():
if not self.chan.connected():
log.cancel(
"May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}")
else:
log.cancel(
"Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}")
# callee side remote task
else:
self._cancel_msg = msg
# TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an
# {'error': trio.Cancelled, cid: "blah"} enough?
# This probably gets into the discussion in
# https://github.com/goodboy/tractor/issues/36
assert self._scope
self._scope.cancel()
@acm
async def open_stream(
self,
allow_overruns: bool | None = False,
msg_buffer_size: int | None = None,
) -> AsyncGenerator[MsgStream, None]:
'''
Open a ``MsgStream``, a bi-directional stream connected to the
cross-actor (far end) task for this ``Context``.
This context manager must be entered on both the caller and
callee for the stream to logically be considered "connected".
A ``MsgStream`` is currently "one-shot" use, meaning if you
close it you can not "re-open" it for streaming and instead you
must re-establish a new surrounding ``Context`` using
``Portal.open_context()``. In the future this may change but
currently there seems to be no obvious reason to support
"re-opening":
- pausing a stream can be done with a message.
- task errors will normally require a restart of the entire
scope of the inter-actor task context due to the nature of
``trio``'s cancellation system.
'''
actor = current_actor()
# here we create a mem chan that corresponds to the
# far end caller / callee.
# Likewise if the surrounding context has been cancelled we error here
# since it likely means the surrounding block was exited or
# killed
if self._cancel_called:
task = trio.lowlevel.current_task().name
raise ContextCancelled(
f'Context around {actor.uid[0]}:{task} was already cancelled!'
)
if not self._portal and not self._started_called:
raise RuntimeError(
'Context.started()` must be called before opening a stream'
)
# NOTE: in one way streaming this only happens on the
# caller side inside `Actor.start_remote_task()` so if you try
# to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
ctx = actor.get_context(
self.chan,
self.cid,
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
ctx._allow_overruns = allow_overruns
assert ctx is self
# XXX: If the underlying channel feeder receive mem chan has
# been closed then likely client code has already exited
# a ``.open_stream()`` block prior or there was some other
# unanticipated error or cancellation from ``trio``.
if ctx._recv_chan._closed:
raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!?')
async with MsgStream(
ctx=self,
rx_chan=ctx._recv_chan,
) as stream:
if self._portal:
self._portal._streams.add(stream)
try:
self._stream_opened = True
# XXX: do we need this?
# ensure we aren't cancelled before yielding the stream
# await trio.lowlevel.checkpoint()
yield stream
# NOTE: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end.
await stream.aclose()
finally:
if self._portal:
try:
self._portal._streams.remove(stream)
except KeyError:
log.warning(
f'Stream was already destroyed?\n'
f'actor: {self.chan.uid}\n'
f'ctx id: {self.cid}'
)
def _maybe_raise_remote_err(
self,
err: Exception,
) -> None:
# NOTE: whenever the context's "opener" side (task) **is**
# the side which requested the cancellation (likekly via
# ``Context.cancel()``), we don't want to re-raise that
# cancellation signal locally (would be akin to
# a ``trio.Nursery`` nursery raising ``trio.Cancelled``
# whenever ``CancelScope.cancel()`` was called) and instead
# silently reap the expected cancellation "error"-msg.
# if 'pikerd' in err.msgdata['tb_str']:
# # from . import _debug
# # await _debug.breakpoint()
# breakpoint()
if (
isinstance(err, ContextCancelled)
and (
self._cancel_called
or self.chan._cancel_called
or tuple(err.canceller) == current_actor().uid
)
):
return err
raise err # from None
async def result(self) -> Any | Exception:
'''
From some (caller) side task, wait for and return the final
result from the remote (callee) side's task.
This provides a mechanism for one task running in some actor to wait
on another task at the other side, in some other actor, to terminate.
If the remote task is still in a streaming state (it is delivering
values from inside a ``Context.open_stream():`` block, then those
msgs are drained but discarded since it is presumed this side of
the context has already finished with its own streaming logic.
If the remote context (or its containing actor runtime) was
canceled, either by a local task calling one of
``Context.cancel()`` or `Portal.cancel_actor()``, we ignore the
received ``ContextCancelled`` exception if the context or
underlying IPC channel is marked as having been "cancel called".
This is similar behavior to using ``trio.Nursery.cancel()``
wherein tasks which raise ``trio.Cancel`` are silently reaped;
the main different in this API is in the "cancel called" case,
instead of just not raising, we also return the exception *as
the result* since client code may be interested in the details
of the remote cancellation.
'''
assert self._portal, "Context.result() can not be called from callee!"
assert self._recv_chan
# from . import _debug
# await _debug.breakpoint()
re = self._remote_error
if re:
self._maybe_raise_remote_err(re)
return re
if (
self._result == id(self)
and not self._remote_error
and not self._recv_chan._closed # type: ignore
):
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
msg = await self._recv_chan.receive()
try:
self._result = msg['return']
# NOTE: we don't need to do this right?
# XXX: only close the rx mem chan AFTER
# a final result is retreived.
# if self._recv_chan:
# await self._recv_chan.aclose()
break
except KeyError: # as msgerr:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}')
continue
elif 'stop' in msg:
log.debug('Remote stream terminated')
continue
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?")
err = unpack_error(
msg,
self._portal.channel
) # from msgerr
err = self._maybe_raise_remote_err(err)
self._remote_err = err
return self._remote_error or self._result
async def started(
self,
value: Any | None = None
) -> None:
'''
Indicate to calling actor's task that this linked context
has started and send ``value`` to the other side.
On the calling side ``value`` is the second item delivered
in the tuple returned by ``Portal.open_context()``.
'''
if self._portal:
raise RuntimeError(
f"Caller side context {self} can not call started!")
elif self._started_called:
raise RuntimeError(
f"called 'started' twice on context with {self.chan.uid}")
await self.chan.send({'started': value, 'cid': self.cid})
self._started_called = True
# TODO: do we need a restart api?
# async def restart(self) -> None:
# pass
async def _drain_overflows(
self,
) -> None:
'''
Private task spawned to push newly received msgs to the local
task which getting overrun by the remote side.
In order to not block the rpc msg loop, but also not discard
msgs received in this context, we need to async push msgs in
a new task which only runs for as long as the local task is in
an overrun state.
'''
self._in_overrun = True
try:
while self._overflow_q:
# NOTE: these msgs should never be errors since we always do
# the check prior to checking if we're in an overrun state
# inside ``.deliver_msg()``.
msg = self._overflow_q.popleft()
try:
await self._send_chan.send(msg)
except trio.BrokenResourceError:
log.warning(
f"{self._send_chan} consumer is already closed"
)
return
except trio.Cancelled:
# we are obviously still in overrun
# but the context is being closed anyway
# so we just warn that there are un received
# msgs still..
self._overflow_q.appendleft(msg)
fmt_msgs = ''
for msg in self._overflow_q:
fmt_msgs += f'{pformat(msg)}\n'
log.warning(
f'Context for {self.cid} is being closed while '
'in an overrun state!\n'
'Discarding the following msgs:\n'
f'{fmt_msgs}\n'
)
raise
finally:
# task is now finished with the backlog so mark us as
# no longer in backlog.
self._in_overrun = False
async def _deliver_msg(
self,
msg: dict,
draining: bool = False,
) -> bool:
cid = self.cid
chan = self.chan
uid = chan.uid
send_chan: trio.MemorySendChannel = self._send_chan
log.runtime(
f"Delivering {msg} from {uid} to caller {cid}"
)
error = msg.get('error')
if error:
await self._maybe_cancel_and_set_remote_error(msg)
if (
self._in_overrun
):
self._overflow_q.append(msg)
return False
try:
send_chan.send_nowait(msg)
return True
# if an error is deteced we should always
# expect it to be raised by any context (stream)
# consumer task
except trio.BrokenResourceError:
# TODO: what is the right way to handle the case where the
# local task has already sent a 'stop' / StopAsyncInteration
# to the other side but and possibly has closed the local
# feeder mem chan? Do we wait for some kind of ack or just
# let this fail silently and bubble up (currently)?
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{send_chan} consumer is already closed")
return False
# NOTE XXX: by default we do **not** maintain context-stream
# backpressure and instead opt to relay stream overrun errors to
# the sender; the main motivation is that using bp can block the
# msg handling loop which calls into this method!
except trio.WouldBlock:
# XXX: always push an error even if the local
# receiver is in overrun state.
# await self._maybe_cancel_and_set_remote_error(msg)
local_uid = current_actor().uid
lines = [
f'OVERRUN on actor-task context {cid}@{local_uid}!\n'
# TODO: put remote task name here if possible?
f'remote sender actor: {uid}',
# TODO: put task func name here and maybe an arrow
# from sender to overrunner?
# f'local task {self.func_name}'
]
if not self._stream_opened:
lines.insert(
1,
f'\n*** No stream open on `{local_uid[0]}` side! ***\n'
)
text = '\n'.join(lines)
# XXX: lul, this really can't be backpressure since any
# blocking here will block the entire msg loop rpc sched for
# a whole channel.. maybe we should rename it?
if self._allow_overruns:
text += f'\nStarting overflow queuing task on msg: {msg}'
log.warning(text)
if (
not self._in_overrun
):
self._overflow_q.append(msg)
n = self._scope_nursery
assert not n.child_tasks
try:
n.start_soon(
self._drain_overflows,
)
except RuntimeError:
# if the nursery is already cancelled due to
# this context exiting or in error, we ignore
# the nursery error since we never expected
# anything different.
return False
else:
try:
raise StreamOverrun(text)
except StreamOverrun as err:
err_msg = pack_error(err)
err_msg['cid'] = cid
try:
await chan.send(err_msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
return False
def mk_context(
chan: Channel,
cid: str,
msg_buffer_size: int = 2**6,
**kwargs,
) -> Context:
'''
Internal factory to create an inter-actor task ``Context``.
This is called by internals and should generally never be called
by user code.
'''
send_chan: trio.MemorySendChannel
recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
ctx = Context(
chan,
cid,
_send_chan=send_chan,
_recv_chan=recv_chan,
**kwargs,
)
ctx._result: int | Any = id(ctx)
return ctx
def context(func: Callable) -> Callable:
'''
Mark an async function as a streaming routine with ``@context``.
'''
# TODO: apply whatever solution ``mypy`` ends up picking for this:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_context_function = True # type: ignore
sig = inspect.signature(func)
params = sig.parameters
if 'ctx' not in params:
raise TypeError(
"The first argument to the context function "
f"{func.__name__} must be `ctx: tractor.Context`"
)
return func

View File

@ -30,6 +30,7 @@ from functools import (
from contextlib import asynccontextmanager as acm
from typing import (
Any,
Optional,
Callable,
AsyncIterator,
AsyncGenerator,
@ -39,10 +40,7 @@ from types import FrameType
import pdbp
import tractor
import trio
from trio_typing import (
TaskStatus,
# Task,
)
from trio_typing import TaskStatus
from .log import get_logger
from ._discovery import get_root
@ -71,10 +69,10 @@ class Lock:
'''
repl: MultiActorPdb | None = None
# placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Callable | None = None
# pdb_release_hook: Optional[Callable] = None
_trio_handler: Callable[
[int, FrameType | None], Any
[int, Optional[FrameType]], Any
] | int | None = None
# actor-wide variable pointing to current task name using debugger
@ -85,23 +83,23 @@ class Lock:
# and must be cancelled if this actor is cancelled via IPC
# request-message otherwise deadlocks with the parent actor may
# ensure
_debugger_request_cs: trio.CancelScope | None = None
_debugger_request_cs: Optional[trio.CancelScope] = None
# NOTE: set only in the root actor for the **local** root spawned task
# which has acquired the lock (i.e. this is on the callee side of
# the `lock_tty_for_child()` context entry).
_root_local_task_cs_in_debug: trio.CancelScope | None = None
_root_local_task_cs_in_debug: Optional[trio.CancelScope] = None
# actor tree-wide actor uid that supposedly has the tty lock
global_actor_in_debug: tuple[str, str] = None
global_actor_in_debug: Optional[tuple[str, str]] = None
local_pdb_complete: trio.Event | None = None
no_remote_has_tty: trio.Event | None = None
local_pdb_complete: Optional[trio.Event] = None
no_remote_has_tty: Optional[trio.Event] = None
# lock in root actor preventing multi-access to local tty
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
_orig_sigint_handler: Callable | None = None
_orig_sigint_handler: Optional[Callable] = None
_blocked: set[tuple[str, str]] = set()
@classmethod
@ -112,7 +110,6 @@ class Lock:
)
@classmethod
@pdbp.hideframe # XXX NOTE XXX see below in `.pause_from_sync()`
def unshield_sigint(cls):
# always restore ``trio``'s sigint handler. see notes below in
# the pdb factory about the nightmare that is that code swapping
@ -132,6 +129,10 @@ class Lock:
if owner:
raise
# actor-local state, irrelevant for non-root.
cls.global_actor_in_debug = None
cls.local_task_in_debug = None
try:
# sometimes the ``trio`` might already be terminated in
# which case this call will raise.
@ -142,11 +143,6 @@ class Lock:
cls.unshield_sigint()
cls.repl = None
# actor-local state, irrelevant for non-root.
cls.global_actor_in_debug = None
cls.local_task_in_debug = None
class TractorConfig(pdbp.DefaultConfig):
'''
@ -155,7 +151,7 @@ class TractorConfig(pdbp.DefaultConfig):
'''
use_pygments: bool = True
sticky_by_default: bool = False
enable_hidden_frames: bool = True
enable_hidden_frames: bool = False
# much thanks @mdmintz for the hot tip!
# fixes line spacing issue when resizing terminal B)
@ -232,23 +228,26 @@ async def _acquire_debug_lock_from_root_task(
to the ``pdb`` repl.
'''
task_name: str = trio.lowlevel.current_task().name
we_acquired: bool = False
task_name = trio.lowlevel.current_task().name
log.runtime(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
)
we_acquired = False
try:
log.runtime(
f"entering lock checkpoint, remote task: {task_name}:{uid}"
)
we_acquired = True
# NOTE: if the surrounding cancel scope from the
# `lock_tty_for_child()` caller is cancelled, this line should
# unblock and NOT leave us in some kind of
# a "child-locked-TTY-but-child-is-uncontactable-over-IPC"
# condition.
await Lock._debug_lock.acquire()
we_acquired = True
if Lock.no_remote_has_tty is None:
# mark the tty lock as being in use so that the runtime
@ -375,7 +374,7 @@ async def wait_for_parent_stdin_hijack(
This function is used by any sub-actor to acquire mutex access to
the ``pdb`` REPL and thus the root's TTY for interactive debugging
(see below inside ``_pause()``). It can be used to ensure that
(see below inside ``_breakpoint()``). It can be used to ensure that
an intermediate nursery-owning actor does not clobber its children
if they are in debug (see below inside
``maybe_wait_for_debugger()``).
@ -441,29 +440,17 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
return pdb, Lock.unshield_sigint
async def _pause(
async def _breakpoint(
debug_func: Callable | None = None,
release_lock_signal: trio.Event | None = None,
debug_func,
# TODO:
# shield: bool = False
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
) -> None:
'''
A pause point (more commonly known as a "breakpoint") interrupt
instruction for engaging a blocking debugger instance to
conduct manual console-based-REPL-interaction from within
`tractor`'s async runtime, normally from some single-threaded
and currently executing actor-hosted-`trio`-task in some
(remote) process.
NOTE: we use the semantics "pause" since it better encompasses
the entirety of the necessary global-runtime-state-mutation any
actor-task must access and lock in order to get full isolated
control over the process tree's root TTY:
https://en.wikipedia.org/wiki/Breakpoint
Breakpoint entry for engaging debugger instance sync-interaction,
from async code, executing in actor runtime (task).
'''
__tracebackhide__ = True
@ -572,19 +559,6 @@ async def _pause(
Lock.repl = pdb
try:
# breakpoint()
if debug_func is None:
# assert release_lock_signal, (
# 'Must pass `release_lock_signal: trio.Event` if no '
# 'trace func provided!'
# )
print(f"{actor.uid} ENTERING WAIT")
task_status.started()
# with trio.CancelScope(shield=True):
# await release_lock_signal.wait()
else:
# block here one (at the appropriate frame *up*) where
# ``breakpoint()`` was awaited and begin handling stdio.
log.debug("Entering the synchronous world of pdb")
@ -609,7 +583,7 @@ async def _pause(
def shield_sigint_handler(
signum: int,
frame: 'frame', # type: ignore # noqa
# pdb_obj: MultiActorPdb | None = None,
# pdb_obj: Optional[MultiActorPdb] = None,
*args,
) -> None:
@ -623,7 +597,7 @@ def shield_sigint_handler(
'''
__tracebackhide__ = True
uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug
uid_in_debug = Lock.global_actor_in_debug
actor = tractor.current_actor()
# print(f'{actor.uid} in HANDLER with ')
@ -641,14 +615,14 @@ def shield_sigint_handler(
else:
raise KeyboardInterrupt
any_connected: bool = False
any_connected = False
if uid_in_debug is not None:
# try to see if the supposed (sub)actor in debug still
# has an active connection to *this* actor, and if not
# it's likely they aren't using the TTY lock / debugger
# and we should propagate SIGINT normally.
chans: list[tractor.Channel] = actor._peers.get(tuple(uid_in_debug))
chans = actor._peers.get(tuple(uid_in_debug))
if chans:
any_connected = any(chan.connected() for chan in chans)
if not any_connected:
@ -661,7 +635,7 @@ def shield_sigint_handler(
return do_cancel()
# only set in the actor actually running the REPL
pdb_obj: MultiActorPdb | None = Lock.repl
pdb_obj = Lock.repl
# root actor branch that reports whether or not a child
# has locked debugger.
@ -719,7 +693,7 @@ def shield_sigint_handler(
)
return do_cancel()
task: str | None = Lock.local_task_in_debug
task = Lock.local_task_in_debug
if (
task
and pdb_obj
@ -734,8 +708,8 @@ def shield_sigint_handler(
# elif debug_mode():
else: # XXX: shouldn't ever get here?
raise RuntimeError("WTFWTFWTF")
# raise KeyboardInterrupt("WTFWTFWTF")
print("WTFWTFWTF")
raise KeyboardInterrupt
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it looks to be that the last command that was run (eg. ll)
@ -763,18 +737,21 @@ def shield_sigint_handler(
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
# XXX LEGACY: lol, see ``pdbpp`` issue:
# https://github.com/pdbpp/pdbpp/issues/496
def _set_trace(
actor: tractor.Actor | None = None,
pdb: MultiActorPdb | None = None,
):
__tracebackhide__ = True
actor: tractor.Actor = actor or tractor.current_actor()
actor = actor or tractor.current_actor()
# start 2 levels up in user code
frame: FrameType | None = sys._getframe()
frame: Optional[FrameType] = sys._getframe()
if frame:
frame: FrameType = frame.f_back # type: ignore
frame = frame.f_back # type: ignore
if (
frame
@ -794,76 +771,12 @@ def _set_trace(
Lock.local_task_in_debug = 'sync'
pdb.set_trace(frame=frame)
# undo_
# TODO: allow pausing from sync code, normally by remapping
# python's builtin breakpoint() hook to this runtime aware version.
def pause_from_sync() -> None:
print("ENTER SYNC PAUSE")
import greenback
__tracebackhide__ = True
actor: tractor.Actor = tractor.current_actor()
# task_can_release_tty_lock = trio.Event()
# spawn bg task which will lock out the TTY, we poll
# just below until the release event is reporting that task as
# waiting.. not the most ideal but works for now ;)
greenback.await_(
actor._service_n.start(partial(
_pause,
debug_func=None,
# release_lock_signal=task_can_release_tty_lock,
))
)
db, undo_sigint = mk_mpdb()
Lock.local_task_in_debug = 'sync'
# db.config.enable_hidden_frames = True
# we entered the global ``breakpoint()`` built-in from sync
# code?
frame: FrameType | None = sys._getframe()
# print(f'FRAME: {str(frame)}')
# assert not db._is_hidden(frame)
frame: FrameType = frame.f_back # type: ignore
# print(f'FRAME: {str(frame)}')
# if not db._is_hidden(frame):
# pdbp.set_trace()
# db._hidden_frames.append(
# (frame, frame.f_lineno)
# )
db.set_trace(frame=frame)
# NOTE XXX: see the `@pdbp.hideframe` decoration
# on `Lock.unshield_sigint()`.. I have NO CLUE why
# the next instruction's def frame is being shown
# in the tb but it seems to be something wonky with
# the way `pdb` core works?
# undo_sigint()
# Lock.global_actor_in_debug = actor.uid
# Lock.release()
# task_can_release_tty_lock.set()
# using the "pause" semantics instead since
# that better covers actually somewhat "pausing the runtime"
# for this particular paralell task to do debugging B)
pause = partial(
_pause,
breakpoint = partial(
_breakpoint,
_set_trace,
)
pp = pause # short-hand for "pause point"
async def breakpoint(**kwargs):
log.warning(
'`tractor.breakpoint()` is deprecated!\n'
'Please use `tractor.pause()` instead!\n'
)
await pause(**kwargs)
def _post_mortem(
@ -888,7 +801,7 @@ def _post_mortem(
post_mortem = partial(
_pause,
_breakpoint,
_post_mortem,
)
@ -970,7 +883,8 @@ async def maybe_wait_for_debugger(
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
sub_in_debug: tuple[str, str] | None = None
sub_in_debug = None
for _ in range(poll_steps):
@ -990,15 +904,13 @@ async def maybe_wait_for_debugger(
debug_complete = Lock.no_remote_has_tty
if (
debug_complete
and sub_in_debug is not None
and not debug_complete.is_set()
(debug_complete and
not debug_complete.is_set())
):
log.pdb(
log.debug(
'Root has errored but pdb is in use by '
f'child {sub_in_debug}\n'
'Waiting on tty lock to release..'
)
'Waiting on tty lock to release..')
await debug_complete.wait()

View File

@ -35,16 +35,14 @@ from ._state import current_actor, _runtime_vars
@acm
async def get_registrar(
async def get_arbiter(
host: str,
port: int,
) -> AsyncGenerator[Union[Portal, LocalPortal], None]:
'''
Return a portal instance connected to a local or remote
'''Return a portal instance connected to a local or remote
arbiter.
'''
actor = current_actor()
@ -56,16 +54,13 @@ async def get_registrar(
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor, Channel((host, port)))
else:
async with (
_connect_chan(host, port) as chan,
open_portal(chan) as arb_portal,
):
async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal:
yield arb_portal
get_arbiter = get_registrar
@acm
async def get_root(
**kwargs,
@ -104,10 +99,7 @@ async def query_actor(
# TODO: return portals to all available actors - for now just
# the last one that registered
if (
name == 'arbiter'
and actor.is_arbiter
):
if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")
yield sockaddr if sockaddr else None
@ -118,7 +110,7 @@ async def find_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None
) -> AsyncGenerator[Portal | None, None]:
) -> AsyncGenerator[Optional[Portal], None]:
'''
Ask the arbiter to find actor(s) by name.
@ -126,64 +118,28 @@ async def find_actor(
known to the arbiter.
'''
actor = current_actor()
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr
) as arb_portal:
sockaddr = await arb_portal.run_from_ns(
'self',
'find_actor',
async with query_actor(
name=name,
)
# TODO: return portals to all available actors - for now just
# the last one that registered
if (
name == 'arbiter'
and actor.is_arbiter
):
raise RuntimeError("The current actor is the arbiter")
arbiter_sockaddr=arbiter_sockaddr,
) as sockaddr:
if sockaddr:
try:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
return
# most likely we were unable to connect the
# transport and there is likely a stale entry in
# the registry actor's table, thus we need to
# instruct it to clear that stale entry and then
# more silently (pretend there was no reason but
# to) indicate that the target actor can't be
# contacted at that addr.
except OSError:
# NOTE: ensure we delete the stale entry from the
# registar actor.
uid: tuple[str, str] = await arb_portal.run_from_ns(
'self',
'delete_sockaddr',
sockaddr=sockaddr,
)
else:
yield None
@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
# registry_addr: tuple[str, int] | None = None,
arbiter_sockaddr: tuple[str, int] | None = None
) -> AsyncGenerator[Portal, None]:
'''
Wait on an actor to register with the arbiter.
"""Wait on an actor to register with the arbiter.
A portal to the first registered actor is returned.
'''
"""
actor = current_actor()
async with get_arbiter(

View File

@ -132,7 +132,7 @@ def _trio_main(
else:
trio.run(trio_main)
except KeyboardInterrupt:
log.cancel(f"Actor {actor.uid} received KBI")
log.warning(f"Actor {actor.uid} received KBI")
finally:
log.info(f"Actor {actor.uid} terminated")

View File

@ -18,18 +18,18 @@
Our classy exception set.
"""
import builtins
import importlib
from typing import (
Any,
Optional,
Type,
)
import importlib
import builtins
import traceback
import exceptiongroup as eg
import trio
from ._state import current_actor
_this_mod = importlib.import_module(__name__)
@ -44,7 +44,7 @@ class RemoteActorError(Exception):
def __init__(
self,
message: str,
suberror_type: Type[BaseException] | None = None,
suberror_type: Optional[Type[BaseException]] = None,
**msgdata
) -> None:
@ -53,36 +53,21 @@ class RemoteActorError(Exception):
self.type = suberror_type
self.msgdata = msgdata
@property
def src_actor_uid(self) -> tuple[str, str] | None:
return self.msgdata.get('src_actor_uid')
class InternalActorError(RemoteActorError):
'''
Remote internal ``tractor`` error indicating
"""Remote internal ``tractor`` error indicating
failure of some primitive or machinery.
'''
class ContextCancelled(RemoteActorError):
'''
Inter-actor task context was cancelled by either a call to
``Portal.cancel_actor()`` or ``Context.cancel()``.
'''
@property
def canceller(self) -> tuple[str, str] | None:
value = self.msgdata.get('canceller')
if value:
return tuple(value)
"""
class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"
class ContextCancelled(RemoteActorError):
"Inter-actor task context cancelled itself on the callee side."
class NoResult(RuntimeError):
"No final result is expected for this actor"
@ -121,16 +106,12 @@ def pack_error(
else:
tb_str = traceback.format_exc()
error_msg = {
return {
'error': {
'tb_str': tb_str,
'type_str': type(exc).__name__,
'src_actor_uid': current_actor().uid,
}
if isinstance(exc, ContextCancelled):
error_msg.update(exc.msgdata)
return {'error': error_msg}
}
def unpack_error(
@ -155,7 +136,7 @@ def unpack_error(
if type_name == 'ContextCancelled':
err_type = ContextCancelled
suberror_type = RemoteActorError
suberror_type = trio.Cancelled
else: # try to lookup a suitable local error type
for ns in [

View File

@ -45,8 +45,10 @@ from ._exceptions import (
NoResult,
ContextCancelled,
)
from ._context import Context
from ._streaming import MsgStream
from ._streaming import (
Context,
MsgStream,
)
log = get_logger(__name__)
@ -101,7 +103,7 @@ class Portal:
# When set to a ``Context`` (when _submit_for_result is called)
# it is expected that ``result()`` will be awaited at some
# point.
self._expect_result: Context | None = None
self._expect_result: Optional[Context] = None
self._streams: set[MsgStream] = set()
self.actor = current_actor()
@ -207,10 +209,7 @@ class Portal:
try:
# send cancel cmd - might not get response
# XXX: sure would be nice to make this work with a proper shield
with trio.move_on_after(
timeout
or self.cancel_timeout
) as cs:
with trio.move_on_after(timeout or self.cancel_timeout) as cs:
cs.shield = True
await self.run_from_ns('self', 'cancel')
@ -331,9 +330,7 @@ class Portal:
f'{async_gen_func} must be an async generator function!')
fn_mod_path, fn_name = NamespacePath.from_ref(
async_gen_func
).to_tuple()
async_gen_func).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel,
fn_mod_path,
@ -380,7 +377,6 @@ class Portal:
self,
func: Callable,
allow_overruns: bool = False,
**kwargs,
) -> AsyncGenerator[tuple[Context, Any], None]:
@ -400,26 +396,13 @@ class Portal:
raise TypeError(
f'{func} must be an async generator function!')
# TODO: i think from here onward should probably
# just be factored into an `@acm` inside a new
# a new `_context.py` mod.
fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel,
fn_mod_path,
fn_name,
kwargs,
# NOTE: it's imporant to expose this since you might
# get the case where the parent who opened the context does
# not open a stream until after some slow startup/init
# period, in which case when the first msg is read from
# the feeder mem chan, say when first calling
# `Context.open_stream(allow_overruns=True)`, the overrun condition will be
# raised before any ignoring of overflow msgs can take
# place..
allow_overruns=allow_overruns,
kwargs
)
assert ctx._remote_func_type == 'context'
@ -443,47 +426,29 @@ class Portal:
f' but received a non-error msg:\n{pformat(msg)}'
)
_err: BaseException | None = None
ctx._portal: Portal = self
_err: Optional[BaseException] = None
ctx._portal = self
uid: tuple = self.channel.uid
cid: str = ctx.cid
etype: Type[BaseException] | None = None
uid = self.channel.uid
cid = ctx.cid
etype: Optional[Type[BaseException]] = None
# deliver context instance and .started() msg value in enter
# tuple.
# deliver context instance and .started() msg value in open tuple.
try:
async with trio.open_nursery() as nurse:
ctx._scope_nursery = nurse
ctx._scope = nurse.cancel_scope
async with trio.open_nursery() as scope_nursery:
ctx._scope_nursery = scope_nursery
# do we need this?
# await trio.lowlevel.checkpoint()
yield ctx, first
# when in allow_ovveruns mode there may be lingering
# overflow sender tasks remaining?
if nurse.child_tasks:
# ensure we are in overrun state with
# ``._allow_overruns=True`` bc otherwise
# there should be no tasks in this nursery!
if (
not ctx._allow_overruns
or len(nurse.child_tasks) > 1
):
raise RuntimeError(
'Context has sub-tasks but is '
'not in `allow_overruns=True` Mode!?'
)
ctx._scope.cancel()
except ContextCancelled as err:
_err = err
# swallow and mask cross-actor task context cancels that
# were initiated by *this* side's task.
if not ctx._cancel_called:
# XXX: this should NEVER happen!
# from ._debug import breakpoint
# await breakpoint()
# context was cancelled at the far end but was
# not part of this end requesting that cancel
# so raise for the local task to respond and handle.
raise
# if the context was cancelled by client code
@ -503,17 +468,17 @@ class Portal:
) as err:
etype = type(err)
# the context cancels itself on any cancel
# causing error.
# cancel ourselves on any error.
if ctx.chan.connected():
log.cancel(
'Context cancelled for task, sending cancel request..\n'
f'task:{cid}\n'
f'actor:{uid}'
)
try:
await ctx.cancel()
except trio.BrokenResourceError:
else:
log.warning(
'IPC connection for context is broken?\n'
f'task:{cid}\n'
@ -522,7 +487,12 @@ class Portal:
raise
else:
finally:
# in the case where a runtime nursery (due to internal bug)
# or a remote actor transmits an error we want to be
# sure we get the error the underlying feeder mem chan.
# if it's not raised here it *should* be raised from the
# msg loop nursery right?
if ctx.chan.connected():
log.info(
'Waiting on final context-task result for\n'
@ -535,7 +505,6 @@ class Portal:
f'value from callee `{result}`'
)
finally:
# though it should be impossible for any tasks
# operating *in* this scope to have survived
# we tear down the runtime feeder chan last

View File

@ -89,7 +89,7 @@ async def open_root_actor(
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.pause_from_sync'
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
@ -235,10 +235,9 @@ async def open_root_actor(
BaseExceptionGroup,
) as err:
if (
not (await _debug._maybe_enter_pm(err))
and not is_multi_cancelled(err)
):
entered = await _debug._maybe_enter_pm(err)
if not entered and not is_multi_cancelled(err):
logger.exception("Root actor crashed:")
# always re-raise
@ -255,9 +254,7 @@ async def open_root_actor(
# tempn.start_soon(an.exited.wait)
logger.cancel("Shutting down root actor")
await actor.cancel(
requesting_uid=actor.uid,
)
await actor.cancel()
finally:
_state._current_actor = None

File diff suppressed because it is too large Load Diff

View File

@ -1,833 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
SC friendly shared memory management geared at real-time
processing.
Support for ``numpy`` compatible array-buffers is provided but is
considered optional within the context of this runtime-library.
"""
from __future__ import annotations
from sys import byteorder
import time
from typing import Optional
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
SharedMemory,
ShareableList,
)
from msgspec import Struct
import tractor
from .log import get_logger
_USE_POSIX = getattr(shm, '_USE_POSIX', False)
if _USE_POSIX:
from _posixshmem import shm_unlink
try:
import numpy as np
from numpy.lib import recfunctions as rfn
import nptyping
except ImportError:
pass
log = get_logger(__name__)
def disable_mantracker():
'''
Disable all ``multiprocessing``` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness.
'''
from multiprocessing import resource_tracker as mantracker
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype):
pass
def ensure_running(self):
pass
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
disable_mantracker()
class SharedInt:
'''
Wrapper around a single entry shared memory array which
holds an ``int`` value used as an index counter.
'''
def __init__(
self,
shm: SharedMemory,
) -> None:
self._shm = shm
@property
def value(self) -> int:
return int.from_bytes(self._shm.buf, byteorder)
@value.setter
def value(self, value) -> None:
self._shm.buf[:] = value.to_bytes(self._shm.size, byteorder)
def destroy(self) -> None:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
name = self._shm.name
try:
shm_unlink(name)
except FileNotFoundError:
# might be a teardown race here?
log.warning(f'Shm for {name} already unlinked?')
class NDToken(Struct, frozen=True):
'''
Internal represenation of a shared memory ``numpy`` array "token"
which can be used to key and load a system (OS) wide shm entry
and correctly read the array by type signature.
This type is msg safe.
'''
shm_name: str # this servers as a "key" value
shm_first_index_name: str
shm_last_index_name: str
dtype_descr: tuple
size: int # in struct-array index / row terms
# TODO: use nptyping here on dtypes
@property
def dtype(self) -> list[tuple[str, str, tuple[int, ...]]]:
return np.dtype(
list(
map(tuple, self.dtype_descr)
)
).descr
def as_msg(self):
return self.to_dict()
@classmethod
def from_msg(cls, msg: dict) -> NDToken:
if isinstance(msg, NDToken):
return msg
# TODO: native struct decoding
# return _token_dec.decode(msg)
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return NDToken(**msg)
# _token_dec = msgspec.msgpack.Decoder(NDToken)
# TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
# _known_tokens = tractor.ContextStack('_known_tokens', )
# _known_tokens = trio.RunVar('shms', {})
# TODO: this should maybe be provided via
# a `.trionics.maybe_open_context()` wrapper factory?
# process-local store of keys to tokens
_known_tokens: dict[str, NDToken] = {}
def get_shm_token(key: str) -> NDToken | None:
'''
Convenience func to check if a token
for the provided key is known by this process.
Returns either the ``numpy`` token or a string for a shared list.
'''
return _known_tokens.get(key)
def _make_token(
key: str,
size: int,
dtype: np.dtype,
) -> NDToken:
'''
Create a serializable token that can be used
to access a shared array.
'''
return NDToken(
shm_name=key,
shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last",
dtype_descr=tuple(np.dtype(dtype).descr),
size=size,
)
class ShmArray:
'''
A shared memory ``numpy.ndarray`` API.
An underlying shared memory buffer is allocated based on
a user specified ``numpy.ndarray``. This fixed size array
can be read and written to by pushing data both onto the "front"
or "back" of a set index range. The indexes for the "first" and
"last" index are themselves stored in shared memory (accessed via
``SharedInt`` interfaces) values such that multiple processes can
interact with the same array using a synchronized-index.
'''
def __init__(
self,
shmarr: np.ndarray,
first: SharedInt,
last: SharedInt,
shm: SharedMemory,
# readonly: bool = True,
) -> None:
self._array = shmarr
# indexes for first and last indices corresponding
# to fille data
self._first = first
self._last = last
self._len = len(shmarr)
self._shm = shm
self._post_init: bool = False
# pushing data does not write the index (aka primary key)
self._write_fields: list[str] | None = None
dtype = shmarr.dtype
if dtype.fields:
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
# TODO: ringbuf api?
@property
def _token(self) -> NDToken:
return NDToken(
shm_name=self._shm.name,
shm_first_index_name=self._first._shm.name,
shm_last_index_name=self._last._shm.name,
dtype_descr=tuple(self._array.dtype.descr),
size=self._len,
)
@property
def token(self) -> dict:
"""Shared memory token that can be serialized and used by
another process to attach to this array.
"""
return self._token.as_msg()
@property
def index(self) -> int:
return self._last.value % self._len
@property
def array(self) -> np.ndarray:
'''
Return an up-to-date ``np.ndarray`` view of the
so-far-written data to the underlying shm buffer.
'''
a = self._array[self._first.value:self._last.value]
# first, last = self._first.value, self._last.value
# a = self._array[first:last]
# TODO: eventually comment this once we've not seen it in the
# wild in a long time..
# XXX: race where first/last indexes cause a reader
# to load an empty array..
if len(a) == 0 and self._post_init:
raise RuntimeError('Empty array race condition hit!?')
# breakpoint()
return a
def ustruct(
self,
fields: Optional[list[str]] = None,
# type that all field values will be cast to
# in the returned view.
common_dtype: np.dtype = float,
) -> np.ndarray:
array = self._array
if fields:
selection = array[fields]
# fcount = len(fields)
else:
selection = array
# fcount = len(array.dtype.fields)
# XXX: manual ``.view()`` attempt that also doesn't work.
# uview = selection.view(
# dtype='<f16',
# ).reshape(-1, 4, order='A')
# assert len(selection) == len(uview)
u = rfn.structured_to_unstructured(
selection,
# dtype=float,
copy=True,
)
# unstruct = np.ndarray(u.shape, dtype=a.dtype, buffer=shm.buf)
# array[:] = a[:]
return u
# return ShmArray(
# shmarr=u,
# first=self._first,
# last=self._last,
# shm=self._shm
# )
def last(
self,
length: int = 1,
) -> np.ndarray:
'''
Return the last ``length``'s worth of ("row") entries from the
array.
'''
return self.array[-length:]
def push(
self,
data: np.ndarray,
field_map: Optional[dict[str, str]] = None,
prepend: bool = False,
update_first: bool = True,
start: int | None = None,
) -> int:
'''
Ring buffer like "push" to append data
into the buffer and return updated "last" index.
NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel.
'''
length = len(data)
if prepend:
index = (start or self._first.value) - length
if index < 0:
raise ValueError(
f'Array size of {self._len} was overrun during prepend.\n'
f'You have passed {abs(index)} too many datums.'
)
else:
index = start if start is not None else self._last.value
end = index + length
if field_map:
src_names, dst_names = zip(*field_map.items())
else:
dst_names = src_names = self._write_fields
try:
self._array[
list(dst_names)
][index:end] = data[list(src_names)][:]
# NOTE: there was a race here between updating
# the first and last indices and when the next reader
# tries to access ``.array`` (which due to the index
# overlap will be empty). Pretty sure we've fixed it now
# but leaving this here as a reminder.
if (
prepend
and update_first
and length
):
assert index < self._first.value
if (
index < self._first.value
and update_first
):
assert prepend, 'prepend=True not passed but index decreased?'
self._first.value = index
elif not prepend:
self._last.value = end
self._post_init = True
return end
except ValueError as err:
if field_map:
raise
# should raise if diff detected
self.diff_err_fields(data)
raise err
def diff_err_fields(
self,
data: np.ndarray,
) -> None:
# reraise with any field discrepancy
our_fields, their_fields = (
set(self._array.dtype.fields),
set(data.dtype.fields),
)
only_in_ours = our_fields - their_fields
only_in_theirs = their_fields - our_fields
if only_in_ours:
raise TypeError(
f"Input array is missing field(s): {only_in_ours}"
)
elif only_in_theirs:
raise TypeError(
f"Input array has unknown field(s): {only_in_theirs}"
)
# TODO: support "silent" prepends that don't update ._first.value?
def prepend(
self,
data: np.ndarray,
) -> int:
end = self.push(data, prepend=True)
assert end
def close(self) -> None:
self._first._shm.close()
self._last._shm.close()
self._shm.close()
def destroy(self) -> None:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
self._first.destroy()
self._last.destroy()
def flush(self) -> None:
# TODO: flush to storage backend like markestore?
...
def open_shm_ndarray(
size: int,
key: str | None = None,
dtype: np.dtype | None = None,
append_start_index: int | None = None,
readonly: bool = False,
) -> ShmArray:
'''
Open a memory shared ``numpy`` using the standard library.
This call unlinks (aka permanently destroys) the buffer on teardown
and thus should be used from the parent-most accessor (process).
'''
# create new shared mem segment for which we
# have write permission
a = np.zeros(size, dtype=dtype)
a['index'] = np.arange(len(a))
shm = SharedMemory(
name=key,
create=True,
size=a.nbytes
)
array = np.ndarray(
a.shape,
dtype=a.dtype,
buffer=shm.buf
)
array[:] = a[:]
array.setflags(write=int(not readonly))
token = _make_token(
key=key,
size=size,
dtype=dtype,
)
# create single entry arrays for storing an first and last indices
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=True,
size=4, # std int
)
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=True,
size=4, # std int
)
)
# Start the "real-time" append-updated (or "pushed-to") section
# after some start index: ``append_start_index``. This allows appending
# from a start point in the array which isn't the 0 index and looks
# something like,
# -------------------------
# | | i
# _________________________
# <-------------> <------->
# history real-time
#
# Once fully "prepended", the history section will leave the
# ``ShmArray._start.value: int = 0`` and the yet-to-be written
# real-time section will start at ``ShmArray.index: int``.
# this sets the index to nearly 2/3rds into the the length of
# the buffer leaving at least a "days worth of second samples"
# for the real-time section.
if append_start_index is None:
append_start_index = round(size * 0.616)
last.value = first.value = append_start_index
shmarr = ShmArray(
array,
first,
last,
shm,
)
assert shmarr._token == token
_known_tokens[key] = shmarr.token
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
stack = tractor.current_actor().lifetime_stack
stack.callback(shmarr.close)
stack.callback(shmarr.destroy)
return shmarr
def attach_shm_ndarray(
token: tuple[str, str, tuple[str, str]],
readonly: bool = True,
) -> ShmArray:
'''
Attach to an existing shared memory array previously
created by another process using ``open_shared_array``.
No new shared mem is allocated but wrapper types for read/write
access are constructed.
'''
token = NDToken.from_msg(token)
key = token.shm_name
if key in _known_tokens:
assert NDToken.from_msg(_known_tokens[key]) == token, "WTF"
# XXX: ugh, looks like due to the ``shm_open()`` C api we can't
# actually place files in a subdir, see discussion here:
# https://stackoverflow.com/a/11103289
# attach to array buffer and view as per dtype
_err: Optional[Exception] = None
for _ in range(3):
try:
shm = SharedMemory(
name=key,
create=False,
)
break
except OSError as oserr:
_err = oserr
time.sleep(0.1)
else:
if _err:
raise _err
shmarr = np.ndarray(
(token.size,),
dtype=token.dtype,
buffer=shm.buf
)
shmarr.setflags(write=int(not readonly))
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=False,
size=4, # std int
),
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=False,
size=4, # std int
),
)
# make sure we can read
first.value
sha = ShmArray(
shmarr,
first,
last,
shm,
)
# read test
sha.array
# Stash key -> token knowledge for future queries
# via `maybe_opepn_shm_array()` but only after we know
# we can attach.
if key not in _known_tokens:
_known_tokens[key] = token
# "close" attached shm on actor teardown
tractor.current_actor().lifetime_stack.callback(sha.close)
return sha
def maybe_open_shm_ndarray(
key: str, # unique identifier for segment
size: int,
dtype: np.dtype | None = None,
append_start_index: int = 0,
readonly: bool = True,
) -> tuple[ShmArray, bool]:
'''
Attempt to attach to a shared memory block using a "key" lookup
to registered blocks in the users overall "system" registry
(presumes you don't have the block's explicit token).
This function is meant to solve the problem of discovering whether
a shared array token has been allocated or discovered by the actor
running in **this** process. Systems where multiple actors may seek
to access a common block can use this function to attempt to acquire
a token as discovered by the actors who have previously stored
a "key" -> ``NDToken`` map in an actor local (aka python global)
variable.
If you know the explicit ``NDToken`` for your memory segment instead
use ``attach_shm_array``.
'''
try:
# see if we already know this key
token = _known_tokens[key]
return (
attach_shm_ndarray(
token=token,
readonly=readonly,
),
False, # not newly opened
)
except KeyError:
log.warning(f"Could not find {key} in shms cache")
if dtype:
token = _make_token(
key,
size=size,
dtype=dtype,
)
else:
try:
return (
attach_shm_ndarray(
token=token,
readonly=readonly,
),
False,
)
except FileNotFoundError:
log.warning(f"Could not attach to shm with token {token}")
# This actor does not know about memory
# associated with the provided "key".
# Attempt to open a block and expect
# to fail if a block has been allocated
# on the OS by someone else.
return (
open_shm_ndarray(
key=key,
size=size,
dtype=dtype,
append_start_index=append_start_index,
readonly=readonly,
),
True,
)
class ShmList(ShareableList):
'''
Carbon copy of ``.shared_memory.ShareableList`` with a few
enhancements:
- readonly mode via instance var flag `._readonly: bool`
- ``.__getitem__()`` accepts ``slice`` inputs
- exposes the underlying buffer "name" as a ``.key: str``
'''
def __init__(
self,
sequence: list | None = None,
*,
name: str | None = None,
readonly: bool = True
) -> None:
self._readonly = readonly
self._key = name
return super().__init__(
sequence=sequence,
name=name,
)
@property
def key(self) -> str:
return self._key
@property
def readonly(self) -> bool:
return self._readonly
def __setitem__(
self,
position,
value,
) -> None:
# mimick ``numpy`` error
if self._readonly:
raise ValueError('assignment destination is read-only')
return super().__setitem__(position, value)
def __getitem__(
self,
indexish,
) -> list:
# NOTE: this is a non-writeable view (copy?) of the buffer
# in a new list instance.
if isinstance(indexish, slice):
return list(self)[indexish]
return super().__getitem__(indexish)
# TODO: should we offer a `.array` and `.push()` equivalent
# to the `ShmArray`?
# currently we have the following limitations:
# - can't write slices of input using traditional slice-assign
# syntax due to the ``ShareableList.__setitem__()`` implementation.
# - ``list(shmlist)`` returns a non-mutable copy instead of
# a writeable view which would be handier numpy-style ops.
def open_shm_list(
key: str,
sequence: list | None = None,
size: int = int(2 ** 10),
dtype: float | int | bool | str | bytes | None = float,
readonly: bool = True,
) -> ShmList:
if sequence is None:
default = {
float: 0.,
int: 0,
bool: True,
str: 'doggy',
None: None,
}[dtype]
sequence = [default] * size
shml = ShmList(
sequence=sequence,
name=key,
readonly=readonly,
)
# "close" attached shm on actor teardown
try:
actor = tractor.current_actor()
actor.lifetime_stack.callback(shml.shm.close)
actor.lifetime_stack.callback(shml.shm.unlink)
except RuntimeError:
log.warning('tractor runtime not active, skipping teardown steps')
return shml
def attach_shm_list(
key: str,
readonly: bool = False,
) -> ShmList:
return ShmList(
name=key,
readonly=readonly,
)

View File

@ -19,7 +19,6 @@ Machinery for actor process spawning using multiple backends.
"""
from __future__ import annotations
import multiprocessing as mp
import sys
import platform
from typing import (
@ -54,6 +53,7 @@ from ._exceptions import ActorFailure
if TYPE_CHECKING:
from ._supervise import ActorNursery
import multiprocessing as mp
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
log = get_logger('tractor')
@ -70,6 +70,7 @@ _spawn_method: SpawnMethodKey = 'trio'
if platform.system() == 'Windows':
import multiprocessing as mp
_ctx = mp.get_context("spawn")
async def proc_waiter(proc: mp.Process) -> None:
@ -456,7 +457,7 @@ async def trio_proc(
# cancel result waiter that may have been spawned in
# tandem if not done already
log.cancel(
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
nursery.cancel_scope.cancel()

View File

@ -23,6 +23,11 @@ from typing import (
Any,
)
import trio
from ._exceptions import NoRuntime
_current_actor: Optional['Actor'] = None # type: ignore # noqa
_runtime_vars: dict[str, Any] = {
'_debug_mode': False,
@ -32,11 +37,8 @@ _runtime_vars: dict[str, Any] = {
def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore # noqa
'''
Get the process-local actor instance.
'''
from ._exceptions import NoRuntime
"""Get the process-local actor instance.
"""
if _current_actor is None and err_on_no_runtime:
raise NoRuntime("No local actor has been initialized yet")
@ -44,20 +46,16 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore #
def is_main_process() -> bool:
'''
Bool determining if this actor is running in the top-most process.
'''
"""Bool determining if this actor is running in the top-most process.
"""
import multiprocessing as mp
return mp.current_process().name == 'MainProcess'
def debug_mode() -> bool:
'''
Bool determining if "debug mode" is on which enables
"""Bool determining if "debug mode" is on which enables
remote subactor pdb entry on crashes.
'''
"""
return bool(_runtime_vars['_debug_mode'])

View File

@ -14,36 +14,31 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
"""
Message stream types and APIs.
The machinery and types behind ``Context.open_stream()``
'''
"""
from __future__ import annotations
import inspect
from contextlib import asynccontextmanager as acm
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import (
Any,
Optional,
Callable,
AsyncIterator,
TYPE_CHECKING,
AsyncGenerator,
AsyncIterator
)
import warnings
import trio
from ._exceptions import (
unpack_error,
)
from ._ipc import Channel
from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor
from .log import get_logger
from .trionics import (
broadcast_receiver,
BroadcastReceiver,
)
if TYPE_CHECKING:
from ._context import Context
from .trionics import broadcast_receiver, BroadcastReceiver
log = get_logger(__name__)
@ -75,9 +70,9 @@ class MsgStream(trio.abc.Channel):
'''
def __init__(
self,
ctx: Context, # typing: ignore # noqa
ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.MemoryReceiveChannel,
_broadcaster: BroadcastReceiver | None = None,
_broadcaster: Optional[BroadcastReceiver] = None,
) -> None:
self._ctx = ctx
@ -280,7 +275,7 @@ class MsgStream(trio.abc.Channel):
# still need to consume msgs that are "in transit" from the far
# end (eg. for ``Context.result()``).
@acm
@asynccontextmanager
async def subscribe(
self,
@ -340,8 +335,8 @@ class MsgStream(trio.abc.Channel):
Send a message over this stream to the far end.
'''
if self._ctx._remote_error:
raise self._ctx._remote_error # from None
if self._ctx._error:
raise self._ctx._error # from None
if self._closed:
raise trio.ClosedResourceError('This stream was already closed')
@ -349,11 +344,371 @@ class MsgStream(trio.abc.Channel):
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
def stream(func: Callable) -> Callable:
@dataclass
class Context:
'''
Mark an async function as a streaming routine with ``@stream``.
An inter-actor, ``trio`` task communication context.
NB: This class should never be instatiated directly, it is delivered
by either runtime machinery to a remotely started task or by entering
``Portal.open_context()``.
Allows maintaining task or protocol specific state between
2 communicating actor tasks. A unique context is created on the
callee side/end for every request to a remote actor from a portal.
A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel, open task oriented
message streams and acts as an IPC aware inter-actor-task cancel
scope.
'''
chan: Channel
cid: str
# these are the "feeder" channels for delivering
# message values to the local task from the runtime
# msg processing loop.
_recv_chan: trio.MemoryReceiveChannel
_send_chan: trio.MemorySendChannel
_remote_func_type: Optional[str] = None
# only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa
_result: Optional[Any] = False
_error: Optional[BaseException] = None
# status flags
_cancel_called: bool = False
_cancel_msg: Optional[str] = None
_enter_debugger_on_cancel: bool = True
_started_called: bool = False
_started_received: bool = False
_stream_opened: bool = False
# only set on the callee side
_scope_nursery: Optional[trio.Nursery] = None
_backpressure: bool = False
async def send_yield(self, data: Any) -> None:
warnings.warn(
"`Context.send_yield()` is now deprecated. "
"Use ``MessageStream.send()``. ",
DeprecationWarning,
stacklevel=2,
)
await self.chan.send({'yield': data, 'cid': self.cid})
async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid})
async def _maybe_raise_from_remote_msg(
self,
msg: dict[str, Any],
) -> None:
'''
(Maybe) unpack and raise a msg error into the local scope
nursery for this context.
Acts as a form of "relay" for a remote error raised
in the corresponding remote callee task.
'''
error = msg.get('error')
if error:
# If this is an error message from a context opened by
# ``Portal.open_context()`` we want to interrupt any ongoing
# (child) tasks within that context to be notified of the remote
# error relayed here.
#
# The reason we may want to raise the remote error immediately
# is that there is no guarantee the associated local task(s)
# will attempt to read from any locally opened stream any time
# soon.
#
# NOTE: this only applies when
# ``Portal.open_context()`` has been called since it is assumed
# (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing.
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
error = unpack_error(msg, self.chan)
if (
isinstance(error, ContextCancelled) and
self._cancel_called
):
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
self._error = error
# TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect
# the cancellation, then lookup the error that was set?
if self._scope_nursery:
async def raiser():
raise self._error from None
# from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked()
if not self._scope_nursery._closed: # type: ignore
self._scope_nursery.start_soon(raiser)
async def cancel(
self,
msg: Optional[str] = None,
) -> None:
'''
Cancel this inter-actor-task context.
Request that the far side cancel it's current linked context,
Timeout quickly in an attempt to sidestep 2-generals...
'''
side = 'caller' if self._portal else 'callee'
if msg:
assert side == 'callee', 'Only callee side can provide cancel msg'
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True
if side == 'caller':
if not self._portal:
raise RuntimeError(
"No portal found, this is likely a callee side context"
)
cid = self.cid
with trio.move_on_after(0.5) as cs:
cs.shield = True
log.cancel(
f"Cancelling stream {cid} to "
f"{self._portal.channel.uid}")
# NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
# cancelled in the case where the connection is broken or
# some other network error occurred.
# if not self._portal.channel.connected():
if not self.chan.connected():
log.cancel(
"May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}")
else:
log.cancel(
"Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}")
# callee side remote task
else:
self._cancel_msg = msg
# TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an
# {'error': trio.Cancelled, cid: "blah"} enough?
# This probably gets into the discussion in
# https://github.com/goodboy/tractor/issues/36
assert self._scope_nursery
self._scope_nursery.cancel_scope.cancel()
if self._recv_chan:
await self._recv_chan.aclose()
@asynccontextmanager
async def open_stream(
self,
backpressure: Optional[bool] = True,
msg_buffer_size: Optional[int] = None,
) -> AsyncGenerator[MsgStream, None]:
'''
Open a ``MsgStream``, a bi-directional stream connected to the
cross-actor (far end) task for this ``Context``.
This context manager must be entered on both the caller and
callee for the stream to logically be considered "connected".
A ``MsgStream`` is currently "one-shot" use, meaning if you
close it you can not "re-open" it for streaming and instead you
must re-establish a new surrounding ``Context`` using
``Portal.open_context()``. In the future this may change but
currently there seems to be no obvious reason to support
"re-opening":
- pausing a stream can be done with a message.
- task errors will normally require a restart of the entire
scope of the inter-actor task context due to the nature of
``trio``'s cancellation system.
'''
actor = current_actor()
# here we create a mem chan that corresponds to the
# far end caller / callee.
# Likewise if the surrounding context has been cancelled we error here
# since it likely means the surrounding block was exited or
# killed
if self._cancel_called:
task = trio.lowlevel.current_task().name
raise ContextCancelled(
f'Context around {actor.uid[0]}:{task} was already cancelled!'
)
if not self._portal and not self._started_called:
raise RuntimeError(
'Context.started()` must be called before opening a stream'
)
# NOTE: in one way streaming this only happens on the
# caller side inside `Actor.start_remote_task()` so if you try
# to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
ctx = actor.get_context(
self.chan,
self.cid,
msg_buffer_size=msg_buffer_size,
)
ctx._backpressure = backpressure
assert ctx is self
# XXX: If the underlying channel feeder receive mem chan has
# been closed then likely client code has already exited
# a ``.open_stream()`` block prior or there was some other
# unanticipated error or cancellation from ``trio``.
if ctx._recv_chan._closed:
raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!?')
async with MsgStream(
ctx=self,
rx_chan=ctx._recv_chan,
) as stream:
if self._portal:
self._portal._streams.add(stream)
try:
self._stream_opened = True
# XXX: do we need this?
# ensure we aren't cancelled before yielding the stream
# await trio.lowlevel.checkpoint()
yield stream
# NOTE: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end.
await stream.aclose()
finally:
if self._portal:
try:
self._portal._streams.remove(stream)
except KeyError:
log.warning(
f'Stream was already destroyed?\n'
f'actor: {self.chan.uid}\n'
f'ctx id: {self.cid}'
)
async def result(self) -> Any:
'''
From a caller side, wait for and return the final result from
the callee side task.
'''
assert self._portal, "Context.result() can not be called from callee!"
assert self._recv_chan
if self._result is False:
if not self._recv_chan._closed: # type: ignore
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
msg = await self._recv_chan.receive()
try:
self._result = msg['return']
break
except KeyError as msgerr:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}')
continue
elif 'stop' in msg:
log.debug('Remote stream terminated')
continue
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?")
raise unpack_error(
msg, self._portal.channel
) from msgerr
return self._result
async def started(
self,
value: Optional[Any] = None
) -> None:
'''
Indicate to calling actor's task that this linked context
has started and send ``value`` to the other side.
On the calling side ``value`` is the second item delivered
in the tuple returned by ``Portal.open_context()``.
'''
if self._portal:
raise RuntimeError(
f"Caller side context {self} can not call started!")
elif self._started_called:
raise RuntimeError(
f"called 'started' twice on context with {self.chan.uid}")
await self.chan.send({'started': value, 'cid': self.cid})
self._started_called = True
# TODO: do we need a restart api?
# async def restart(self) -> None:
# pass
def stream(func: Callable) -> Callable:
"""Mark an async function as a streaming routine with ``@stream``.
"""
# annotate
# TODO: apply whatever solution ``mypy`` ends up picking for this:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_stream_function = True # type: ignore
@ -379,3 +734,22 @@ def stream(func: Callable) -> Callable:
"(Or ``to_trio`` if using ``asyncio`` in guest mode)."
)
return func
def context(func: Callable) -> Callable:
"""Mark an async function as a streaming routine with ``@context``.
"""
# annotate
# TODO: apply whatever solution ``mypy`` ends up picking for this:
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
func._tractor_context_function = True # type: ignore
sig = inspect.signature(func)
params = sig.parameters
if 'ctx' not in params:
raise TypeError(
"The first argument to the context function "
f"{func.__name__} must be `ctx: tractor.Context`"
)
return func

View File

@ -37,7 +37,7 @@ import trio
import wrapt
from ..log import get_logger
from .._context import Context
from .._streaming import Context
__all__ = ['pub']
@ -148,8 +148,7 @@ def pub(
*,
tasks: set[str] = set(),
):
'''
Publisher async generator decorator.
"""Publisher async generator decorator.
A publisher can be called multiple times from different actors but
will only spawn a finite set of internal tasks to stream values to
@ -228,8 +227,7 @@ def pub(
running in a single actor to stream data to an arbitrary number of
subscribers. If you are ok to have a new task running for every call
to ``pub_service()`` then probably don't need this.
'''
"""
global _pubtask2lock
# handle the decorator not called with () case

View File

@ -82,10 +82,6 @@ class StackLevelAdapter(logging.LoggerAdapter):
msg: str,
) -> None:
'''
IPC level msg-ing.
'''
return self.log(5, msg)
def runtime(
@ -98,20 +94,12 @@ class StackLevelAdapter(logging.LoggerAdapter):
self,
msg: str,
) -> None:
'''
Cancellation logging, mostly for runtime reporting.
'''
return self.log(16, msg)
def pdb(
self,
msg: str,
) -> None:
'''
Debugger logging.
'''
return self.log(500, msg)
def log(self, level, msg, *args, **kwargs):
@ -193,39 +181,15 @@ def get_logger(
'''
log = rlog = logging.getLogger(_root_name)
if (
name
and name != _proj_name
):
if name and name != _proj_name:
# NOTE: for handling for modules that use ``get_logger(__name__)``
# we make the following stylistic choice:
# - always avoid duplicate project-package token
# in msg output: i.e. tractor.tractor _ipc.py in header
# looks ridiculous XD
# - never show the leaf module name in the {name} part
# since in python the {filename} is always this same
# module-file.
sub_name: None | str = None
rname, _, sub_name = name.partition('.')
pkgpath, _, modfilename = sub_name.rpartition('.')
# NOTE: for tractor itself never include the last level
# module key in the name such that something like: eg.
# 'tractor.trionics._broadcast` only includes the first
# 2 tokens in the (coloured) name part.
if rname == 'tractor':
sub_name = pkgpath
if _root_name in sub_name:
duplicate, _, sub_name = sub_name.partition('.')
if not sub_name:
log = rlog
else:
log = rlog.getChild(sub_name)
# handling for modules that use ``get_logger(__name__)`` to
# avoid duplicate project-package token in msg output
rname, _, tail = name.partition('.')
if rname == _root_name:
name = tail
log = rlog.getChild(name)
log.level = rlog.level
# add our actor-task aware adapter which will dynamically look up
@ -278,7 +242,3 @@ def get_console_log(
def get_loglevel() -> str:
return _default_loglevel
# global module logger for tractor itself
log = get_logger('tractor')

View File

@ -43,62 +43,38 @@ Built-in messaging patterns, types, APIs and helpers.
# - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type
from __future__ import annotations
from inspect import isfunction
from pkgutil import resolve_name
class NamespacePath(str):
'''
A serializeable description of a (function) Python object
location described by the target's module path and namespace
key meant as a message-native "packet" to allows actors to
point-and-load objects by an absolute ``str`` (and thus
serializable) reference.
A serializeable description of a (function) Python object location
described by the target's module path and namespace key meant as
a message-native "packet" to allows actors to point-and-load objects
by absolute reference.
'''
_ref: object | type | None = None
_ref: object = None
def load_ref(self) -> object | type:
def load_ref(self) -> object:
if self._ref is None:
self._ref = resolve_name(self)
return self._ref
@staticmethod
def _mk_fqnp(ref: type | object) -> tuple[str, str]:
'''
Generate a minial ``str`` pair which describes a python
object's namespace path and object/type name.
def to_tuple(
self,
In more precise terms something like:
- 'py.namespace.path:object_name',
- eg.'tractor.msg:NamespacePath' will be the ``str`` form
of THIS type XD
'''
if (
isinstance(ref, object)
and not isfunction(ref)
):
name: str = type(ref).__name__
else:
name: str = getattr(ref, '__name__')
# fully qualified namespace path, tuple.
fqnp: tuple[str, str] = (
ref.__module__,
name,
)
return fqnp
) -> tuple[str, str]:
ref = self.load_ref()
return ref.__module__, getattr(ref, '__name__', '')
@classmethod
def from_ref(
cls,
ref: type | object,
ref,
) -> NamespacePath:
fqnp: tuple[str, str] = cls._mk_fqnp(ref)
return cls(':'.join(fqnp))
def to_tuple(self) -> tuple[str, str]:
return self._mk_fqnp(self.load_ref())
return cls(':'.join(
(ref.__module__,
getattr(ref, '__name__', ''))
))

View File

@ -28,6 +28,7 @@ from typing import (
Callable,
AsyncIterator,
Awaitable,
Optional,
)
import trio
@ -64,9 +65,9 @@ class LinkedTaskChannel(trio.abc.Channel):
_trio_exited: bool = False
# set after ``asyncio.create_task()``
_aio_task: asyncio.Task | None = None
_aio_err: BaseException | None = None
_broadcaster: BroadcastReceiver | None = None
_aio_task: Optional[asyncio.Task] = None
_aio_err: Optional[BaseException] = None
_broadcaster: Optional[BroadcastReceiver] = None
async def aclose(self) -> None:
await self._from_aio.aclose()
@ -187,7 +188,7 @@ def _run_asyncio_task(
cancel_scope = trio.CancelScope()
aio_task_complete = trio.Event()
aio_err: BaseException | None = None
aio_err: Optional[BaseException] = None
chan = LinkedTaskChannel(
aio_q, # asyncio.Queue
@ -216,7 +217,7 @@ def _run_asyncio_task(
try:
result = await coro
except BaseException as aio_err:
# log.exception('asyncio task errored:')
log.exception('asyncio task errored')
chan._aio_err = aio_err
raise
@ -262,7 +263,7 @@ def _run_asyncio_task(
'''
nonlocal chan
aio_err = chan._aio_err
task_err: BaseException | None = None
task_err: Optional[BaseException] = None
# only to avoid ``asyncio`` complaining about uncaptured
# task exceptions
@ -300,7 +301,7 @@ def _run_asyncio_task(
elif task_err is None:
assert aio_err
aio_err.with_traceback(aio_err.__traceback__)
# log.error('infected task errorred')
log.error('infected task errorred')
# XXX: alway cancel the scope on error
# in case the trio task is blocking
@ -328,11 +329,11 @@ async def translate_aio_errors(
'''
trio_task = trio.lowlevel.current_task()
aio_err: BaseException | None = None
aio_err: Optional[BaseException] = None
# TODO: make thisi a channel method?
def maybe_raise_aio_err(
err: Exception | None = None
err: Optional[Exception] = None
) -> None:
aio_err = chan._aio_err
if (
@ -356,7 +357,7 @@ async def translate_aio_errors(
# relay cancel through to called ``asyncio`` task
assert chan._aio_task
chan._aio_task.cancel(
msg=f'`trio`-side caller task cancelled: {trio_task.name}'
msg=f'the `trio` caller task was cancelled: {trio_task.name}'
)
raise
@ -366,7 +367,7 @@ async def translate_aio_errors(
trio.ClosedResourceError,
# trio.BrokenResourceError,
):
aio_err: BaseException = chan._aio_err
aio_err = chan._aio_err
if (
task.cancelled() and
type(aio_err) is CancelledError

View File

@ -25,16 +25,8 @@ from collections import deque
from contextlib import asynccontextmanager
from functools import partial
from operator import ne
from typing import (
Optional,
Callable,
Awaitable,
Any,
AsyncIterator,
Protocol,
Generic,
TypeVar,
)
from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol
from typing import Generic, TypeVar
import trio
from trio._core._run import Task

View File

@ -237,7 +237,7 @@ async def maybe_open_context(
yielded = _Cache.values[ctx_key]
except KeyError:
log.debug(f'Allocating new {acm_func} for {ctx_key}')
log.info(f'Allocating new {acm_func} for {ctx_key}')
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
@ -265,7 +265,7 @@ async def maybe_open_context(
if yielded is not None:
# if no more consumers, teardown the client
if _Cache.users <= 0:
log.debug(f'De-allocating resource for {ctx_key}')
log.info(f'De-allocating resource for {ctx_key}')
# XXX: if we're cancelled we the entry may have never
# been entered since the nursery task was killed.