forked from goodboy/tractor
1
0
Fork 0

Drop use of `trio.Event.clear()`

Just spin up new events instead; because apparently they're
so cheap (rolls eyes).

Resolves #78
drop_event_clear
Tyler Goodlet 2019-11-23 01:29:02 -05:00
parent 4d43f2564c
commit d2a01e8b81
1 changed files with 6 additions and 8 deletions

View File

@ -144,7 +144,7 @@ async def _invoke(
if not actor._rpc_tasks: if not actor._rpc_tasks:
log.info(f"All RPC tasks have completed") log.info(f"All RPC tasks have completed")
actor._no_more_rpc_tasks.set() actor._ongoing_rpc_tasks.set()
class Actor: class Actor:
@ -183,9 +183,8 @@ class Actor:
self._peer_connected: dict = {} self._peer_connected: dict = {}
self._no_more_peers = trio.Event() self._no_more_peers = trio.Event()
self._no_more_peers.set() self._no_more_peers.set()
self._ongoing_rpc_tasks = trio.Event()
self._no_more_rpc_tasks = trio.Event() self._ongoing_rpc_tasks.set()
self._no_more_rpc_tasks.set()
# (chan, cid) -> (cancel_scope, func) # (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: Dict[ self._rpc_tasks: Dict[
Tuple[Channel, str], Tuple[Channel, str],
@ -234,7 +233,7 @@ class Actor:
) -> None: ) -> None:
"""Entry point for new inbound connections to the channel server. """Entry point for new inbound connections to the channel server.
""" """
self._no_more_peers.clear() self._no_more_peers = trio.Event()
chan = Channel(stream=stream) chan = Channel(stream=stream)
log.info(f"New connection to us {chan}") log.info(f"New connection to us {chan}")
@ -448,7 +447,7 @@ class Actor:
f"{cs}") f"{cs}")
else: else:
# mark that we have ongoing rpc tasks # mark that we have ongoing rpc tasks
self._no_more_rpc_tasks.clear() self._ongoing_rpc_tasks = trio.Event()
log.info(f"RPC func is {func}") log.info(f"RPC func is {func}")
# store cancel scope such that the rpc task can be # store cancel scope such that the rpc task can be
# cancelled gracefully if requested # cancelled gracefully if requested
@ -709,10 +708,9 @@ class Actor:
for (chan, cid) in tasks.copy(): for (chan, cid) in tasks.copy():
# TODO: this should really done in a nursery batch # TODO: this should really done in a nursery batch
await self._cancel_task(cid, chan) await self._cancel_task(cid, chan)
# if tasks:
log.info( log.info(
f"Waiting for remaining rpc tasks to complete {tasks}") f"Waiting for remaining rpc tasks to complete {tasks}")
await self._no_more_rpc_tasks.wait() await self._ongoing_rpc_tasks.wait()
def cancel_server(self) -> None: def cancel_server(self) -> None:
"""Cancel the internal channel server nursery thereby """Cancel the internal channel server nursery thereby