forked from goodboy/tractor
1
0
Fork 0

Error on mal-use of `Context.started()`

Previously we were ignoring a race where the callee an opened task
context could enter `Context.open_stream()` before calling `.started().
Disallow this as well as calling `.started()` more then once.
stricter_context_starting
Tyler Goodlet 2021-11-05 11:36:25 -04:00
parent ae6d751d71
commit f4793af2b9
1 changed files with 24 additions and 1 deletions

View File

@ -323,6 +323,8 @@ class Context:
_recv_chan: Optional[trio.MemoryReceiveChannel] = None _recv_chan: Optional[trio.MemoryReceiveChannel] = None
_result: Optional[Any] = False _result: Optional[Any] = False
_cancel_called: bool = False _cancel_called: bool = False
_started_called: bool = False
_started_received: bool = False
# only set on the callee side # only set on the callee side
_scope_nursery: Optional[trio.Nursery] = None _scope_nursery: Optional[trio.Nursery] = None
@ -455,6 +457,11 @@ class Context:
f'Context around {actor.uid[0]}:{task} was already cancelled!' f'Context around {actor.uid[0]}:{task} was already cancelled!'
) )
if not self._portal and not self._started_called:
raise RuntimeError(
'Context.started()` must be called before opening a stream'
)
# NOTE: in one way streaming this only happens on the # NOTE: in one way streaming this only happens on the
# caller side inside `Actor.send_cmd()` so if you try # caller side inside `Actor.send_cmd()` so if you try
# to send a stop from the caller to the callee in the # to send a stop from the caller to the callee in the
@ -536,13 +543,29 @@ class Context:
return self._result return self._result
async def started(self, value: Optional[Any] = None) -> None: async def started(
self,
value: Optional[Any] = None
) -> None:
'''
Indicate to calling actor's task that this linked context
has started and send ``value`` to the other side.
On the calling side ``value`` is the second item delivered
in the tuple returned by ``Portal.open_context()``.
'''
if self._portal: if self._portal:
raise RuntimeError( raise RuntimeError(
f"Caller side context {self} can not call started!") f"Caller side context {self} can not call started!")
elif self._started_called:
raise RuntimeError(
f"called 'started' twice on context with {self.chan.uid}")
await self.chan.send({'started': value, 'cid': self.cid}) await self.chan.send({'started': value, 'cid': self.cid})
self._started_called = True
# TODO: do we need a restart api? # TODO: do we need a restart api?
# async def restart(self) -> None: # async def restart(self) -> None: