forked from goodboy/tractor
1
0
Fork 0

Stream and context api tweaks

- drop `shield` input to `MsgStream`
- check for cancel called prior to loading the feeder mem chan
  in `Context.open_stream()`
- warn on a timeout when trying to cancel a remote task from
  `Context.cancel()`
- drop noop endofchannel handler block
fix_kbi_in_ctx_block
Tyler Goodlet 2021-10-04 10:20:49 -04:00
parent bd31f47d5f
commit 8b416e6bba
1 changed files with 18 additions and 18 deletions

View File

@ -51,7 +51,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self, self,
ctx: 'Context', # typing: ignore # noqa ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.MemoryReceiveChannel, rx_chan: trio.MemoryReceiveChannel,
shield: bool = False,
_broadcaster: Optional[BroadcastReceiver] = None, _broadcaster: Optional[BroadcastReceiver] = None,
) -> None: ) -> None:
@ -295,6 +294,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
'''Send a message over this stream to the far end. '''Send a message over this stream to the far end.
''' '''
# if self._eoc:
# raise trio.ClosedResourceError('This stream is already ded')
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
@ -365,7 +367,7 @@ class Context:
''' '''
side = 'caller' if self._portal else 'callee' side = 'caller' if self._portal else 'callee'
log.warning(f'Cancelling {side} side of context to {self.chan}') log.warning(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True self._cancel_called = True
@ -396,6 +398,10 @@ class Context:
log.warning( log.warning(
"May have failed to cancel remote task " "May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}") f"{cid} for {self._portal.channel.uid}")
else:
log.warning(
"Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}")
else: else:
# callee side remote task # callee side remote task
@ -439,16 +445,6 @@ class Context:
# here we create a mem chan that corresponds to the # here we create a mem chan that corresponds to the
# far end caller / callee. # far end caller / callee.
# NOTE: in one way streaming this only happens on the
# caller side inside `Actor.send_cmd()` so if you try
# to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
_, recv_chan = actor.get_memchans(
self.chan.uid,
self.cid
)
# Likewise if the surrounding context has been cancelled we error here # Likewise if the surrounding context has been cancelled we error here
# since it likely means the surrounding block was exited or # since it likely means the surrounding block was exited or
# killed # killed
@ -459,6 +455,16 @@ class Context:
f'Context around {actor.uid[0]}:{task} was already cancelled!' f'Context around {actor.uid[0]}:{task} was already cancelled!'
) )
# NOTE: in one way streaming this only happens on the
# caller side inside `Actor.send_cmd()` so if you try
# to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
_, recv_chan = actor.get_memchans(
self.chan.uid,
self.cid
)
# XXX: If the underlying channel feeder receive mem chan has # XXX: If the underlying channel feeder receive mem chan has
# been closed then likely client code has already exited # been closed then likely client code has already exited
# a ``.open_stream()`` block prior or there was some other # a ``.open_stream()`` block prior or there was some other
@ -482,12 +488,6 @@ class Context:
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
yield rchan yield rchan
except trio.EndOfChannel:
# likely the far end sent us a 'stop' message to
# terminate the stream.
raise
else:
# XXX: Make the stream "one-shot use". On exit, signal # XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end. # far end.