forked from goodboy/tractor
mpypy and docs fixups
parent
85a0700716
commit
fe1c4dbc4c
|
@ -303,11 +303,12 @@ class Actor:
|
||||||
|
|
||||||
async def _push_result(
|
async def _push_result(
|
||||||
self,
|
self,
|
||||||
actorid: Tuple[str, str],
|
chan: Channel,
|
||||||
msg: Dict[str, Any],
|
msg: Dict[str, Any],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Push an RPC result to the local consumer's queue.
|
"""Push an RPC result to the local consumer's queue.
|
||||||
"""
|
"""
|
||||||
|
actorid = chan.uid
|
||||||
assert actorid, f"`actorid` can't be {actorid}"
|
assert actorid, f"`actorid` can't be {actorid}"
|
||||||
cid = msg['cid']
|
cid = msg['cid']
|
||||||
send_chan = self._cids2qs[(actorid, cid)]
|
send_chan = self._cids2qs[(actorid, cid)]
|
||||||
|
@ -390,10 +391,10 @@ class Actor:
|
||||||
f" {chan} from {chan.uid}")
|
f" {chan} from {chan.uid}")
|
||||||
break
|
break
|
||||||
|
|
||||||
log.trace(f"Received msg {msg} from {chan.uid}")
|
log.trace(f"Received msg {msg} from {chan.uid}") # type: ignore
|
||||||
if msg.get('cid'):
|
if msg.get('cid'):
|
||||||
# deliver response to local caller/waiter
|
# deliver response to local caller/waiter
|
||||||
await self._push_result(chan.uid, msg)
|
await self._push_result(chan, msg)
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -49,9 +49,10 @@ async def _do_handshake(
|
||||||
|
|
||||||
|
|
||||||
class StreamReceiveChannel(trio.abc.ReceiveChannel):
|
class StreamReceiveChannel(trio.abc.ReceiveChannel):
|
||||||
"""A wrapper around a ``trio.abc.ReceiveChannel`` with
|
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
|
||||||
special behaviour for stream termination on both ends of
|
special behaviour for signalling stream termination on across an
|
||||||
an inter-actor channel.
|
inter-actor ``Channel``. This is the type returned to a local task
|
||||||
|
which invoked a remote streaming function using `Portal.run()`.
|
||||||
|
|
||||||
Termination rules:
|
Termination rules:
|
||||||
- if the local task signals stop iteration a cancel signal is
|
- if the local task signals stop iteration a cancel signal is
|
||||||
|
@ -65,7 +66,7 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel):
|
||||||
cid: str,
|
cid: str,
|
||||||
rx_chan: trio.abc.ReceiveChannel,
|
rx_chan: trio.abc.ReceiveChannel,
|
||||||
portal: 'Portal',
|
portal: 'Portal',
|
||||||
):
|
) -> None:
|
||||||
self._cid = cid
|
self._cid = cid
|
||||||
self._rx_chan = rx_chan
|
self._rx_chan = rx_chan
|
||||||
self._portal = portal
|
self._portal = portal
|
||||||
|
@ -84,28 +85,28 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel):
|
||||||
await self.aclose()
|
await self.aclose()
|
||||||
raise StopAsyncIteration
|
raise StopAsyncIteration
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
|
# relay cancels to the remote task
|
||||||
await self.aclose()
|
await self.aclose()
|
||||||
raise
|
raise
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# if 'stop' in msg:
|
|
||||||
# break # far end async gen terminated
|
|
||||||
# else:
|
|
||||||
# internal error should never get here
|
# internal error should never get here
|
||||||
assert msg.get('cid'), (
|
assert msg.get('cid'), (
|
||||||
"Received internal error at portal?")
|
"Received internal error at portal?")
|
||||||
raise unpack_error(msg, self._portal.channel)
|
raise unpack_error(msg, self._portal.channel)
|
||||||
|
|
||||||
async def aclose(self):
|
async def aclose(self):
|
||||||
|
"""Cancel associate remote actor task on close
|
||||||
|
as well as the local memory channel.
|
||||||
|
"""
|
||||||
if self._rx_chan._closed:
|
if self._rx_chan._closed:
|
||||||
log.warning(f"{self} is already closed")
|
log.warning(f"{self} is already closed")
|
||||||
return
|
return
|
||||||
cid = self._cid
|
cid = self._cid
|
||||||
# XXX: cancel remote task on close
|
with trio.move_on_after(0.5) as cs:
|
||||||
|
cs.shield = True
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Cancelling stream {cid} to "
|
f"Cancelling stream {cid} to "
|
||||||
f"{self._portal.channel.uid}")
|
f"{self._portal.channel.uid}")
|
||||||
with trio.move_on_after(0.5) as cs:
|
|
||||||
cs.shield = True
|
|
||||||
# TODO: yeah.. it'd be nice if this was just an
|
# TODO: yeah.. it'd be nice if this was just an
|
||||||
# async func on the far end. Gotta figure out a
|
# async func on the far end. Gotta figure out a
|
||||||
# better way then implicitly feeding the ctx
|
# better way then implicitly feeding the ctx
|
||||||
|
@ -117,6 +118,9 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
|
# XXX: there's no way to know if the remote task was indeed
|
||||||
|
# cancelled in the case where the connection is broken or
|
||||||
|
# some other network error occurred.
|
||||||
if not self._portal.channel.connected():
|
if not self._portal.channel.connected():
|
||||||
log.warning(
|
log.warning(
|
||||||
"May have failed to cancel remote task "
|
"May have failed to cancel remote task "
|
||||||
|
|
Loading…
Reference in New Issue