Merge pull request #206 from goodboy/stream_contexts

Explicit stream contexts
mp_teardown_hardening
goodboy 2021-05-04 10:41:03 -04:00 committed by GitHub
commit af93b8532a
19 changed files with 625 additions and 402 deletions

View File

@@ -24,8 +24,9 @@ async def main():
             # this async for loop streams values from the above
             # async generator running in a separate process
-            async for letter in await portal.run(stream_forever):
-                print(letter)
+            async with portal.open_stream_from(stream_forever) as stream:
+                async for letter in stream:
+                    print(letter)

     # we support trio's cancellation system
     assert cancel_scope.cancelled_caught
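
For orientation, here is a minimal, self-contained sketch of the pattern every call site in this diff migrates to; the actor and function names are illustrative rather than taken from any one file:

    import tractor

    async def stream_forever():
        # an async generator served to a remote consumer over IPC
        while True:
            yield 'hi'

    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.start_actor('streamer', enable_modules=[__name__])

            # old api: ``async for letter in await portal.run(stream_forever):``
            # new api: the stream is an explicit context whose exit
            # deterministically cancels the far-end task
            async with portal.open_stream_from(stream_forever) as stream:
                async for letter in stream:
                    print(letter)
                    break

            await portal.cancel_actor()

    if __name__ == '__main__':
        tractor.run(main)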

View File

@@ -26,8 +26,8 @@ async def main():
         p1 = await n.start_actor('name_error', enable_modules=[__name__])

         # retreive results
-        stream = await p0.run(breakpoint_forever)
-        await p1.run(name_error)
+        async with p0.open_stream_from(breakpoint_forever) as stream:
+            await p1.run(name_error)

 if __name__ == '__main__':

View File

@@ -21,4 +21,4 @@ async def main():

 if __name__ == '__main__':
-    tractor.run(main, debug_mode=True)
+    tractor.run(main, debug_mode=True, loglevel='debug')

View File

@@ -21,7 +21,7 @@ async def aggregate(seed):
             # fork point
             portal = await nursery.start_actor(
                 name=f'streamer_{i}',
-                rpc_module_paths=[__name__],
+                enable_modules=[__name__],
             )

             portals.append(portal)
@@ -29,10 +29,13 @@ async def aggregate(seed):
         send_chan, recv_chan = trio.open_memory_channel(500)

         async def push_to_chan(portal, send_chan):
+
+            # TODO: https://github.com/goodboy/tractor/issues/207
             async with send_chan:
-                async for value in await portal.run(stream_data, seed=seed):
-                    # leverage trio's built-in backpressure
-                    await send_chan.send(value)
+                async with portal.open_stream_from(stream_data, seed=seed) as stream:
+                    async for value in stream:
+                        # leverage trio's built-in backpressure
+                        await send_chan.send(value)

             print(f"FINISHED ITERATING {portal.channel.uid}")
@@ -71,18 +74,24 @@ async def main():
         import time
         pre_start = time.time()

-        portal = await nursery.run_in_actor(
-            aggregate,
+        portal = await nursery.start_actor(
             name='aggregator',
-            seed=seed,
+            enable_modules=[__name__],
         )

-        start = time.time()
-        # the portal call returns exactly what you'd expect
-        # as if the remote "aggregate" function was called locally
-        result_stream = []
-        async for value in await portal.result():
-            result_stream.append(value)
+        async with portal.open_stream_from(
+            aggregate,
+            seed=seed,
+        ) as stream:
+
+            start = time.time()
+            # the portal call returns exactly what you'd expect
+            # as if the remote "aggregate" function was called locally
+            result_stream = []
+            async for value in stream:
+                result_stream.append(value)
+
+        await portal.cancel_actor()

         print(f"STREAM TIME = {time.time() - start}")
         print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")

View File

@@ -15,11 +15,12 @@ async def stream_data(seed=10):

 async def stream_from_portal(p, consumed):

-    async for item in await p.run(stream_data):
-        if item in consumed:
-            consumed.remove(item)
-        else:
-            consumed.append(item)
+    async with p.open_stream_from(stream_data) as stream:
+        async for item in stream:
+            if item in consumed:
+                consumed.remove(item)
+            else:
+                consumed.append(item)

 async def main():

View File

@@ -49,7 +49,9 @@ def test_remote_error(arb_addr, args_err):
     async def main():
         async with tractor.open_nursery() as nursery:

-            portal = await nursery.run_in_actor(assert_err, name='errorer', **args)
+            portal = await nursery.run_in_actor(
+                assert_err, name='errorer', **args
+            )

             # get result(s) from main task
             try:
@@ -168,13 +170,14 @@ async def test_cancel_infinite_streamer(start_method):
         async with tractor.open_nursery() as n:
             portal = await n.start_actor(
                 'donny',
-                rpc_module_paths=[__name__],
+                enable_modules=[__name__],
             )

             # this async for loop streams values from the above
             # async generator running in a separate process
-            async for letter in await portal.run(stream_forever):
-                print(letter)
+            async with portal.open_stream_from(stream_forever) as stream:
+                async for letter in stream:
+                    print(letter)

     # we support trio's cancellation system
     assert cancel_scope.cancelled_caught
@@ -430,7 +433,6 @@ def test_cancel_via_SIGINT_other_task(
     tractor.run(main)

-
 async def spin_for(period=3):
     "Sync sleep."
     time.sleep(period)
@@ -438,7 +440,7 @@ async def spin_for(period=3):

 async def spawn():
     async with tractor.open_nursery() as tn:
-        portal = await tn.run_in_actor(
+        await tn.run_in_actor(
             spin_for,
             name='sleeper',
         )
@@ -460,7 +462,7 @@ def test_cancel_while_childs_child_in_sync_sleep(
     async def main():
         with trio.fail_after(2):
             async with tractor.open_nursery() as tn:
-                portal = await tn.run_in_actor(
+                await tn.run_in_actor(
                     spawn,
                     name='spawn',
                 )

View File

@@ -108,8 +108,9 @@ async def cancel(use_signal, delay=0):

 async def stream_from(portal):
-    async for value in await portal.result():
-        print(value)
+    async with portal.open_stream_from(stream_forever) as stream:
+        async for value in stream:
+            print(value)

 async def spawn_and_check_registry(
@@ -139,18 +140,20 @@ async def spawn_and_check_registry(
         registry = await get_reg()
         assert actor.uid in registry

-        if with_streaming:
-            to_run = stream_forever
-        else:
-            to_run = trio.sleep_forever
-
-        async with trio.open_nursery() as trion:
-            try:
-                async with tractor.open_nursery() as n:
+        try:
+            async with tractor.open_nursery() as n:
+                async with trio.open_nursery() as trion:
+
                     portals = {}
                     for i in range(3):
                         name = f'a{i}'
-                        portals[name] = await n.run_in_actor(to_run, name=name)
+                        if with_streaming:
+                            portals[name] = await n.start_actor(
+                                name=name, enable_modules=[__name__])
+
+                        else:  # no streaming
+                            portals[name] = await n.run_in_actor(
+                                trio.sleep_forever, name=name)

                     # wait on last actor to come up
                     async with tractor.wait_for_actor(name):
@@ -171,19 +174,19 @@ async def spawn_and_check_registry(
                         trion.start_soon(cancel, use_signal, 1)

                         last_p = pts[-1]
-                        async for value in await last_p.result():
-                            print(value)
+                        await stream_from(last_p)

                     else:
                         await cancel(use_signal)

         finally:
             with trio.CancelScope(shield=True):
                 await trio.sleep(0.5)

                 # all subactors should have de-registered
                 registry = await get_reg()
                 assert len(registry) == extra
                 assert actor.uid in registry

 @pytest.mark.parametrize('use_signal', [False, True])
@@ -260,36 +263,38 @@ async def close_chans_before_nursery(
     get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')

         async with tractor.open_nursery() as tn:
-            portal1 = await tn.run_in_actor(
-                stream_forever,
-                name='consumer1',
-            )
-            agen1 = await portal1.result()
-
-            portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__])
-            agen2 = await portal2.run(stream_forever)
-
-            async with trio.open_nursery() as n:
-                n.start_soon(streamer, agen1)
-                n.start_soon(cancel, use_signal, .5)
-                try:
-                    await streamer(agen2)
-                finally:
-                    # Kill the root nursery thus resulting in
-                    # normal arbiter channel ops to fail during
-                    # teardown. It doesn't seem like this is
-                    # reliably triggered by an external SIGINT.
-                    # tractor.current_actor()._root_nursery.cancel_scope.cancel()
-
-                    # XXX: THIS IS THE KEY THING that happens
-                    # **before** exiting the actor nursery block
-
-                    # also kill off channels cuz why not
-                    await agen1.aclose()
-                    await agen2.aclose()
+            portal1 = await tn.start_actor(
+                name='consumer1', enable_modules=[__name__])
+            portal2 = await tn.start_actor(
+                'consumer2', enable_modules=[__name__])
+
+            # TODO: compact this back as was in last commit once
+            # 3.9+, see https://github.com/goodboy/tractor/issues/207
+            async with portal1.open_stream_from(stream_forever) as agen1:
+                async with portal2.open_stream_from(
+                    stream_forever
+                ) as agen2:
+                    async with trio.open_nursery() as n:
+                        n.start_soon(streamer, agen1)
+                        n.start_soon(cancel, use_signal, .5)
+                        try:
+                            await streamer(agen2)
+                        finally:
+                            # Kill the root nursery thus resulting in
+                            # normal arbiter channel ops to fail during
+                            # teardown. It doesn't seem like this is
+                            # reliably triggered by an external SIGINT.
+                            # tractor.current_actor()._root_nursery.cancel_scope.cancel()

+                            # XXX: THIS IS THE KEY THING that happens
+                            # **before** exiting the actor nursery block
+
+                            # also kill off channels cuz why not
+                            await agen1.aclose()
+                            await agen2.aclose()
     finally:
         with trio.CancelScope(shield=True):
-            await trio.sleep(.5)
+            await trio.sleep(1)

             # all subactors should have de-registered
             registry = await get_reg()

View File

@@ -60,45 +60,45 @@ async def subs(
     def pred(i):
         return isinstance(i, int)

+    # TODO: https://github.com/goodboy/tractor/issues/207
     async with tractor.find_actor(pub_actor_name) as portal:
-        stream = await portal.run(
-            pubber,
-            topics=which,
-            seed=seed,
-        )
-        task_status.started(stream)
-        times = 10
-        count = 0
-        await stream.__anext__()
-        async for pkt in stream:
-            for topic, value in pkt.items():
-                assert pred(value)
-            count += 1
-            if count >= times:
-                break
-
-        await stream.aclose()
-
-        stream = await portal.run(
-            pubber,
-            topics=['odd'],
-            seed=seed,
-        )
-        await stream.__anext__()
-        count = 0
-        # async with aclosing(stream) as stream:
-        try:
-            async for pkt in stream:
-                for topic, value in pkt.items():
-                    pass
-                    # assert pred(value)
-                count += 1
-                if count >= times:
-                    break
-        finally:
-            await stream.aclose()
+        async with portal.open_stream_from(
+            pubber,
+            topics=which,
+            seed=seed,
+        ) as stream:
+            task_status.started(stream)
+            times = 10
+            count = 0
+            await stream.__anext__()
+            async for pkt in stream:
+                for topic, value in pkt.items():
+                    assert pred(value)
+                count += 1
+                if count >= times:
+                    break
+
+            await stream.aclose()
+
+        async with portal.open_stream_from(
+            pubber,
+            topics=['odd'],
+            seed=seed,
+        ) as stream:
+            await stream.__anext__()
+            count = 0
+            # async with aclosing(stream) as stream:
+            try:
+                async for pkt in stream:
+                    for topic, value in pkt.items():
+                        pass
+                        # assert pred(value)
+                    count += 1
+                    if count >= times:
+                        break
+            finally:
+                await stream.aclose()

 @tractor.msg.pub(tasks=['one', 'two'])
 async def multilock_pubber(get_topics):
@@ -128,11 +128,10 @@ async def test_required_args(callwith_expecterror):
             await func(**kwargs)
     else:
         async with tractor.open_nursery() as n:
-            # await func(**kwargs)
-            portal = await n.run_in_actor(
-                multilock_pubber,
+            portal = await n.start_actor(
                 name='pubber',
-                **kwargs
+                enable_modules=[__name__],
             )

             async with tractor.wait_for_actor('pubber'):
@@ -140,8 +139,14 @@ async def test_required_args(callwith_expecterror):
                 await trio.sleep(0.5)

-                async for val in await portal.result():
-                    assert val == {'doggy': 10}
+                async with portal.open_stream_from(
+                    multilock_pubber,
+                    **kwargs
+                ) as stream:
+                    async for val in stream:
+                        assert val == {'doggy': 10}
+
+            await portal.cancel_actor()

 @pytest.mark.parametrize(

View File

@@ -61,37 +61,38 @@ async def stream_from_single_subactor(stream_func):
             # no brokerd actor found
             portal = await nursery.start_actor(
                 'streamerd',
-                rpc_module_paths=[__name__],
+                enable_modules=[__name__],
             )

             seq = range(10)

-            stream = await portal.run(
-                stream_func,  # one of the funcs above
-                sequence=list(seq),  # has to be msgpack serializable
-            )
-            # it'd sure be nice to have an asyncitertools here...
-            iseq = iter(seq)
-            ival = next(iseq)
-            async for val in stream:
-                assert val == ival
-                try:
-                    ival = next(iseq)
-                except StopIteration:
-                    # should cancel far end task which will be
-                    # caught and no error is raised
-                    await stream.aclose()
-            await trio.sleep(0.3)
-            try:
-                await stream.__anext__()
-            except StopAsyncIteration:
-                # stop all spawned subactors
-                await portal.cancel_actor()
-                # await nursery.cancel()
+            async with portal.open_stream_from(
+                stream_func,
+                sequence=list(seq),  # has to be msgpack serializable
+            ) as stream:
+
+                # it'd sure be nice to have an asyncitertools here...
+                iseq = iter(seq)
+                ival = next(iseq)
+
+                async for val in stream:
+                    assert val == ival
+
+                    try:
+                        ival = next(iseq)
+                    except StopIteration:
+                        # should cancel far end task which will be
+                        # caught and no error is raised
+                        await stream.aclose()
+
+                await trio.sleep(0.3)
+
+                try:
+                    await stream.__anext__()
+                except StopAsyncIteration:
+                    # stop all spawned subactors
+                    await portal.cancel_actor()
+                    # await nursery.cancel()

 @pytest.mark.parametrize(
@@ -132,7 +133,7 @@ async def aggregate(seed):
             # fork point
             portal = await nursery.start_actor(
                 name=f'streamer_{i}',
-                rpc_module_paths=[__name__],
+                enable_modules=[__name__],
             )

             portals.append(portal)
@@ -141,11 +142,14 @@ async def aggregate(seed):
         async def push_to_chan(portal, send_chan):
             async with send_chan:
-                async for value in await portal.run(
-                    __name__, 'stream_data', seed=seed
-                ):
-                    # leverage trio's built-in backpressure
-                    await send_chan.send(value)
+
+                async with portal.open_stream_from(
+                    stream_data, seed=seed,
+                ) as stream:
+
+                    async for value in stream:
+                        # leverage trio's built-in backpressure
+                        await send_chan.send(value)

             print(f"FINISHED ITERATING {portal.channel.uid}")
@@ -183,22 +187,24 @@ async def a_quadruple_example():
         seed = int(1e3)
         pre_start = time.time()

-        portal = await nursery.run_in_actor(
-            aggregate,
-            seed=seed,
+        portal = await nursery.start_actor(
             name='aggregator',
+            enable_modules=[__name__],
         )

         start = time.time()
         # the portal call returns exactly what you'd expect
         # as if the remote "aggregate" function was called locally
         result_stream = []
-        async for value in await portal.result():
-            result_stream.append(value)
+
+        async with portal.open_stream_from(aggregate, seed=seed) as stream:
+            async for value in stream:
+                result_stream.append(value)

         print(f"STREAM TIME = {time.time() - start}")
         print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
         assert result_stream == list(range(seed))
+
+        await portal.cancel_actor()
+
         return result_stream
@@ -272,48 +278,55 @@ async def test_respawn_consumer_task(
     async with tractor.open_nursery() as n:

-        stream = await(await n.run_in_actor(
-            stream_data,
-            seed=11,
-            name='streamer',
-        )).result()
-
-        expect = set(range(11))
-        received = []
-
-        # this is the re-spawn task routine
-        async def consume(task_status=trio.TASK_STATUS_IGNORED):
-            print('starting consume task..')
-            nonlocal stream
-
-            with trio.CancelScope() as cs:
-                task_status.started(cs)
-
-                # shield stream's underlying channel from cancellation
-                with stream.shield():
-                    async for v in stream:
-                        print(f'from stream: {v}')
-                        expect.remove(v)
-                        received.append(v)
-
-                print('exited consume')
-
-        async with trio.open_nursery() as ln:
-            cs = await ln.start(consume)
-
-            while True:
-                await trio.sleep(0.1)
-
-                if received[-1] % 2 == 0:
-                    print('cancelling consume task..')
-                    cs.cancel()
-
-                    # respawn
-                    cs = await ln.start(consume)
-
-                if not expect:
-                    print("all values streamed, BREAKING")
-                    break
+        portal = await n.start_actor(
+            name='streamer',
+            enable_modules=[__name__]
+        )
+        async with portal.open_stream_from(
+            stream_data,
+            seed=11,
+        ) as stream:
+
+            expect = set(range(11))
+            received = []
+
+            # this is the re-spawn task routine
+            async def consume(task_status=trio.TASK_STATUS_IGNORED):
+                print('starting consume task..')
+                nonlocal stream
+
+                with trio.CancelScope() as cs:
+                    task_status.started(cs)
+
+                    # shield stream's underlying channel from cancellation
+                    with stream.shield():
+                        async for v in stream:
+                            print(f'from stream: {v}')
+                            expect.remove(v)
+                            received.append(v)
+
+                    print('exited consume')
+
+            async with trio.open_nursery() as ln:
+                cs = await ln.start(consume)
+
+                while True:
+                    await trio.sleep(0.1)
+
+                    if received[-1] % 2 == 0:
+                        print('cancelling consume task..')
+                        cs.cancel()
+
+                        # respawn
+                        cs = await ln.start(consume)
+
+                    if not expect:
+                        print("all values streamed, BREAKING")
+                        break
+
+        # TODO: this is justification for a
+        # ``ActorNursery.stream_from_actor()`` helper?
+        await portal.cancel_actor()
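
The respawn test above is the main exercise of the new ``ReceiveMsgStream.shield()`` api (defined in ``_streaming.py`` further down). Condensed into a hedged sketch, the pattern is:

    # a consumer task that can be cancelled and restarted without
    # tearing down the underlying IPC stream (condensed from the test)
    async def consume(stream, task_status=trio.TASK_STATUS_IGNORED):
        with trio.CancelScope() as cs:
            task_status.started(cs)
            # keep the portal channel alive across a local trio.Cancelled
            with stream.shield():
                async for v in stream:
                    print(f'from stream: {v}')

Cancelling ``cs`` and calling ``await nursery.start(consume, stream)`` again then resumes consumption of the same remote stream.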

View File

@@ -1093,4 +1093,4 @@ class Arbiter(Actor):
             event.set()

     async def unregister_actor(self, uid: Tuple[str, str]) -> None:
-        self._registry.pop(uid, None)
+        self._registry.pop(uid)

View File

@@ -182,24 +182,22 @@ def _breakpoint(debug_func) -> Awaitable[None]:
             _debugger_request_cs = cs
             try:
                 async with get_root() as portal:
-                    with trio.fail_after(.5):
-                        stream = await portal.run(
-                            tractor._debug._hijack_stdin_relay_to_child,
-                            subactor_uid=actor.uid,
-                        )
-                    async with aclosing(stream):
+                    async with portal.open_stream_from(
+                        tractor._debug._hijack_stdin_relay_to_child,
+                        subactor_uid=actor.uid,
+                    ) as stream:

                         # block until first yield above
                         async for val in stream:

                             assert val == 'Locked'
                             task_status.started()

                             # with trio.CancelScope(shield=True):
                             await do_unlock.wait()

                             # trigger cancellation of remote stream
                             break
             finally:
                 log.debug(f"Exiting debugger for actor {actor}")
                 global _in_debug

View File

@@ -46,6 +46,10 @@ class ModuleNotExposed(ModuleNotFoundError):
     "The requested module is not exposed for RPC"

+class NoRuntime(RuntimeError):
+    "The root actor has not been initialized yet"
+
+
 def pack_error(exc: BaseException) -> Dict[str, Any]:
     """Create an "error message" for tranmission over
     a channel (aka the wire).

View File

@@ -4,7 +4,6 @@ Inter-process comms abstractions
 import typing
 from typing import Any, Tuple, Optional
 from functools import partial
-import inspect

 import msgpack
 import trio

View File

@@ -3,11 +3,12 @@ Portal api
 """
 import importlib
 import inspect
-import typing
-from typing import Tuple, Any, Dict, Optional, Set, Iterator
+from typing import (
+    Tuple, Any, Dict, Optional, Set,
+    Callable, AsyncGenerator
+)
 from functools import partial
 from dataclasses import dataclass
-from contextlib import contextmanager
 import warnings

 import trio
@@ -17,16 +18,17 @@ from ._state import current_actor
 from ._ipc import Channel
 from .log import get_logger
 from ._exceptions import unpack_error, NoResult, RemoteActorError
+from ._streaming import Context, ReceiveMsgStream

-log = get_logger('tractor')
+log = get_logger(__name__)

 @asynccontextmanager
 async def maybe_open_nursery(
     nursery: trio.Nursery = None,
     shield: bool = False,
-) -> typing.AsyncGenerator[trio.Nursery, Any]:
+) -> AsyncGenerator[trio.Nursery, Any]:
     """Create a new nursery if None provided.

     Blocks on exit as expected if no input nursery is provided.
@@ -39,113 +41,30 @@ async def maybe_open_nursery(
         yield nursery

-class ReceiveStream(trio.abc.ReceiveChannel):
-    """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
-    special behaviour for signalling stream termination across an
-    inter-actor ``Channel``. This is the type returned to a local task
-    which invoked a remote streaming function using `Portal.run()`.
-
-    Termination rules:
-    - if the local task signals stop iteration a cancel signal is
-      relayed to the remote task indicating to stop streaming
-    - if the remote task signals the end of a stream, raise a
-      ``StopAsyncIteration`` to terminate the local ``async for``
-
-    """
-    def __init__(
-        self,
-        cid: str,
-        rx_chan: trio.abc.ReceiveChannel,
-        portal: 'Portal',
-    ) -> None:
-        self._cid = cid
-        self._rx_chan = rx_chan
-        self._portal = portal
-        self._shielded = False
-
-    # delegate directly to underlying mem channel
-    def receive_nowait(self):
-        return self._rx_chan.receive_nowait()
-
-    async def receive(self):
-        try:
-            msg = await self._rx_chan.receive()
-            return msg['yield']
-        except trio.ClosedResourceError:
-            # when the send is closed we assume the stream has
-            # terminated and signal this local iterator to stop
-            await self.aclose()
-            raise StopAsyncIteration
-        except trio.Cancelled:
-            # relay cancels to the remote task
-            await self.aclose()
-            raise
-        except KeyError:
-            # internal error should never get here
-            assert msg.get('cid'), (
-                "Received internal error at portal?")
-            raise unpack_error(msg, self._portal.channel)
-
-    @contextmanager
-    def shield(
-        self
-    ) -> Iterator['ReceiveStream']:  # noqa
-        """Shield this stream's underlying channel such that a local consumer task
-        can be cancelled (and possibly restarted) using ``trio.Cancelled``.
-        """
-        self._shielded = True
-        yield self
-        self._shielded = False
-
-    async def aclose(self):
-        """Cancel associated remote actor task and local memory channel
-        on close.
-        """
-        if self._rx_chan._closed:
-            log.warning(f"{self} is already closed")
-            return
-
-        if self._shielded:
-            log.warning(f"{self} is shielded, portal channel being kept alive")
-            return
-
-        cid = self._cid
-        with trio.move_on_after(0.5) as cs:
-            cs.shield = True
-            log.warning(
-                f"Cancelling stream {cid} to "
-                f"{self._portal.channel.uid}")
-
-            # NOTE: we're telling the far end actor to cancel a task
-            # corresponding to *this actor*. The far end local channel
-            # instance is passed to `Actor._cancel_task()` implicitly.
-            await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
-
-        if cs.cancelled_caught:
-            # XXX: there's no way to know if the remote task was indeed
-            # cancelled in the case where the connection is broken or
-            # some other network error occurred.
-            if not self._portal.channel.connected():
-                log.warning(
-                    "May have failed to cancel remote task "
-                    f"{cid} for {self._portal.channel.uid}")
-
-        with trio.CancelScope(shield=True):
-            await self._rx_chan.aclose()
-
-    def clone(self):
-        return self
+def func_deats(func: Callable) -> Tuple[str, str]:
+    return (
+        func.__module__,
+        func.__name__,
+    )

 class Portal:
     """A 'portal' to a(n) (remote) ``Actor``.

-    Allows for invoking remote routines and receiving results through an
-    underlying ``tractor.Channel`` as though the remote (async)
-    function / generator was invoked locally.
-
-    Think of this like a native async IPC API.
+    A portal is "opened" (and eventually closed) by one side of an
+    inter-actor communication context. The side which opens the portal
+    is equivalent to a "caller" in function parlance and usually is
+    either the called actor's parent (in process tree hierarchy terms)
+    or a client interested in scheduling work to be done remotely in a
+    far process.
+
+    The portal api allows the "caller" actor to invoke remote routines
+    and receive results through an underlying ``tractor.Channel`` as
+    though the remote (async) function / generator was called locally.
+    It may be thought of loosely as an RPC api where native Python
+    function calling semantics are supported transparently; hence it is
+    like having a "portal" between the seperate actor memory spaces.

     """
     def __init__(self, channel: Channel) -> None:
         self.channel = channel
@@ -157,7 +76,7 @@ class Portal:
         self._expect_result: Optional[
             Tuple[str, Any, str, Dict[str, Any]]
         ] = None
-        self._streams: Set[ReceiveStream] = set()
+        self._streams: Set[ReceiveMsgStream] = set()
         self.actor = current_actor()

     async def _submit(
@@ -182,102 +101,37 @@ class Portal:
         first_msg = await recv_chan.receive()
         functype = first_msg.get('functype')

-        if functype == 'asyncfunc':
-            resp_type = 'return'
-        elif functype == 'asyncgen':
-            resp_type = 'yield'
-        elif 'error' in first_msg:
+        if 'error' in first_msg:
             raise unpack_error(first_msg, self.channel)
-        else:
+
+        elif functype not in ('asyncfunc', 'asyncgen', 'context'):
             raise ValueError(f"{first_msg} is an invalid response packet?")

-        return cid, recv_chan, resp_type, first_msg
+        return cid, recv_chan, functype, first_msg

     async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None:
         assert self._expect_result is None, \
             "A pending main result has already been submitted"
         self._expect_result = await self._submit(ns, func, kwargs)

-    async def run(
-        self,
-        func_or_ns: str,
-        fn_name: Optional[str] = None,
-        **kwargs
-    ) -> Any:
-        """Submit a remote function to be scheduled and run by actor, in
-        a new task, wrap and return its (stream of) result(s).
-
-        This is a blocking call and returns either a value from the
-        remote rpc task or a local async generator instance.
-        """
-        if isinstance(func_or_ns, str):
-            warnings.warn(
-                "`Portal.run(namespace: str, funcname: str)` is now"
-                "deprecated, pass a function reference directly instead\n"
-                "If you still want to run a remote function by name use"
-                "`Portal.run_from_ns()`",
-                DeprecationWarning,
-                stacklevel=2,
-            )
-            fn_mod_path = func_or_ns
-            assert isinstance(fn_name, str)
-        else:  # function reference was passed directly
-            fn = func_or_ns
-            fn_mod_path = fn.__module__
-            fn_name = fn.__name__
-
-        return await self._return_from_resptype(
-            *(await self._submit(fn_mod_path, fn_name, kwargs))
-        )
-
-    async def run_from_ns(
-        self,
-        namespace_path: str,
-        function_name: str,
-        **kwargs,
-    ) -> Any:
-        """Run a function from a (remote) namespace in a new task on the far-end actor.
-
-        This is a more explitcit way to run tasks in a remote-process
-        actor using explicit object-path syntax. Hint: this is how
-        `.run()` works underneath.
-
-        Note::
-
-            A special namespace `self` can be used to invoke `Actor`
-            instance methods in the remote runtime. Currently this should only
-            be used for `tractor` internals.
-        """
-        return await self._return_from_resptype(
-            *(await self._submit(namespace_path, function_name, kwargs))
-        )
-
-    async def _return_from_resptype(
+    async def _return_once(
         self,
         cid: str,
         recv_chan: trio.abc.ReceiveChannel,
         resptype: str,
         first_msg: dict
     ) -> Any:
-        # TODO: not this needs some serious work and thinking about how
-        # to make async-generators the fundamental IPC API over channels!
-        # (think `yield from`, `gen.send()`, and functional reactive stuff)
-
-        if resptype == 'yield':  # stream response
-            rchan = ReceiveStream(cid, recv_chan, self)
-            self._streams.add(rchan)
-            return rchan
-
-        elif resptype == 'return':  # single response
-            msg = await recv_chan.receive()
-            try:
-                return msg['return']
-            except KeyError:
-                # internal error should never get here
-                assert msg.get('cid'), "Received internal error at portal?"
-                raise unpack_error(msg, self.channel)
-        else:
-            raise ValueError(f"Unknown msg response type: {first_msg}")
+        assert resptype == 'asyncfunc'  # single response
+
+        msg = await recv_chan.receive()
+        try:
+            return msg['return']
+        except KeyError:
+            # internal error should never get here
+            assert msg.get('cid'), "Received internal error at portal?"
+            raise unpack_error(msg, self.channel)

     async def result(self) -> Any:
         """Return the result(s) from the remote actor's "main" task.
@@ -300,9 +154,7 @@ class Portal:
         assert self._expect_result

         if self._result is None:
             try:
-                self._result = await self._return_from_resptype(
-                    *self._expect_result
-                )
+                self._result = await self._return_once(*self._expect_result)
             except RemoteActorError as err:
                 self._result = err
@@ -369,6 +221,122 @@ class Portal:
                 f"{self.channel} for {self.channel.uid} was already closed?")
             return False

+    async def run_from_ns(
+        self,
+        namespace_path: str,
+        function_name: str,
+        **kwargs,
+    ) -> Any:
+        """Run a function from a (remote) namespace in a new task on the far-end actor.
+
+        This is a more explitcit way to run tasks in a remote-process
+        actor using explicit object-path syntax. Hint: this is how
+        `.run()` works underneath.
+
+        Note::
+
+            A special namespace `self` can be used to invoke `Actor`
+            instance methods in the remote runtime. Currently this should only
+            be used for `tractor` internals.
+        """
+        return await self._return_once(
+            *(await self._submit(namespace_path, function_name, kwargs))
+        )
+
+    async def run(
+        self,
+        func: str,
+        fn_name: Optional[str] = None,
+        **kwargs
+    ) -> Any:
+        """Submit a remote function to be scheduled and run by actor, in
+        a new task, wrap and return its (stream of) result(s).
+
+        This is a blocking call and returns either a value from the
+        remote rpc task or a local async generator instance.
+        """
+        if isinstance(func, str):
+            warnings.warn(
+                "`Portal.run(namespace: str, funcname: str)` is now"
+                "deprecated, pass a function reference directly instead\n"
+                "If you still want to run a remote function by name use"
+                "`Portal.run_from_ns()`",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            fn_mod_path = func
+            assert isinstance(fn_name, str)
+
+        else:  # function reference was passed directly
+            if (
+                not inspect.iscoroutinefunction(func) or
+                (
+                    inspect.iscoroutinefunction(func) and
+                    getattr(func, '_tractor_stream_function', False)
+                )
+            ):
+                raise TypeError(
+                    f'{func} must be a non-streaming async function!')
+
+            fn_mod_path, fn_name = func_deats(func)
+
+        return await self._return_once(
+            *(await self._submit(fn_mod_path, fn_name, kwargs))
+        )
+
+    @asynccontextmanager
+    async def open_stream_from(
+        self,
+        async_gen_func: Callable,  # typing: ignore
+        **kwargs,
+    ) -> AsyncGenerator[ReceiveMsgStream, None]:
+
+        if not inspect.isasyncgenfunction(async_gen_func):
+            if not (
+                inspect.iscoroutinefunction(async_gen_func) and
+                getattr(async_gen_func, '_tractor_stream_function', False)
+            ):
+                raise TypeError(
+                    f'{async_gen_func} must be an async generator function!')
+
+        fn_mod_path, fn_name = func_deats(async_gen_func)
+
+        (
+            cid,
+            recv_chan,
+            functype,
+            first_msg
+        ) = await self._submit(fn_mod_path, fn_name, kwargs)
+
+        # receive only stream
+        assert functype == 'asyncgen'
+
+        ctx = Context(self.channel, cid, _portal=self)
+        try:
+            async with ReceiveMsgStream(ctx, recv_chan, self) as rchan:
+                self._streams.add(rchan)
+                yield rchan
+        finally:
+            # cancel the far end task on consumer close
+            try:
+                await ctx.cancel()
+            except trio.ClosedResourceError:
+                # if the far end terminates before we send a cancel the
+                # underlying transport-channel may already be closed.
+                log.debug(f'Context {ctx} was already closed?')
+
+            self._streams.remove(rchan)
+
+    # @asynccontextmanager
+    # async def open_context(
+    #     self,
+    #     func: Callable,
+    #     **kwargs,
+    # ) -> Context:
+    #     # TODO
+    #     elif resptype == 'context':  # context manager style setup/teardown
+    #         # TODO likely not here though
+    #         raise NotImplementedError

 @dataclass
 class LocalPortal:
@@ -387,10 +355,7 @@ class LocalPortal:
         """
         obj = self.actor if ns == 'self' else importlib.import_module(ns)
         func = getattr(obj, func_name)
-        if inspect.iscoroutinefunction(func):
-            return await func(**kwargs)
-        else:
-            return func(**kwargs)
+        return await func(**kwargs)

 @asynccontextmanager
@@ -399,7 +364,7 @@ async def open_portal(
     nursery: Optional[trio.Nursery] = None,
     start_msg_loop: bool = True,
     shield: bool = False,
-) -> typing.AsyncGenerator[Portal, None]:
+) -> AsyncGenerator[Portal, None]:
     """Open a ``Portal`` through the provided ``channel``.

     Spawns a background task to handle message processing.
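
Taken together, the refactored ``Portal`` exposes three distinct call styles; a hedged usage summary reusing names from the hunks above (``some_async_func`` is a placeholder):

    async def demo(portal):
        # single-shot: plain async functions only; stream funcs now raise TypeError
        result = await portal.run(some_async_func)

        # explicit namespace/name path, mostly for tractor internals
        registry = await portal.run_from_ns('self', 'get_registry')

        # streaming: an explicit context which cancels the far-end task on exit
        async with portal.open_stream_from(stream_forever) as stream:
            async for item in stream:
                break  # early exit is safe; the context runs Context.cancel()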

View File

@@ -98,17 +98,11 @@ async def exhaust_portal(
     """
     try:
         log.debug(f"Waiting on final result from {actor.uid}")
-        final = res = await portal.result()
-        # if it's an async-gen then alert that we're cancelling it
-        if inspect.isasyncgen(res):
-            final = []
-            log.warning(
-                f"Blindly consuming asyncgen for {actor.uid}")
-            with trio.fail_after(1):
-                async with aclosing(res) as agen:
-                    async for item in agen:
-                        log.debug(f"Consuming item {item}")
-                        final.append(item)
+
+        # XXX: streams should never be reaped here since they should
+        # always be established and shutdown using a context manager api
+        final = await portal.result()
+
     except (Exception, trio.MultiError) as err:
         # we reraise in the parent task via a ``trio.MultiError``
         return err

View File

@@ -7,6 +7,9 @@ import multiprocessing as mp

 import trio

+from ._exceptions import NoRuntime
+

 _current_actor: Optional['Actor'] = None  # type: ignore # noqa

 _runtime_vars: Dict[str, Any] = {
     '_debug_mode': False,
@@ -19,7 +22,7 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor':  # type: ignore #
     """Get the process-local actor instance.
     """
     if _current_actor is None and err_on_no_runtime:
-        raise RuntimeError("No local actor has been initialized yet")
+        raise NoRuntime("No local actor has been initialized yet")

     return _current_actor
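
A short sketch of what the dedicated exception type enables for callers; the helper below is assumed, not part of the diff:

    from tractor._state import current_actor
    from tractor._exceptions import NoRuntime

    def maybe_current_actor():
        # distinguish "no runtime yet" from genuine runtime errors
        try:
            return current_actor()
        except NoRuntime:
            return None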

View File

@@ -1,14 +1,17 @@
 import inspect
-from contextvars import ContextVar
+from contextlib import contextmanager  # , asynccontextmanager
 from dataclasses import dataclass
-from typing import Any
+from typing import Any, Iterator, Optional
+import warnings

 import trio

 from ._ipc import Channel
+from ._exceptions import unpack_error
+from .log import get_logger

-_context: ContextVar['Context'] = ContextVar('context')
+log = get_logger(__name__)

 @dataclass(frozen=True)
@@ -18,22 +21,76 @@ class Context:
     Allows maintaining task or protocol specific state between communicating
     actors. A unique context is created on the receiving end for every request
     to a remote actor.
+
+    A context can be cancelled and (eventually) restarted from
+    either side of the underlying IPC channel.
+
+    A context can be used to open task oriented message streams.
+
     """
     chan: Channel
     cid: str
-    cancel_scope: trio.CancelScope
+
+    # only set on the caller side
+    _portal: Optional['Portal'] = None  # type: ignore # noqa
+
+    # only set on the callee side
+    _cancel_scope: Optional[trio.CancelScope] = None

     async def send_yield(self, data: Any) -> None:
+
+        warnings.warn(
+            "`Context.send_yield()` is now deprecated. "
+            "Use ``MessageStream.send()``. ",
+            DeprecationWarning,
+            stacklevel=2,
+        )
         await self.chan.send({'yield': data, 'cid': self.cid})

     async def send_stop(self) -> None:
         await self.chan.send({'stop': True, 'cid': self.cid})

-
-def current_context():
-    """Get the current task's context instance.
-
-    """
-    return _context.get()
+    async def cancel(self) -> None:
+        """Cancel this inter-actor-task context.
+
+        Request that the far side cancel it's current linked context,
+        timeout quickly to sidestep 2-generals...
+
+        """
+        assert self._portal, (
+            "No portal found, this is likely a callee side context")
+
+        cid = self.cid
+        with trio.move_on_after(0.5) as cs:
+            cs.shield = True
+            log.warning(
+                f"Cancelling stream {cid} to "
+                f"{self._portal.channel.uid}")
+
+            # NOTE: we're telling the far end actor to cancel a task
+            # corresponding to *this actor*. The far end local channel
+            # instance is passed to `Actor._cancel_task()` implicitly.
+            await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
+
+        if cs.cancelled_caught:
+            # XXX: there's no way to know if the remote task was indeed
+            # cancelled in the case where the connection is broken or
+            # some other network error occurred.
+            if not self._portal.channel.connected():
+                log.warning(
+                    "May have failed to cancel remote task "
+                    f"{cid} for {self._portal.channel.uid}")
+
+    # async def restart(self) -> None:
+    #     # TODO
+    #     pass
+
+    # @asynccontextmanager
+    # async def open_stream(
+    #     self,
+    # ) -> AsyncContextManager:
+    #     # TODO
+    #     pass

 def stream(func):
@@ -41,9 +98,163 @@ def stream(func):
     """
     func._tractor_stream_function = True
+
     sig = inspect.signature(func)
-    if 'ctx' not in sig.parameters:
+    params = sig.parameters
+    if 'stream' not in params and 'ctx' in params:
+        warnings.warn(
+            "`@tractor.stream decorated funcs should now declare a `stream` "
+            " arg, `ctx` is now designated for use with @tractor.context",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+
+    if (
+        'ctx' not in params and
+        'to_trio' not in params and
+        'stream' not in params
+    ):
         raise TypeError(
             "The first argument to the stream function "
             f"{func.__name__} must be `ctx: tractor.Context`"
         )
     return func
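
Note the updated check is purely signature-level: it only inspects parameter names. A hedged sketch of the two accepted spellings (function names are hypothetical, bodies elided):

    @tractor.stream
    async def fetch_quotes(stream, symbol: str):  # 'stream' first: no warning
        ...

    @tractor.stream
    async def old_style_quotes(ctx, symbol: str):  # 'ctx' still works, but warns
        ...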
+
+class ReceiveMsgStream(trio.abc.ReceiveChannel):
+    """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
+    special behaviour for signalling stream termination across an
+    inter-actor ``Channel``. This is the type returned to a local task
+    which invoked a remote streaming function using `Portal.run()`.
+
+    Termination rules:
+    - if the local task signals stop iteration a cancel signal is
+      relayed to the remote task indicating to stop streaming
+    - if the remote task signals the end of a stream, raise a
+      ``StopAsyncIteration`` to terminate the local ``async for``
+
+    """
+    def __init__(
+        self,
+        ctx: Context,
+        rx_chan: trio.abc.ReceiveChannel,
+        portal: 'Portal',  # type: ignore # noqa
+    ) -> None:
+        self._ctx = ctx
+        self._rx_chan = rx_chan
+        self._portal = portal
+        self._shielded = False
+
+    # delegate directly to underlying mem channel
+    def receive_nowait(self):
+        return self._rx_chan.receive_nowait()
+
+    async def receive(self):
+        try:
+            msg = await self._rx_chan.receive()
+            return msg['yield']
+
+        except KeyError:
+            # internal error should never get here
+            assert msg.get('cid'), ("Received internal error at portal?")
+
+            # TODO: handle 2 cases with 3.10 match syntax
+            # - 'stop'
+            # - 'error'
+            # possibly just handle msg['stop'] here!
+
+            # TODO: test that shows stream raising an expected error!!!
+            if msg.get('error'):
+                # raise the error message
+                raise unpack_error(msg, self._portal.channel)
+
+        except (trio.ClosedResourceError, StopAsyncIteration):
+            # XXX: this indicates that a `stop` message was
+            # sent by the far side of the underlying channel.
+            # Currently this is triggered by calling ``.aclose()`` on
+            # the send side of the channel inside
+            # ``Actor._push_result()``, but maybe it should be put here?
+            # to avoid exposing the internal mem chan closing mechanism?
+            # in theory we could instead do some flushing of the channel
+            # if needed to ensure all consumers are complete before
+            # triggering closure too early?
+
+            # Locally, we want to close this stream gracefully, by
+            # terminating any local consumers tasks deterministically.
+            # We **don't** want to be closing this send channel and not
+            # relaying a final value to remaining consumers who may not
+            # have been scheduled to receive it yet?
+
+            # lots of testing to do here
+
+            # when the send is closed we assume the stream has
+            # terminated and signal this local iterator to stop
+            await self.aclose()
+            raise StopAsyncIteration
+
+        except trio.Cancelled:
+            # relay cancels to the remote task
+            await self.aclose()
+            raise
+
+    @contextmanager
+    def shield(
+        self
+    ) -> Iterator['ReceiveMsgStream']:  # noqa
+        """Shield this stream's underlying channel such that a local consumer task
+        can be cancelled (and possibly restarted) using ``trio.Cancelled``.
+
+        """
+        self._shielded = True
+        yield self
+        self._shielded = False
+
+    async def aclose(self):
+        """Cancel associated remote actor task and local memory channel
+        on close.
+        """
+        rx_chan = self._rx_chan
+
+        if rx_chan._closed:
+            log.warning(f"{self} is already closed")
+            return
+
+        # stats = rx_chan.statistics()
+        # if stats.open_receive_channels > 1:
+        #     # if we've been cloned don't kill the stream
+        #     log.debug(
+        #         "there are still consumers running keeping stream alive")
+        #     return
+
+        if self._shielded:
+            log.warning(f"{self} is shielded, portal channel being kept alive")
+            return
+
+        # close the local mem chan
+        rx_chan.close()
+
+        # cancel surrounding IPC context
+        await self._ctx.cancel()
+
+    # TODO: but make it broadcasting to consumers
+    # def clone(self):
+    #     """Clone this receive channel allowing for multi-task
+    #     consumption from the same channel.
+    #
+    #     """
+    #     return ReceiveStream(
+    #         self._cid,
+    #         self._rx_chan.clone(),
+    #         self._portal,
+    #     )
+
+
+# class MsgStream(ReceiveMsgStream, trio.abc.Channel):
+#     """
+#     Bidirectional message stream for use within an inter-actor actor
+#     ``Context```.
+#     """
+#     async def send(
+#         self,
+#         data: Any
+#     ) -> None:
+#         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
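
Given these termination rules, consumer code normally never calls ``aclose()`` by hand; a hedged sketch where ``finite_streamer`` is a placeholder async generator:

    async def consume(portal):
        async with portal.open_stream_from(finite_streamer) as stream:
            async for item in stream:
                # the loop ends via StopAsyncIteration once the far side
                # sends a 'stop'; breaking out early is equally safe since
                # leaving the block cancels the remote task via the Context
                print(item)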

View File

@@ -2,6 +2,7 @@
 ``trio`` inspired apis and helpers
 """
 from functools import partial
+import inspect
 import multiprocessing as mp
 from typing import Tuple, List, Dict, Optional
 import typing
@@ -136,6 +137,14 @@ class ActorNursery:
             # use the run_in_actor nursery
             nursery=self._ria_nursery,
         )

+        # XXX: don't allow stream funcs
+        if not (
+            inspect.iscoroutinefunction(fn) and
+            not getattr(fn, '_tractor_stream_function', False)
+        ):
+            raise TypeError(f'{fn} must be an async function!')
+
         # this marks the actor to be cancelled after its portal result
         # is retreived, see logic in `open_nursery()` below.
         self._cancel_after_result_on_exit.add(portal)
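
A hedged sketch of what this guard rejects versus accepts (names illustrative):

    async def one_shot():
        return 'done'

    async def forever():
        yield 1  # async generator: not a plain coroutine function

    async def main():
        async with tractor.open_nursery() as n:
            await n.run_in_actor(one_shot, name='ok')  # accepted
            # await n.run_in_actor(forever)  # raises TypeError; use
            # n.start_actor() + portal.open_stream_from(forever) instead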

View File

@@ -29,9 +29,13 @@ async def fan_out_to_ctxs(
         return tuple(topics2ctxs.keys())

     agen = pub_async_gen_func(get_topics=get_topics)
+
     async with aclosing(agen) as pub_gen:
+
         async for published in pub_gen:
+
             ctx_payloads: Dict[str, Any] = {}
+
             for topic, data in published.items():
                 log.debug(f"publishing {topic, data}")

                 # build a new dict packet or invoke provided packetizer