forked from goodboy/tractor
1
0
Fork 0

Merge pull request #92 from goodboy/drop_event_clear

Drop use of `trio.Event.clear()`
try_trip
goodboy 2019-11-25 13:52:20 -05:00 committed by GitHub
commit 8d2a05e788
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 33 deletions

View File

@ -261,13 +261,26 @@ async def test_some_cancels_all(num_actors_and_errs, start_method):
pytest.fail("Should have gotten a remote assertion error?") pytest.fail("Should have gotten a remote assertion error?")
async def spawn_and_error(num) -> None: async def spawn_and_error(breadth, depth) -> None:
name = tractor.current_actor().name name = tractor.current_actor().name
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
for i in range(num): for i in range(breadth):
await nursery.run_in_actor( if depth > 0:
f'{name}_errorer_{i}', assert_err args = (
) f'spawner_{i}_depth_{depth}',
spawn_and_error,
)
kwargs = {
'breadth': breadth,
'depth': depth - 1,
}
else:
args = (
f'{name}_errorer_{i}',
assert_err,
)
kwargs = {}
await nursery.run_in_actor(*args, **kwargs)
@tractor_test @tractor_test
@ -276,25 +289,34 @@ async def test_nested_multierrors(loglevel, start_method):
This test goes only 2 nurseries deep but we should eventually have tests This test goes only 2 nurseries deep but we should eventually have tests
for arbitrary n-depth actor trees. for arbitrary n-depth actor trees.
""" """
if platform.system() == 'Windows': # XXX: forkserver can't seem to handle any more then 2 depth
# Windows CI seems to be partifcularly fragile on Python 3.8.. # process trees for whatever reason.
num_subactors = 2 # Any more process levels then this and we start getting pretty slow anyway
else: depth = 3
# XXX: any more then this and the forkserver will subactor_breadth = 2
# start bailing hard...gotta look into it
num_subactors = 4
try: with trio.fail_after(120):
async with tractor.open_nursery() as nursery: try:
async with tractor.open_nursery() as nursery:
for i in range(subactor_breadth):
await nursery.run_in_actor(
f'spawner_{i}',
spawn_and_error,
breadth=subactor_breadth,
depth=depth,
)
except trio.MultiError as err:
assert len(err.exceptions) == subactor_breadth
for subexc in err.exceptions:
assert isinstance(subexc, tractor.RemoteActorError)
if depth > 1 and subactor_breadth > 1:
for i in range(num_subactors): # XXX not sure what's up with this..
await nursery.run_in_actor( if platform.system() == 'Windows':
f'spawner_{i}', assert (subexc.type is trio.MultiError) or (
spawn_and_error, subexc.type is tractor.RemoteActorError)
num=num_subactors, else:
) assert subexc.type is trio.MultiError
except trio.MultiError as err: else:
assert len(err.exceptions) == num_subactors assert (subexc.type is tractor.RemoteActorError) or (
for subexc in err.exceptions: subexc.type is trio.Cancelled)
assert isinstance(subexc, tractor.RemoteActorError)
assert subexc.type is trio.MultiError

View File

@ -144,7 +144,7 @@ async def _invoke(
if not actor._rpc_tasks: if not actor._rpc_tasks:
log.info(f"All RPC tasks have completed") log.info(f"All RPC tasks have completed")
actor._no_more_rpc_tasks.set() actor._ongoing_rpc_tasks.set()
class Actor: class Actor:
@ -183,9 +183,8 @@ class Actor:
self._peer_connected: dict = {} self._peer_connected: dict = {}
self._no_more_peers = trio.Event() self._no_more_peers = trio.Event()
self._no_more_peers.set() self._no_more_peers.set()
self._ongoing_rpc_tasks = trio.Event()
self._no_more_rpc_tasks = trio.Event() self._ongoing_rpc_tasks.set()
self._no_more_rpc_tasks.set()
# (chan, cid) -> (cancel_scope, func) # (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: Dict[ self._rpc_tasks: Dict[
Tuple[Channel, str], Tuple[Channel, str],
@ -234,7 +233,7 @@ class Actor:
) -> None: ) -> None:
"""Entry point for new inbound connections to the channel server. """Entry point for new inbound connections to the channel server.
""" """
self._no_more_peers.clear() self._no_more_peers = trio.Event()
chan = Channel(stream=stream) chan = Channel(stream=stream)
log.info(f"New connection to us {chan}") log.info(f"New connection to us {chan}")
@ -448,7 +447,7 @@ class Actor:
f"{cs}") f"{cs}")
else: else:
# mark that we have ongoing rpc tasks # mark that we have ongoing rpc tasks
self._no_more_rpc_tasks.clear() self._ongoing_rpc_tasks = trio.Event()
log.info(f"RPC func is {func}") log.info(f"RPC func is {func}")
# store cancel scope such that the rpc task can be # store cancel scope such that the rpc task can be
# cancelled gracefully if requested # cancelled gracefully if requested
@ -709,10 +708,9 @@ class Actor:
for (chan, cid) in tasks.copy(): for (chan, cid) in tasks.copy():
# TODO: this should really done in a nursery batch # TODO: this should really done in a nursery batch
await self._cancel_task(cid, chan) await self._cancel_task(cid, chan)
# if tasks:
log.info( log.info(
f"Waiting for remaining rpc tasks to complete {tasks}") f"Waiting for remaining rpc tasks to complete {tasks}")
await self._no_more_rpc_tasks.wait() await self._ongoing_rpc_tasks.wait()
def cancel_server(self) -> None: def cancel_server(self) -> None:
"""Cancel the internal channel server nursery thereby """Cancel the internal channel server nursery thereby