forked from goodboy/tractor
Consider relaying context error via raised-in-scope-nursery task
parent
2f804a977c
commit
378c8cee52
|
@ -177,6 +177,7 @@ class Portal:
|
||||||
f"Cancelling all streams with {self.channel.uid}")
|
f"Cancelling all streams with {self.channel.uid}")
|
||||||
for stream in self._streams.copy():
|
for stream in self._streams.copy():
|
||||||
try:
|
try:
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
await stream.aclose()
|
await stream.aclose()
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
# don't error the stream having already been closed
|
# don't error the stream having already been closed
|
||||||
|
@ -369,7 +370,6 @@ class Portal:
|
||||||
|
|
||||||
recv_chan: Optional[trio.MemoryReceiveChannel] = None
|
recv_chan: Optional[trio.MemoryReceiveChannel] = None
|
||||||
|
|
||||||
try:
|
|
||||||
cid, recv_chan, functype, first_msg = await self._submit(
|
cid, recv_chan, functype, first_msg = await self._submit(
|
||||||
fn_mod_path, fn_name, kwargs)
|
fn_mod_path, fn_name, kwargs)
|
||||||
|
|
||||||
|
@ -392,16 +392,21 @@ class Portal:
|
||||||
|
|
||||||
# deliver context instance and .started() msg value in open
|
# deliver context instance and .started() msg value in open
|
||||||
# tuple.
|
# tuple.
|
||||||
|
try:
|
||||||
|
async with trio.open_nursery() as scope_nursery:
|
||||||
ctx = Context(
|
ctx = Context(
|
||||||
self.channel,
|
self.channel,
|
||||||
cid,
|
cid,
|
||||||
_portal=self,
|
_portal=self,
|
||||||
_recv_chan=recv_chan,
|
_recv_chan=recv_chan,
|
||||||
|
_scope_nursery=scope_nursery,
|
||||||
)
|
)
|
||||||
|
recv_chan._ctx = ctx
|
||||||
|
|
||||||
try:
|
|
||||||
yield ctx, first
|
yield ctx, first
|
||||||
|
|
||||||
|
log.info(f'Context for {func.__name__} completed')
|
||||||
|
|
||||||
if cancel_on_exit:
|
if cancel_on_exit:
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
|
@ -409,24 +414,34 @@ class Portal:
|
||||||
if not ctx._cancel_called:
|
if not ctx._cancel_called:
|
||||||
await ctx.result()
|
await ctx.result()
|
||||||
|
|
||||||
|
await recv_chan.aclose()
|
||||||
|
|
||||||
|
# except TypeError:
|
||||||
|
# # if fn_name == '_emsd_main':
|
||||||
|
# import tractor
|
||||||
|
# await tractor.breakpoint()
|
||||||
|
|
||||||
except ContextCancelled:
|
except ContextCancelled:
|
||||||
# if the context was cancelled by client code
|
|
||||||
# then we don't need to raise since user code
|
|
||||||
# is expecting this.
|
|
||||||
if not ctx._cancel_called:
|
if not ctx._cancel_called:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
except BaseException:
|
# if the context was cancelled by client code
|
||||||
|
# then we don't need to raise since user code
|
||||||
|
# is expecting this and the block should exit.
|
||||||
|
else:
|
||||||
|
log.debug(f'Context {ctx} cancelled gracefully')
|
||||||
|
|
||||||
|
except trio.Cancelled:
|
||||||
# the context cancels itself on any deviation
|
# the context cancels itself on any deviation
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
# finally:
|
||||||
log.info(f'Context for {func.__name__} completed')
|
# log.info(f'Context for {func.__name__} completed')
|
||||||
|
|
||||||
|
# finally:
|
||||||
|
# if recv_chan is not None:
|
||||||
|
|
||||||
finally:
|
|
||||||
if recv_chan is not None:
|
|
||||||
await recv_chan.aclose()
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class LocalPortal:
|
class LocalPortal:
|
||||||
|
@ -464,6 +479,7 @@ async def open_portal(
|
||||||
was_connected = False
|
was_connected = False
|
||||||
|
|
||||||
async with maybe_open_nursery(nursery, shield=shield) as nursery:
|
async with maybe_open_nursery(nursery, shield=shield) as nursery:
|
||||||
|
|
||||||
if not channel.connected():
|
if not channel.connected():
|
||||||
await channel.connect()
|
await channel.connect()
|
||||||
was_connected = True
|
was_connected = True
|
||||||
|
@ -485,11 +501,12 @@ async def open_portal(
|
||||||
portal = Portal(channel)
|
portal = Portal(channel)
|
||||||
try:
|
try:
|
||||||
yield portal
|
yield portal
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
await portal.aclose()
|
await portal.aclose()
|
||||||
|
|
||||||
if was_connected:
|
if was_connected:
|
||||||
# cancel remote channel-msg loop
|
# gracefully signal remote channel-msg loop
|
||||||
await channel.send(None)
|
await channel.send(None)
|
||||||
|
|
||||||
# cancel background msg loop task
|
# cancel background msg loop task
|
||||||
|
|
|
@ -7,7 +7,7 @@ from contextlib import contextmanager, asynccontextmanager
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import (
|
from typing import (
|
||||||
Any, Iterator, Optional, Callable,
|
Any, Iterator, Optional, Callable,
|
||||||
AsyncGenerator,
|
AsyncGenerator, Dict,
|
||||||
)
|
)
|
||||||
|
|
||||||
import warnings
|
import warnings
|
||||||
|
@ -67,6 +67,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
raise trio.EndOfChannel
|
raise trio.EndOfChannel
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# if self._ctx.chan.uid[0] == 'brokerd.ib':
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
msg = await self._rx_chan.receive()
|
msg = await self._rx_chan.receive()
|
||||||
return msg['yield']
|
return msg['yield']
|
||||||
|
|
||||||
|
@ -134,6 +137,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
|
|
||||||
raise # propagate
|
raise # propagate
|
||||||
|
|
||||||
|
# except trio.Cancelled:
|
||||||
|
# if not self._shielded:
|
||||||
|
# # if shielded we don't propagate a cancelled
|
||||||
|
# raise
|
||||||
|
|
||||||
# except trio.Cancelled:
|
# except trio.Cancelled:
|
||||||
# # relay cancels to the remote task
|
# # relay cancels to the remote task
|
||||||
# await self.aclose()
|
# await self.aclose()
|
||||||
|
@ -165,7 +173,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
||||||
rx_chan = self._rx_chan
|
rx_chan = self._rx_chan
|
||||||
|
|
||||||
if rx_chan._closed:
|
if rx_chan._closed: # or self._eoc:
|
||||||
log.warning(f"{self} is already closed")
|
log.warning(f"{self} is already closed")
|
||||||
|
|
||||||
# this stream has already been closed so silently succeed as
|
# this stream has already been closed so silently succeed as
|
||||||
|
@ -212,7 +220,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# stop from the caller side
|
# stop from the caller side
|
||||||
await self._ctx.send_stop()
|
await self._ctx.send_stop()
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
except (
|
||||||
|
trio.BrokenResourceError,
|
||||||
|
trio.ClosedResourceError
|
||||||
|
):
|
||||||
# the underlying channel may already have been pulled
|
# the underlying channel may already have been pulled
|
||||||
# in which case our stop message is meaningless since
|
# in which case our stop message is meaningless since
|
||||||
# it can't traverse the transport.
|
# it can't traverse the transport.
|
||||||
|
@ -254,18 +265,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# still need to consume msgs that are "in transit" from the far
|
# still need to consume msgs that are "in transit" from the far
|
||||||
# end (eg. for ``Context.result()``).
|
# end (eg. for ``Context.result()``).
|
||||||
|
|
||||||
# TODO: but make it broadcasting to consumers
|
|
||||||
# def clone(self):
|
|
||||||
# """Clone this receive channel allowing for multi-task
|
|
||||||
# consumption from the same channel.
|
|
||||||
|
|
||||||
# """
|
|
||||||
# return ReceiveStream(
|
|
||||||
# self._cid,
|
|
||||||
# self._rx_chan.clone(),
|
|
||||||
# self._portal,
|
|
||||||
# )
|
|
||||||
|
|
||||||
|
|
||||||
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
||||||
"""
|
"""
|
||||||
|
@ -282,6 +281,17 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
||||||
'''
|
'''
|
||||||
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
|
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
|
||||||
|
|
||||||
|
# TODO: but make it broadcasting to consumers
|
||||||
|
def clone(self):
|
||||||
|
"""Clone this receive channel allowing for multi-task
|
||||||
|
consumption from the same channel.
|
||||||
|
|
||||||
|
"""
|
||||||
|
return MsgStream(
|
||||||
|
self._ctx,
|
||||||
|
self._rx_chan.clone(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Context:
|
class Context:
|
||||||
|
@ -308,7 +318,7 @@ class Context:
|
||||||
_cancel_called: bool = False
|
_cancel_called: bool = False
|
||||||
|
|
||||||
# only set on the callee side
|
# only set on the callee side
|
||||||
_cancel_scope: Optional[trio.CancelScope] = None
|
_scope_nursery: Optional[trio.Nursery] = None
|
||||||
|
|
||||||
async def send_yield(self, data: Any) -> None:
|
async def send_yield(self, data: Any) -> None:
|
||||||
|
|
||||||
|
@ -323,6 +333,16 @@ class Context:
|
||||||
async def send_stop(self) -> None:
|
async def send_stop(self) -> None:
|
||||||
await self.chan.send({'stop': True, 'cid': self.cid})
|
await self.chan.send({'stop': True, 'cid': self.cid})
|
||||||
|
|
||||||
|
def _error_from_remote_msg(
|
||||||
|
self,
|
||||||
|
msg: Dict[str, Any],
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
async def raiser():
|
||||||
|
raise unpack_error(msg, self.chan)
|
||||||
|
|
||||||
|
self._scope_nursery.start_soon(raiser)
|
||||||
|
|
||||||
async def cancel(self) -> None:
|
async def cancel(self) -> None:
|
||||||
'''Cancel this inter-actor-task context.
|
'''Cancel this inter-actor-task context.
|
||||||
|
|
||||||
|
@ -361,13 +381,16 @@ class Context:
|
||||||
f"{cid} for {self._portal.channel.uid}")
|
f"{cid} for {self._portal.channel.uid}")
|
||||||
else:
|
else:
|
||||||
# ensure callee side
|
# ensure callee side
|
||||||
assert self._cancel_scope
|
assert self._scope_nursery
|
||||||
# TODO: should we have an explicit cancel message
|
# TODO: should we have an explicit cancel message
|
||||||
# or is relaying the local `trio.Cancelled` as an
|
# or is relaying the local `trio.Cancelled` as an
|
||||||
# {'error': trio.Cancelled, cid: "blah"} enough?
|
# {'error': trio.Cancelled, cid: "blah"} enough?
|
||||||
# This probably gets into the discussion in
|
# This probably gets into the discussion in
|
||||||
# https://github.com/goodboy/tractor/issues/36
|
# https://github.com/goodboy/tractor/issues/36
|
||||||
self._cancel_scope.cancel()
|
self._scope_nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
|
if self._recv_chan:
|
||||||
|
await self._recv_chan.aclose()
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def open_stream(
|
async def open_stream(
|
||||||
|
|
Loading…
Reference in New Issue