diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 5c22116..af9ffaf 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -323,6 +323,8 @@ class Context: _recv_chan: Optional[trio.MemoryReceiveChannel] = None _result: Optional[Any] = False _cancel_called: bool = False + _started_called: bool = False + _started_received: bool = False # only set on the callee side _scope_nursery: Optional[trio.Nursery] = None @@ -455,6 +457,11 @@ class Context: f'Context around {actor.uid[0]}:{task} was already cancelled!' ) + if not self._portal and not self._started_called: + raise RuntimeError( + 'Context.started()` must be called before opening a stream' + ) + # NOTE: in one way streaming this only happens on the # caller side inside `Actor.send_cmd()` so if you try # to send a stop from the caller to the callee in the @@ -536,13 +543,29 @@ class Context: return self._result - async def started(self, value: Optional[Any] = None) -> None: + async def started( + self, + value: Optional[Any] = None + ) -> None: + ''' + Indicate to calling actor's task that this linked context + has started and send ``value`` to the other side. + + On the calling side ``value`` is the second item delivered + in the tuple returned by ``Portal.open_context()``. + + ''' if self._portal: raise RuntimeError( f"Caller side context {self} can not call started!") + elif self._started_called: + raise RuntimeError( + f"called 'started' twice on context with {self.chan.uid}") + await self.chan.send({'started': value, 'cid': self.cid}) + self._started_called = True # TODO: do we need a restart api? # async def restart(self) -> None: