forked from goodboy/tractor
				
			Compare commits
	
		
			31 Commits 
		
	
	
		
			master
			...
			bistream_b
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | a1603709ab | |
|  | 78b4eef7ee | |
|  | 211fb07074 | |
|  | ae45b5ff1d | |
|  | c542b915d6 | |
|  | 6bd16749f0 | |
|  | 8f468a8c86 | |
|  | 3fa36f64ac | |
|  | be39ff38e4 | |
|  | 9cd5d2d7b9 | |
|  | 4601c88574 | |
|  | a1488a1773 | |
|  | e058506a00 | |
|  | 19a23fefa9 | |
|  | 40ad00ce02 | |
|  | b3caf846fc | |
|  | 40cb3585c1 | |
|  | 88dbaff11b | |
|  | 3e34f0a374 | |
|  | 9e7bed646d | |
|  | 0b73a4b61e | |
|  | eb237f24cd | |
|  | 83f1e79fdd | |
|  | 1192541623 | |
|  | 15b63b7190 | |
|  | c4d5f9d41e | |
|  | b7089bb4e0 | |
|  | ecb9655519 | |
|  | f98860a5e5 | |
|  | 8c8a236799 | |
|  | 38ccbd0a9c | 
|  | @ -0,0 +1,498 @@ | |||
| """ | ||||
| Bidirectional streaming and context API. | ||||
| 
 | ||||
| """ | ||||
| import pytest | ||||
| import trio | ||||
| import tractor | ||||
| 
 | ||||
| from conftest import tractor_test | ||||
| 
 | ||||
| # the general stream semantics are | ||||
| # - normal termination: far end relays a stop message which | ||||
| # terminates an ongoing ``MsgStream`` iteration | ||||
| # - cancel termination: context is cancelled on either side cancelling | ||||
| #  the "linked" inter-actor task context | ||||
| 
 | ||||
| 
 | ||||
| _state: bool = False | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def simple_setup_teardown( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
|     data: int, | ||||
|     block_forever: bool = False, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     # startup phase | ||||
|     global _state | ||||
|     _state = True | ||||
| 
 | ||||
|     # signal to parent that we're up | ||||
|     await ctx.started(data + 1) | ||||
| 
 | ||||
|     try: | ||||
|         if block_forever: | ||||
|             # block until cancelled | ||||
|             await trio.sleep_forever() | ||||
|         else: | ||||
|             return 'yo' | ||||
|     finally: | ||||
|         _state = False | ||||
| 
 | ||||
| 
 | ||||
| async def assert_state(value: bool): | ||||
|     global _state | ||||
|     assert _state == value | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'error_parent', | ||||
|     [False, True], | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'callee_blocks_forever', | ||||
|     [False, True], | ||||
| ) | ||||
| def test_simple_context( | ||||
|     error_parent, | ||||
|     callee_blocks_forever, | ||||
| ): | ||||
| 
 | ||||
|     async def main(): | ||||
| 
 | ||||
|         async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|             portal = await n.start_actor( | ||||
|                 'simple_context', | ||||
|                 enable_modules=[__name__], | ||||
|             ) | ||||
| 
 | ||||
|             async with portal.open_context( | ||||
|                 simple_setup_teardown, | ||||
|                 data=10, | ||||
|                 block_forever=callee_blocks_forever, | ||||
|             ) as (ctx, sent): | ||||
| 
 | ||||
|                 assert sent == 11 | ||||
| 
 | ||||
|                 if callee_blocks_forever: | ||||
|                     await portal.run(assert_state, value=True) | ||||
|                     await ctx.cancel() | ||||
|                 else: | ||||
|                     assert await ctx.result() == 'yo' | ||||
| 
 | ||||
|             # after cancellation | ||||
|             await portal.run(assert_state, value=False) | ||||
| 
 | ||||
|             if error_parent: | ||||
|                 raise ValueError | ||||
| 
 | ||||
|             # shut down daemon | ||||
|             await portal.cancel_actor() | ||||
| 
 | ||||
|     if error_parent: | ||||
|         try: | ||||
|             trio.run(main) | ||||
|         except ValueError: | ||||
|             pass | ||||
|     else: | ||||
|         trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| # basic stream terminations: | ||||
| # - callee context closes without using stream | ||||
| # - caller context closes without using stream | ||||
| # - caller context calls `Context.cancel()` while streaming | ||||
| #   is ongoing resulting in callee being cancelled | ||||
| # - callee calls `Context.cancel()` while streaming and caller | ||||
| #   sees stream terminated in `RemoteActorError` | ||||
| 
 | ||||
| # TODO: future possible features | ||||
| # - restart request: far end raises `ContextRestart` | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def close_ctx_immediately( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     await ctx.started() | ||||
|     global _state | ||||
| 
 | ||||
|     async with ctx.open_stream(): | ||||
|         pass | ||||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_callee_closes_ctx_after_stream_open(): | ||||
|     'callee context closes without using stream' | ||||
| 
 | ||||
|     async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|         portal = await n.start_actor( | ||||
|             'fast_stream_closer', | ||||
|             enable_modules=[__name__], | ||||
|         ) | ||||
| 
 | ||||
|         async with portal.open_context( | ||||
|             close_ctx_immediately, | ||||
| 
 | ||||
|             # flag to avoid waiting the final result | ||||
|             # cancel_on_exit=True, | ||||
| 
 | ||||
|         ) as (ctx, sent): | ||||
| 
 | ||||
|             assert sent is None | ||||
| 
 | ||||
|             with trio.fail_after(0.5): | ||||
|                 async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                     # should fall through since ``StopAsyncIteration`` | ||||
|                     # should be raised through translation of | ||||
|                     # a ``trio.EndOfChannel`` by | ||||
|                     # ``trio.abc.ReceiveChannel.__anext__()`` | ||||
|                     async for _ in stream: | ||||
|                         assert 0 | ||||
|                     else: | ||||
| 
 | ||||
|                         # verify stream is now closed | ||||
|                         try: | ||||
|                             await stream.receive() | ||||
|                         except trio.EndOfChannel: | ||||
|                             pass | ||||
| 
 | ||||
|             # TODO: should be just raise the closed resource err | ||||
|             # directly here to enforce not allowing a re-open | ||||
|             # of a stream to the context (at least until a time of | ||||
|             # if/when we decide that's a good idea?) | ||||
|             try: | ||||
|                 async with ctx.open_stream() as stream: | ||||
|                     pass | ||||
|             except trio.ClosedResourceError: | ||||
|                 pass | ||||
| 
 | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def expect_cancelled( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> None: | ||||
|     global _state | ||||
|     _state = True | ||||
| 
 | ||||
|     await ctx.started() | ||||
| 
 | ||||
|     try: | ||||
|         async with ctx.open_stream() as stream: | ||||
|             async for msg in stream: | ||||
|                 await stream.send(msg)  # echo server | ||||
| 
 | ||||
|     except trio.Cancelled: | ||||
|         # expected case | ||||
|         _state = False | ||||
|         raise | ||||
| 
 | ||||
|     else: | ||||
|         assert 0, "Wasn't cancelled!?" | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'use_ctx_cancel_method', | ||||
|     [False, True], | ||||
| ) | ||||
| @tractor_test | ||||
| async def test_caller_closes_ctx_after_callee_opens_stream( | ||||
|     use_ctx_cancel_method: bool, | ||||
| ): | ||||
|     'caller context closes without using stream' | ||||
| 
 | ||||
|     async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|         portal = await n.start_actor( | ||||
|             'ctx_cancelled', | ||||
|             enable_modules=[__name__], | ||||
|         ) | ||||
| 
 | ||||
|         async with portal.open_context( | ||||
|             expect_cancelled, | ||||
|         ) as (ctx, sent): | ||||
|             await portal.run(assert_state, value=True) | ||||
| 
 | ||||
|             assert sent is None | ||||
| 
 | ||||
|             # call cancel explicitly | ||||
|             if use_ctx_cancel_method: | ||||
| 
 | ||||
|                 await ctx.cancel() | ||||
| 
 | ||||
|                 try: | ||||
|                     async with ctx.open_stream() as stream: | ||||
|                         async for msg in stream: | ||||
|                             pass | ||||
| 
 | ||||
|                 except tractor.ContextCancelled: | ||||
|                     raise  # XXX: must be propagated to __aexit__ | ||||
| 
 | ||||
|                 else: | ||||
|                     assert 0, "Should have context cancelled?" | ||||
| 
 | ||||
|                 # channel should still be up | ||||
|                 assert portal.channel.connected() | ||||
| 
 | ||||
|                 # ctx is closed here | ||||
|                 await portal.run(assert_state, value=False) | ||||
| 
 | ||||
|             else: | ||||
|                 try: | ||||
|                     with trio.fail_after(0.2): | ||||
|                         await ctx.result() | ||||
|                         assert 0, "Callee should have blocked!?" | ||||
|                 except trio.TooSlowError: | ||||
|                     await ctx.cancel() | ||||
|         try: | ||||
|             async with ctx.open_stream() as stream: | ||||
|                 async for msg in stream: | ||||
|                     pass | ||||
|         except trio.ClosedResourceError: | ||||
|             pass | ||||
|         else: | ||||
|             assert 0, "Should have received closed resource error?" | ||||
| 
 | ||||
|         # ctx is closed here | ||||
|         await portal.run(assert_state, value=False) | ||||
| 
 | ||||
|         # channel should not have been destroyed yet, only the | ||||
|         # inter-actor-task context | ||||
|         assert portal.channel.connected() | ||||
| 
 | ||||
|         # teardown the actor | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_multitask_caller_cancels_from_nonroot_task(): | ||||
| 
 | ||||
|     async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|         portal = await n.start_actor( | ||||
|             'ctx_cancelled', | ||||
|             enable_modules=[__name__], | ||||
|         ) | ||||
| 
 | ||||
|         async with portal.open_context( | ||||
|             expect_cancelled, | ||||
|         ) as (ctx, sent): | ||||
| 
 | ||||
|             await portal.run(assert_state, value=True) | ||||
|             assert sent is None | ||||
| 
 | ||||
|             async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                 async def send_msg_then_cancel(): | ||||
|                     await stream.send('yo') | ||||
|                     await portal.run(assert_state, value=True) | ||||
|                     await ctx.cancel() | ||||
|                     await portal.run(assert_state, value=False) | ||||
| 
 | ||||
|                 async with trio.open_nursery() as n: | ||||
|                     n.start_soon(send_msg_then_cancel) | ||||
| 
 | ||||
|                     try: | ||||
|                         async for msg in stream: | ||||
|                             assert msg == 'yo' | ||||
| 
 | ||||
|                     except tractor.ContextCancelled: | ||||
|                         raise  # XXX: must be propagated to __aexit__ | ||||
| 
 | ||||
|                 # channel should still be up | ||||
|                 assert portal.channel.connected() | ||||
| 
 | ||||
|                 # ctx is closed here | ||||
|                 await portal.run(assert_state, value=False) | ||||
| 
 | ||||
|         # channel should not have been destroyed yet, only the | ||||
|         # inter-actor-task context | ||||
|         assert portal.channel.connected() | ||||
| 
 | ||||
|         # teardown the actor | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def cancel_self( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> None: | ||||
|     global _state | ||||
|     _state = True | ||||
| 
 | ||||
|     await ctx.cancel() | ||||
|     try: | ||||
|         with trio.fail_after(0.1): | ||||
|             await trio.sleep_forever() | ||||
| 
 | ||||
|     except trio.Cancelled: | ||||
|         raise | ||||
| 
 | ||||
|     except trio.TooSlowError: | ||||
|         # should never get here | ||||
|         assert 0 | ||||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_callee_cancels_before_started(): | ||||
|     '''callee calls `Context.cancel()` while streaming and caller | ||||
|     sees stream terminated in `ContextCancelled`. | ||||
| 
 | ||||
|     ''' | ||||
|     async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|         portal = await n.start_actor( | ||||
|             'cancels_self', | ||||
|             enable_modules=[__name__], | ||||
|         ) | ||||
|         try: | ||||
| 
 | ||||
|             async with portal.open_context( | ||||
|                 cancel_self, | ||||
|             ) as (ctx, sent): | ||||
|                 async with ctx.open_stream(): | ||||
| 
 | ||||
|                     await trio.sleep_forever() | ||||
| 
 | ||||
|         # raises a special cancel signal | ||||
|         except tractor.ContextCancelled as ce: | ||||
|             ce.type == trio.Cancelled | ||||
| 
 | ||||
|         # teardown the actor | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def simple_rpc( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
|     data: int, | ||||
| 
 | ||||
| ) -> None: | ||||
|     """Test a small ping-pong server. | ||||
| 
 | ||||
|     """ | ||||
|     # signal to parent that we're up | ||||
|     await ctx.started(data + 1) | ||||
| 
 | ||||
|     print('opening stream in callee') | ||||
|     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|         count = 0 | ||||
|         while True: | ||||
|             try: | ||||
|                 await stream.receive() == 'ping' | ||||
|             except trio.EndOfChannel: | ||||
|                 assert count == 10 | ||||
|                 break | ||||
|             else: | ||||
|                 print('pong') | ||||
|                 await stream.send('pong') | ||||
|                 count += 1 | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def simple_rpc_with_forloop( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
|     data: int, | ||||
| 
 | ||||
| ) -> None: | ||||
|     """Same as previous test but using ``async for`` syntax/api. | ||||
| 
 | ||||
|     """ | ||||
| 
 | ||||
|     # signal to parent that we're up | ||||
|     await ctx.started(data + 1) | ||||
| 
 | ||||
|     print('opening stream in callee') | ||||
|     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|         count = 0 | ||||
|         async for msg in stream: | ||||
| 
 | ||||
|             assert msg == 'ping' | ||||
|             print('pong') | ||||
|             await stream.send('pong') | ||||
|             count += 1 | ||||
| 
 | ||||
|         else: | ||||
|             assert count == 10 | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'use_async_for', | ||||
|     [True, False], | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'server_func', | ||||
|     [simple_rpc, simple_rpc_with_forloop], | ||||
| ) | ||||
| def test_simple_rpc(server_func, use_async_for): | ||||
|     """The simplest request response pattern. | ||||
| 
 | ||||
|     """ | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|             portal = await n.start_actor( | ||||
|                 'rpc_server', | ||||
|                 enable_modules=[__name__], | ||||
|             ) | ||||
| 
 | ||||
|             async with portal.open_context( | ||||
|                 server_func,  # taken from pytest parameterization | ||||
|                 data=10, | ||||
|             ) as (ctx, sent): | ||||
| 
 | ||||
|                 assert sent == 11 | ||||
| 
 | ||||
|                 async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                     if use_async_for: | ||||
| 
 | ||||
|                         count = 0 | ||||
|                         # receive msgs using async for style | ||||
|                         print('ping') | ||||
|                         await stream.send('ping') | ||||
| 
 | ||||
|                         async for msg in stream: | ||||
|                             assert msg == 'pong' | ||||
|                             print('ping') | ||||
|                             await stream.send('ping') | ||||
|                             count += 1 | ||||
| 
 | ||||
|                             if count >= 9: | ||||
|                                 break | ||||
| 
 | ||||
|                     else: | ||||
|                         # classic send/receive style | ||||
|                         for _ in range(10): | ||||
| 
 | ||||
|                             print('ping') | ||||
|                             await stream.send('ping') | ||||
|                             assert await stream.receive() == 'pong' | ||||
| 
 | ||||
|                 # stream should terminate here | ||||
| 
 | ||||
|             # final context result(s) should be consumed here in __aexit__() | ||||
| 
 | ||||
|             await portal.cancel_actor() | ||||
| 
 | ||||
|     trio.run(main) | ||||
|  | @ -0,0 +1,220 @@ | |||
| """ | ||||
| Advanced streaming patterns using bidirectional streams and contexts. | ||||
| 
 | ||||
| """ | ||||
| import itertools | ||||
| from typing import Set, Dict, List | ||||
| 
 | ||||
| import trio | ||||
| import tractor | ||||
| 
 | ||||
| 
 | ||||
| _registry: Dict[str, Set[tractor.ReceiveMsgStream]] = { | ||||
|     'even': set(), | ||||
|     'odd': set(), | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| async def publisher( | ||||
| 
 | ||||
|     seed: int = 0, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     global _registry | ||||
| 
 | ||||
|     def is_even(i): | ||||
|         return i % 2 == 0 | ||||
| 
 | ||||
|     for val in itertools.count(seed): | ||||
| 
 | ||||
|         sub = 'even' if is_even(val) else 'odd' | ||||
| 
 | ||||
|         for sub_stream in _registry[sub]: | ||||
|             await sub_stream.send(val) | ||||
| 
 | ||||
|         # throttle send rate to ~1kHz | ||||
|         # making it readable to a human user | ||||
|         await trio.sleep(1/1000) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def subscribe( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     global _registry | ||||
| 
 | ||||
|     # syn caller | ||||
|     await ctx.started(None) | ||||
| 
 | ||||
|     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|         # update subs list as consumer requests | ||||
|         async for new_subs in stream: | ||||
| 
 | ||||
|             new_subs = set(new_subs) | ||||
|             remove = new_subs - _registry.keys() | ||||
| 
 | ||||
|             print(f'setting sub to {new_subs} for {ctx.chan.uid}') | ||||
| 
 | ||||
|             # remove old subs | ||||
|             for sub in remove: | ||||
|                 _registry[sub].remove(stream) | ||||
| 
 | ||||
|             # add new subs for consumer | ||||
|             for sub in new_subs: | ||||
|                 _registry[sub].add(stream) | ||||
| 
 | ||||
| 
 | ||||
| async def consumer( | ||||
| 
 | ||||
|     subs: List[str], | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     uid = tractor.current_actor().uid | ||||
| 
 | ||||
|     async with tractor.wait_for_actor('publisher') as portal: | ||||
|         async with portal.open_context(subscribe) as (ctx, first): | ||||
|             async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                 # flip between the provided subs dynamically | ||||
|                 if len(subs) > 1: | ||||
| 
 | ||||
|                     for sub in itertools.cycle(subs): | ||||
|                         print(f'setting dynamic sub to {sub}') | ||||
|                         await stream.send([sub]) | ||||
| 
 | ||||
|                         count = 0 | ||||
|                         async for value in stream: | ||||
|                             print(f'{uid} got: {value}') | ||||
|                             if count > 5: | ||||
|                                 break | ||||
|                             count += 1 | ||||
| 
 | ||||
|                 else:  # static sub | ||||
| 
 | ||||
|                     await stream.send(subs) | ||||
|                     async for value in stream: | ||||
|                         print(f'{uid} got: {value}') | ||||
| 
 | ||||
| 
 | ||||
| def test_dynamic_pub_sub(): | ||||
| 
 | ||||
|     global _registry | ||||
| 
 | ||||
|     from multiprocessing import cpu_count | ||||
|     cpus = cpu_count() | ||||
| 
 | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|             # name of this actor will be same as target func | ||||
|             await n.run_in_actor(publisher) | ||||
| 
 | ||||
|             for i, sub in zip( | ||||
|                 range(cpus - 2), | ||||
|                 itertools.cycle(_registry.keys()) | ||||
|             ): | ||||
|                 await n.run_in_actor( | ||||
|                     consumer, | ||||
|                     name=f'consumer_{sub}', | ||||
|                     subs=[sub], | ||||
|                 ) | ||||
| 
 | ||||
|             # make one dynamic subscriber | ||||
|             await n.run_in_actor( | ||||
|                 consumer, | ||||
|                 name='consumer_dynamic', | ||||
|                 subs=list(_registry.keys()), | ||||
|             ) | ||||
| 
 | ||||
|             # block until cancelled by user | ||||
|             with trio.fail_after(3): | ||||
|                 await trio.sleep_forever() | ||||
| 
 | ||||
|     try: | ||||
|         trio.run(main) | ||||
|     except trio.TooSlowError: | ||||
|         pass | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def one_task_streams_and_one_handles_reqresp( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     await ctx.started() | ||||
| 
 | ||||
|     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|         async def pingpong(): | ||||
|             '''Run a simple req/response service. | ||||
| 
 | ||||
|             ''' | ||||
|             async for msg in stream: | ||||
|                 print('rpc server ping') | ||||
|                 assert msg == 'ping' | ||||
|                 print('rpc server pong') | ||||
|                 await stream.send('pong') | ||||
| 
 | ||||
|         async with trio.open_nursery() as n: | ||||
|             n.start_soon(pingpong) | ||||
| 
 | ||||
|             for _ in itertools.count(): | ||||
|                 await stream.send('yo') | ||||
|                 await trio.sleep(0.01) | ||||
| 
 | ||||
| 
 | ||||
| def test_reqresp_ontopof_streaming(): | ||||
|     '''Test a subactor that both streams with one task and | ||||
|     spawns another which handles a small requests-response | ||||
|     dialogue over the same bidir-stream. | ||||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
| 
 | ||||
|         with trio.move_on_after(2): | ||||
|             async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|                 # name of this actor will be same as target func | ||||
|                 portal = await n.start_actor( | ||||
|                     'dual_tasks', | ||||
|                     enable_modules=[__name__] | ||||
|                 ) | ||||
| 
 | ||||
|                 # flat to make sure we get at least one pong | ||||
|                 got_pong: bool = False | ||||
| 
 | ||||
|                 async with portal.open_context( | ||||
|                     one_task_streams_and_one_handles_reqresp, | ||||
| 
 | ||||
|                 ) as (ctx, first): | ||||
| 
 | ||||
|                     assert first is None | ||||
| 
 | ||||
|                     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                         await stream.send('ping') | ||||
| 
 | ||||
|                         async for msg in stream: | ||||
|                             print(f'client received: {msg}') | ||||
| 
 | ||||
|                             assert msg in {'pong', 'yo'} | ||||
| 
 | ||||
|                             if msg == 'pong': | ||||
|                                 got_pong = True | ||||
|                                 await stream.send('ping') | ||||
|                                 print('client sent ping') | ||||
| 
 | ||||
|         assert got_pong | ||||
| 
 | ||||
|     try: | ||||
|         trio.run(main) | ||||
|     except trio.TooSlowError: | ||||
|         pass | ||||
|  | @ -32,13 +32,16 @@ async def async_gen_stream(sequence): | |||
| 
 | ||||
|     # block indefinitely waiting to be cancelled by ``aclose()`` call | ||||
|     with trio.CancelScope() as cs: | ||||
|         await trio.sleep(float('inf')) | ||||
|         await trio.sleep_forever() | ||||
|         assert 0 | ||||
|     assert cs.cancelled_caught | ||||
| 
 | ||||
| 
 | ||||
| @tractor.stream | ||||
| async def context_stream(ctx, sequence): | ||||
| async def context_stream( | ||||
|     ctx: tractor.Context, | ||||
|     sequence | ||||
| ): | ||||
|     for i in sequence: | ||||
|         await ctx.send_yield(i) | ||||
|         await trio.sleep(0.1) | ||||
|  | @ -338,6 +341,8 @@ async def test_respawn_consumer_task( | |||
|                         print("all values streamed, BREAKING") | ||||
|                         break | ||||
| 
 | ||||
|                 cs.cancel() | ||||
| 
 | ||||
|         # TODO: this is justification for a | ||||
|         # ``ActorNursery.stream_from_actor()`` helper? | ||||
|         await portal.cancel_actor() | ||||
|  |  | |||
|  | @ -5,11 +5,21 @@ tractor: An actor model micro-framework built on | |||
| from trio import MultiError | ||||
| 
 | ||||
| from ._ipc import Channel | ||||
| from ._streaming import Context, stream | ||||
| from ._streaming import ( | ||||
|     Context, | ||||
|     ReceiveMsgStream, | ||||
|     MsgStream, | ||||
|     stream, | ||||
|     context, | ||||
| ) | ||||
| from ._discovery import get_arbiter, find_actor, wait_for_actor | ||||
| from ._trionics import open_nursery | ||||
| from ._state import current_actor, is_root_process | ||||
| from ._exceptions import RemoteActorError, ModuleNotExposed | ||||
| from ._exceptions import ( | ||||
|     RemoteActorError, | ||||
|     ModuleNotExposed, | ||||
|     ContextCancelled, | ||||
| ) | ||||
| from ._debug import breakpoint, post_mortem | ||||
| from . import msg | ||||
| from ._root import run, run_daemon, open_root_actor | ||||
|  | @ -21,6 +31,7 @@ __all__ = [ | |||
|     'ModuleNotExposed', | ||||
|     'MultiError', | ||||
|     'RemoteActorError', | ||||
|     'ContextCancelled', | ||||
|     'breakpoint', | ||||
|     'current_actor', | ||||
|     'find_actor', | ||||
|  | @ -33,7 +44,9 @@ __all__ = [ | |||
|     'run', | ||||
|     'run_daemon', | ||||
|     'stream', | ||||
|     'wait_for_actor', | ||||
|     'context', | ||||
|     'ReceiveMsgStream', | ||||
|     'MsgStream', | ||||
|     'to_asyncio', | ||||
|     'wait_for_actor', | ||||
| ] | ||||
|  |  | |||
|  | @ -14,6 +14,7 @@ from types import ModuleType | |||
| import sys | ||||
| import os | ||||
| from contextlib import ExitStack | ||||
| import warnings | ||||
| 
 | ||||
| import trio  # type: ignore | ||||
| from trio_typing import TaskStatus | ||||
|  | @ -27,6 +28,7 @@ from ._exceptions import ( | |||
|     unpack_error, | ||||
|     ModuleNotExposed, | ||||
|     is_multi_cancelled, | ||||
|     ContextCancelled, | ||||
|     TransportClosed, | ||||
| ) | ||||
| from . import _debug | ||||
|  | @ -58,13 +60,37 @@ async def _invoke( | |||
|     treat_as_gen = False | ||||
|     cs = None | ||||
|     cancel_scope = trio.CancelScope() | ||||
|     ctx = Context(chan, cid, cancel_scope) | ||||
|     ctx = Context(chan, cid, _cancel_scope=cancel_scope) | ||||
|     context = False | ||||
| 
 | ||||
|     if getattr(func, '_tractor_stream_function', False): | ||||
|         # handle decorated ``@tractor.stream`` async functions | ||||
|         sig = inspect.signature(func) | ||||
|         params = sig.parameters | ||||
| 
 | ||||
|         # compat with old api | ||||
|         kwargs['ctx'] = ctx | ||||
| 
 | ||||
|         if 'ctx' in params: | ||||
|             warnings.warn( | ||||
|                 "`@tractor.stream decorated funcs should now declare " | ||||
|                 "a `stream`  arg, `ctx` is now designated for use with " | ||||
|                 "@tractor.context", | ||||
|                 DeprecationWarning, | ||||
|                 stacklevel=2, | ||||
|             ) | ||||
| 
 | ||||
|         elif 'stream' in params: | ||||
|             assert 'stream' in params | ||||
|             kwargs['stream'] = ctx | ||||
| 
 | ||||
|         treat_as_gen = True | ||||
| 
 | ||||
|     elif getattr(func, '_tractor_context_function', False): | ||||
|         # handle decorated ``@tractor.context`` async function | ||||
|         kwargs['ctx'] = ctx | ||||
|         context = True | ||||
| 
 | ||||
|     # errors raised inside this block are propgated back to caller | ||||
|     try: | ||||
|         if not ( | ||||
|  | @ -102,33 +128,52 @@ async def _invoke( | |||
|             # `StopAsyncIteration` system here for returning a final | ||||
|             # value if desired | ||||
|             await chan.send({'stop': True, 'cid': cid}) | ||||
| 
 | ||||
|         # one way @stream func that gets treated like an async gen | ||||
|         elif treat_as_gen: | ||||
|             await chan.send({'functype': 'asyncgen', 'cid': cid}) | ||||
|             # XXX: the async-func may spawn further tasks which push | ||||
|             # back values like an async-generator would but must | ||||
|             # manualy construct the response dict-packet-responses as | ||||
|             # above | ||||
|             with cancel_scope as cs: | ||||
|                 task_status.started(cs) | ||||
|                 await coro | ||||
| 
 | ||||
|             if not cs.cancelled_caught: | ||||
|                 # task was not cancelled so we can instruct the | ||||
|                 # far end async gen to tear down | ||||
|                 await chan.send({'stop': True, 'cid': cid}) | ||||
| 
 | ||||
|         elif context: | ||||
|             # context func with support for bi-dir streaming | ||||
|             await chan.send({'functype': 'context', 'cid': cid}) | ||||
| 
 | ||||
|             with cancel_scope as cs: | ||||
|                 task_status.started(cs) | ||||
|                 await chan.send({'return': await coro, 'cid': cid}) | ||||
| 
 | ||||
|             if cs.cancelled_caught: | ||||
|                 # task-contex was cancelled so relay to the cancel to caller | ||||
|                 raise ContextCancelled( | ||||
|                     f'{func.__name__} cancelled itself', | ||||
|                     suberror_type=trio.Cancelled, | ||||
|                 ) | ||||
| 
 | ||||
|         else: | ||||
|             if treat_as_gen: | ||||
|                 await chan.send({'functype': 'asyncgen', 'cid': cid}) | ||||
|                 # XXX: the async-func may spawn further tasks which push | ||||
|                 # back values like an async-generator would but must | ||||
|                 # manualy construct the response dict-packet-responses as | ||||
|                 # above | ||||
|                 with cancel_scope as cs: | ||||
|                     task_status.started(cs) | ||||
|                     await coro | ||||
|                 if not cs.cancelled_caught: | ||||
|                     # task was not cancelled so we can instruct the | ||||
|                     # far end async gen to tear down | ||||
|                     await chan.send({'stop': True, 'cid': cid}) | ||||
|             else: | ||||
|                 # regular async function | ||||
|                 await chan.send({'functype': 'asyncfunc', 'cid': cid}) | ||||
|                 with cancel_scope as cs: | ||||
|                     task_status.started(cs) | ||||
|                     await chan.send({'return': await coro, 'cid': cid}) | ||||
|             # regular async function | ||||
|             await chan.send({'functype': 'asyncfunc', 'cid': cid}) | ||||
|             with cancel_scope as cs: | ||||
|                 task_status.started(cs) | ||||
|                 await chan.send({'return': await coro, 'cid': cid}) | ||||
| 
 | ||||
|     except (Exception, trio.MultiError) as err: | ||||
| 
 | ||||
|         # TODO: maybe we'll want differnet "levels" of debugging | ||||
|         # eventualy such as ('app', 'supervisory', 'runtime') ? | ||||
|         if not isinstance(err, trio.ClosedResourceError) and ( | ||||
|             not is_multi_cancelled(err) | ||||
|             not is_multi_cancelled(err)) and ( | ||||
|             not isinstance(err, ContextCancelled) | ||||
|         ): | ||||
|             # XXX: is there any case where we'll want to debug IPC | ||||
|             # disconnects? I can't think of a reason that inspecting | ||||
|  | @ -263,7 +308,7 @@ class Actor: | |||
|         self._parent_chan: Optional[Channel] = None | ||||
|         self._forkserver_info: Optional[ | ||||
|             Tuple[Any, Any, Any, Any, Any]] = None | ||||
|         self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}  # type: ignore | ||||
|         self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}  # type: ignore  # noqa | ||||
| 
 | ||||
|     async def wait_for_peer( | ||||
|         self, uid: Tuple[str, str] | ||||
|  | @ -416,10 +461,10 @@ class Actor: | |||
|         send_chan, recv_chan = self._cids2qs[(actorid, cid)] | ||||
|         assert send_chan.cid == cid  # type: ignore | ||||
| 
 | ||||
|         if 'stop' in msg: | ||||
|             log.debug(f"{send_chan} was terminated at remote end") | ||||
|             # indicate to consumer that far end has stopped | ||||
|             return await send_chan.aclose() | ||||
|         # if 'stop' in msg: | ||||
|         #     log.debug(f"{send_chan} was terminated at remote end") | ||||
|         #     # indicate to consumer that far end has stopped | ||||
|         #     return await send_chan.aclose() | ||||
| 
 | ||||
|         try: | ||||
|             log.debug(f"Delivering {msg} from {actorid} to caller {cid}") | ||||
|  | @ -427,6 +472,12 @@ class Actor: | |||
|             await send_chan.send(msg) | ||||
| 
 | ||||
|         except trio.BrokenResourceError: | ||||
|             # TODO: what is the right way to handle the case where the | ||||
|             # local task has already sent a 'stop' / StopAsyncInteration | ||||
|             # to the other side but and possibly has closed the local | ||||
|             # feeder mem chan? Do we wait for some kind of ack or just | ||||
|             # let this fail silently and bubble up (currently)? | ||||
| 
 | ||||
|             # XXX: local consumer has closed their side | ||||
|             # so cancel the far end streaming task | ||||
|             log.warning(f"{send_chan} consumer is already closed") | ||||
|  | @ -489,11 +540,14 @@ class Actor: | |||
|                 task_status.started(loop_cs) | ||||
|                 async for msg in chan: | ||||
|                     if msg is None:  # loop terminate sentinel | ||||
| 
 | ||||
|                         log.debug( | ||||
|                             f"Cancelling all tasks for {chan} from {chan.uid}") | ||||
|                         for (channel, cid) in self._rpc_tasks: | ||||
| 
 | ||||
|                         for (channel, cid) in self._rpc_tasks.copy(): | ||||
|                             if channel is chan: | ||||
|                                 await self._cancel_task(cid, channel) | ||||
| 
 | ||||
|                         log.debug( | ||||
|                                 f"Msg loop signalled to terminate for" | ||||
|                                 f" {chan} from {chan.uid}") | ||||
|  | @ -506,6 +560,7 @@ class Actor: | |||
|                     if cid: | ||||
|                         # deliver response to local caller/waiter | ||||
|                         await self._push_result(chan, cid, msg) | ||||
| 
 | ||||
|                         log.debug( | ||||
|                             f"Waiting on next msg for {chan} from {chan.uid}") | ||||
|                         continue | ||||
|  |  | |||
|  | @ -1,13 +1,13 @@ | |||
| """ | ||||
| Multi-core debugging for da peeps! | ||||
| 
 | ||||
| """ | ||||
| import bdb | ||||
| import sys | ||||
| from functools import partial | ||||
| from contextlib import asynccontextmanager | ||||
| from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator | ||||
| from typing import Tuple, Optional, Callable, AsyncIterator | ||||
| 
 | ||||
| from async_generator import aclosing | ||||
| import tractor | ||||
| import trio | ||||
| 
 | ||||
|  | @ -31,14 +31,21 @@ log = get_logger(__name__) | |||
| 
 | ||||
| __all__ = ['breakpoint', 'post_mortem'] | ||||
| 
 | ||||
| 
 | ||||
| # TODO: wrap all these in a static global class: ``DebugLock`` maybe? | ||||
| 
 | ||||
| # placeholder for function to set a ``trio.Event`` on debugger exit | ||||
| _pdb_release_hook: Optional[Callable] = None | ||||
| 
 | ||||
| # actor-wide variable pointing to current task name using debugger | ||||
| _in_debug = False | ||||
| _local_task_in_debug: Optional[str] = None | ||||
| 
 | ||||
| # actor tree-wide actor uid that supposedly has the tty lock | ||||
| _global_actor_in_debug: Optional[Tuple[str, str]] = None | ||||
| 
 | ||||
| # lock in root actor preventing multi-access to local tty | ||||
| _debug_lock = trio.StrictFIFOLock() | ||||
| _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() | ||||
| _pdb_complete: Optional[trio.Event] = None | ||||
| 
 | ||||
| # XXX: set by the current task waiting on the root tty lock | ||||
| # and must be cancelled if this actor is cancelled via message | ||||
|  | @ -61,19 +68,19 @@ class PdbwTeardown(pdbpp.Pdb): | |||
|     # TODO: figure out how to dissallow recursive .set_trace() entry | ||||
|     # since that'll cause deadlock for us. | ||||
|     def set_continue(self): | ||||
|         global _in_debug | ||||
|         try: | ||||
|             super().set_continue() | ||||
|         finally: | ||||
|             _in_debug = False | ||||
|             global _local_task_in_debug | ||||
|             _local_task_in_debug = None | ||||
|             _pdb_release_hook() | ||||
| 
 | ||||
|     def set_quit(self): | ||||
|         global _in_debug | ||||
|         try: | ||||
|             super().set_quit() | ||||
|         finally: | ||||
|             _in_debug = False | ||||
|             global _local_task_in_debug | ||||
|             _local_task_in_debug = None | ||||
|             _pdb_release_hook() | ||||
| 
 | ||||
| 
 | ||||
|  | @ -119,18 +126,22 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]: | |||
|     """Acquire a actor local FIFO lock meant to mutex entry to a local | ||||
|     debugger entry point to avoid tty clobbering by multiple processes. | ||||
|     """ | ||||
|     task_name = trio.lowlevel.current_task().name | ||||
|     try: | ||||
|         log.debug( | ||||
|             f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}") | ||||
|         await _debug_lock.acquire() | ||||
|     global _debug_lock, _global_actor_in_debug | ||||
| 
 | ||||
|     task_name = trio.lowlevel.current_task().name | ||||
| 
 | ||||
|     log.debug( | ||||
|         f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}") | ||||
| 
 | ||||
|     async with _debug_lock: | ||||
| 
 | ||||
|         # _debug_lock._uid = uid | ||||
|         _global_actor_in_debug = uid | ||||
|         log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}") | ||||
|         yield | ||||
| 
 | ||||
|     finally: | ||||
|         _debug_lock.release() | ||||
|         log.debug(f"TTY lock released, remote task: {task_name}:{uid}") | ||||
|     _global_actor_in_debug = None | ||||
|     log.debug(f"TTY lock released, remote task: {task_name}:{uid}") | ||||
| 
 | ||||
| 
 | ||||
| # @contextmanager | ||||
|  | @ -144,118 +155,180 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]: | |||
| #         signal.signal(signal.SIGINT, prior_handler) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def _hijack_stdin_relay_to_child( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
|     subactor_uid: Tuple[str, str] | ||||
| ) -> AsyncIterator[str]: | ||||
| 
 | ||||
| ) -> str: | ||||
| 
 | ||||
|     global _pdb_complete | ||||
| 
 | ||||
|     task_name = trio.lowlevel.current_task().name | ||||
| 
 | ||||
|     # TODO: when we get to true remote debugging | ||||
|     # this will deliver stdin data | ||||
|     log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock") | ||||
|     # this will deliver stdin data? | ||||
| 
 | ||||
|     log.debug( | ||||
|         "Attempting to acquire TTY lock, " | ||||
|         f"remote task: {task_name}:{subactor_uid}" | ||||
|     ) | ||||
| 
 | ||||
|     log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock") | ||||
| 
 | ||||
|     async with _acquire_debug_lock(subactor_uid): | ||||
|         log.warning(f"Actor {subactor_uid} ACQUIRED stdin hijack lock") | ||||
| 
 | ||||
|         # with _disable_sigint(): | ||||
|         # XXX: only shield the context sync step! | ||||
|         with trio.CancelScope(shield=True): | ||||
| 
 | ||||
|         # indicate to child that we've locked stdio | ||||
|         yield 'Locked' | ||||
|             # indicate to child that we've locked stdio | ||||
|             await ctx.started('Locked') | ||||
|             log.runtime(  # type: ignore | ||||
|                 f"Actor {subactor_uid} ACQUIRED stdin hijack lock") | ||||
| 
 | ||||
|         # wait for cancellation of stream by child | ||||
|         # indicating debugger is dis-engaged | ||||
|         await trio.sleep_forever() | ||||
|         # wait for unlock pdb by child | ||||
|         async with ctx.open_stream() as stream: | ||||
|             try: | ||||
|                 assert await stream.receive() == 'pdb_unlock' | ||||
| 
 | ||||
|             except trio.BrokenResourceError: | ||||
|                 # XXX: there may be a race with the portal teardown | ||||
|                 # with the calling actor which we can safely ignore | ||||
|                 # the alternative would be sending an ack message | ||||
|                 # and allowing the client to wait for us to teardown | ||||
|                 # first? | ||||
|                 pass | ||||
| 
 | ||||
|     log.debug( | ||||
|         f"TTY lock released, remote task: {task_name}:{subactor_uid}") | ||||
| 
 | ||||
|     log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock") | ||||
|     return "pdb_unlock_complete" | ||||
| 
 | ||||
| 
 | ||||
| # XXX: We only make this sync in case someone wants to | ||||
| # overload the ``breakpoint()`` built-in. | ||||
| def _breakpoint(debug_func) -> Awaitable[None]: | ||||
| async def _breakpoint(debug_func) -> None: | ||||
|     """``tractor`` breakpoint entry for engaging pdb machinery | ||||
|     in subactors. | ||||
| 
 | ||||
|     """ | ||||
|     actor = tractor.current_actor() | ||||
|     do_unlock = trio.Event() | ||||
|     task_name = trio.lowlevel.current_task().name | ||||
| 
 | ||||
|     global _pdb_complete, _pdb_release_hook | ||||
|     global _local_task_in_debug, _global_actor_in_debug | ||||
| 
 | ||||
|     async def wait_for_parent_stdin_hijack( | ||||
|         task_status=trio.TASK_STATUS_IGNORED | ||||
|     ): | ||||
|         global _debugger_request_cs | ||||
| 
 | ||||
|         with trio.CancelScope() as cs: | ||||
|             _debugger_request_cs = cs | ||||
| 
 | ||||
|             try: | ||||
|                 async with get_root() as portal: | ||||
|                         async with portal.open_stream_from( | ||||
|                             tractor._debug._hijack_stdin_relay_to_child, | ||||
|                             subactor_uid=actor.uid, | ||||
|                         ) as stream: | ||||
| 
 | ||||
|                                 # block until first yield above | ||||
|                                 async for val in stream: | ||||
|                     # this syncs to child's ``Context.started()`` call. | ||||
|                     async with portal.open_context( | ||||
| 
 | ||||
|                                     assert val == 'Locked' | ||||
|                                     task_status.started() | ||||
|                         tractor._debug._hijack_stdin_relay_to_child, | ||||
|                         subactor_uid=actor.uid, | ||||
| 
 | ||||
|                                     # with trio.CancelScope(shield=True): | ||||
|                                     await do_unlock.wait() | ||||
|                     ) as (ctx, val): | ||||
| 
 | ||||
|                         assert val == 'Locked' | ||||
| 
 | ||||
|                         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                             # unblock local caller | ||||
|                             task_status.started() | ||||
| 
 | ||||
|                             # TODO: shielding currently can cause hangs... | ||||
|                             # with trio.CancelScope(shield=True): | ||||
| 
 | ||||
|                             await _pdb_complete.wait() | ||||
|                             await stream.send('pdb_unlock') | ||||
| 
 | ||||
|                             # sync with callee termination | ||||
|                             assert await ctx.result() == "pdb_unlock_complete" | ||||
| 
 | ||||
|             except tractor.ContextCancelled: | ||||
|                 log.warning('Root actor cancelled debug lock') | ||||
| 
 | ||||
|                                     # trigger cancellation of remote stream | ||||
|                                     break | ||||
|             finally: | ||||
|                 log.debug(f"Exiting debugger for actor {actor}") | ||||
|                 global _in_debug | ||||
|                 _in_debug = False | ||||
|                 global _local_task_in_debug | ||||
|                 _local_task_in_debug = None | ||||
|                 log.debug(f"Child {actor} released parent stdio lock") | ||||
| 
 | ||||
|     async def _bp(): | ||||
|         """Async breakpoint which schedules a parent stdio lock, and once complete | ||||
|         enters the ``pdbpp`` debugging console. | ||||
|         """ | ||||
|         task_name = trio.lowlevel.current_task().name | ||||
|     if not _pdb_complete or _pdb_complete.is_set(): | ||||
|         _pdb_complete = trio.Event() | ||||
| 
 | ||||
|         global _in_debug | ||||
| 
 | ||||
|         # TODO: need a more robust check for the "root" actor | ||||
|         if actor._parent_chan and not is_root_process(): | ||||
|             if _in_debug: | ||||
|                 if _in_debug == task_name: | ||||
|                     # this task already has the lock and is | ||||
|                     # likely recurrently entering a breakpoint | ||||
|                     return | ||||
| 
 | ||||
|                 # if **this** actor is already in debug mode block here | ||||
|                 # waiting for the control to be released - this allows | ||||
|                 # support for recursive entries to `tractor.breakpoint()` | ||||
|                 log.warning( | ||||
|                     f"Actor {actor.uid} already has a debug lock, waiting...") | ||||
|                 await do_unlock.wait() | ||||
|                 await trio.sleep(0.1) | ||||
| 
 | ||||
|             # assign unlock callback for debugger teardown hooks | ||||
|             global _pdb_release_hook | ||||
|             _pdb_release_hook = do_unlock.set | ||||
| 
 | ||||
|             # mark local actor as "in debug mode" to avoid recurrent | ||||
|             # entries/requests to the root process | ||||
|             _in_debug = task_name | ||||
| 
 | ||||
|             # this **must** be awaited by the caller and is done using the | ||||
|             # root nursery so that the debugger can continue to run without | ||||
|             # being restricted by the scope of a new task nursery. | ||||
|             await actor._service_n.start(wait_for_parent_stdin_hijack) | ||||
| 
 | ||||
|         elif is_root_process(): | ||||
|             # we also wait in the root-parent for any child that | ||||
|             # may have the tty locked prior | ||||
|             if _debug_lock.locked():  # root process already has it; ignore | ||||
|     # TODO: need a more robust check for the "root" actor | ||||
|     if actor._parent_chan and not is_root_process(): | ||||
|         if _local_task_in_debug: | ||||
|             if _local_task_in_debug == task_name: | ||||
|                 # this task already has the lock and is | ||||
|                 # likely recurrently entering a breakpoint | ||||
|                 return | ||||
|             await _debug_lock.acquire() | ||||
|             _pdb_release_hook = _debug_lock.release | ||||
| 
 | ||||
|         # block here one (at the appropriate frame *up* where | ||||
|         # ``breakpoint()`` was awaited and begin handling stdio | ||||
|         log.debug("Entering the synchronous world of pdb") | ||||
|         debug_func(actor) | ||||
|             # if **this** actor is already in debug mode block here | ||||
|             # waiting for the control to be released - this allows | ||||
|             # support for recursive entries to `tractor.breakpoint()` | ||||
|             log.warning(f"{actor.uid} already has a debug lock, waiting...") | ||||
| 
 | ||||
|     # user code **must** await this! | ||||
|     return _bp() | ||||
|             await _pdb_complete.wait() | ||||
|             await trio.sleep(0.1) | ||||
| 
 | ||||
|         # mark local actor as "in debug mode" to avoid recurrent | ||||
|         # entries/requests to the root process | ||||
|         _local_task_in_debug = task_name | ||||
| 
 | ||||
|         # assign unlock callback for debugger teardown hooks | ||||
|         _pdb_release_hook = _pdb_complete.set | ||||
| 
 | ||||
|         # this **must** be awaited by the caller and is done using the | ||||
|         # root nursery so that the debugger can continue to run without | ||||
|         # being restricted by the scope of a new task nursery. | ||||
|         await actor._service_n.start(wait_for_parent_stdin_hijack) | ||||
| 
 | ||||
|     elif is_root_process(): | ||||
| 
 | ||||
|         # we also wait in the root-parent for any child that | ||||
|         # may have the tty locked prior | ||||
|         global _debug_lock | ||||
| 
 | ||||
|         # TODO: wait, what about multiple root tasks acquiring | ||||
|         # it though.. shrug? | ||||
|         # root process (us) already has it; ignore | ||||
|         if _global_actor_in_debug == actor.uid: | ||||
|             return | ||||
| 
 | ||||
|         # XXX: since we need to enter pdb synchronously below, | ||||
|         # we have to release the lock manually from pdb completion | ||||
|         # callbacks. Can't think of a nicer way then this atm. | ||||
|         await _debug_lock.acquire() | ||||
| 
 | ||||
|         _global_actor_in_debug = actor.uid | ||||
|         _local_task_in_debug = task_name | ||||
| 
 | ||||
|         # the lock must be released on pdb completion | ||||
|         def teardown(): | ||||
|             global _pdb_complete, _debug_lock | ||||
|             global _global_actor_in_debug, _local_task_in_debug | ||||
| 
 | ||||
|             _debug_lock.release() | ||||
|             _global_actor_in_debug = None | ||||
|             _local_task_in_debug = None | ||||
|             _pdb_complete.set() | ||||
| 
 | ||||
|         _pdb_release_hook = teardown | ||||
| 
 | ||||
|     # block here one (at the appropriate frame *up* where | ||||
|     # ``breakpoint()`` was awaited and begin handling stdio | ||||
|     log.debug("Entering the synchronous world of pdb") | ||||
|     debug_func(actor) | ||||
| 
 | ||||
| 
 | ||||
| def _mk_pdb(): | ||||
|  | @ -276,7 +349,7 @@ def _set_trace(actor=None): | |||
|     pdb = _mk_pdb() | ||||
| 
 | ||||
|     if actor is not None: | ||||
|         log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n") | ||||
|         log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n")  # type: ignore | ||||
| 
 | ||||
|         pdb.set_trace( | ||||
|             # start 2 levels up in user code | ||||
|  | @ -285,8 +358,8 @@ def _set_trace(actor=None): | |||
| 
 | ||||
|     else: | ||||
|         # we entered the global ``breakpoint()`` built-in from sync code | ||||
|         global _in_debug, _pdb_release_hook | ||||
|         _in_debug = 'sync' | ||||
|         global _local_task_in_debug, _pdb_release_hook | ||||
|         _local_task_in_debug = 'sync' | ||||
| 
 | ||||
|         def nuttin(): | ||||
|             pass | ||||
|  |  | |||
|  | @ -1,7 +1,7 @@ | |||
| """ | ||||
| Our classy exception set. | ||||
| """ | ||||
| from typing import Dict, Any | ||||
| from typing import Dict, Any, Optional, Type | ||||
| import importlib | ||||
| import builtins | ||||
| import traceback | ||||
|  | @ -15,17 +15,16 @@ _this_mod = importlib.import_module(__name__) | |||
| class RemoteActorError(Exception): | ||||
|     # TODO: local recontruction of remote exception deats | ||||
|     "Remote actor exception bundled locally" | ||||
|     def __init__(self, message, type_str, **msgdata) -> None: | ||||
|         super().__init__(message) | ||||
|         for ns in [builtins, _this_mod, trio]: | ||||
|             try: | ||||
|                 self.type = getattr(ns, type_str) | ||||
|                 break | ||||
|             except AttributeError: | ||||
|                 continue | ||||
|         else: | ||||
|             self.type = Exception | ||||
|     def __init__( | ||||
|         self, | ||||
|         message: str, | ||||
|         suberror_type: Optional[Type[BaseException]] = None, | ||||
|         **msgdata | ||||
| 
 | ||||
|     ) -> None: | ||||
|         super().__init__(message) | ||||
| 
 | ||||
|         self.type = suberror_type | ||||
|         self.msgdata = msgdata | ||||
| 
 | ||||
|     # TODO: a trio.MultiError.catch like context manager | ||||
|  | @ -41,6 +40,9 @@ class InternalActorError(RemoteActorError): | |||
| class TransportClosed(trio.ClosedResourceError): | ||||
|     "Underlying channel transport was closed prior to use" | ||||
| 
 | ||||
| class ContextCancelled(RemoteActorError): | ||||
|     "Inter-actor task context cancelled itself on the callee side." | ||||
| 
 | ||||
| 
 | ||||
| class NoResult(RuntimeError): | ||||
|     "No final result is expected for this actor" | ||||
|  | @ -77,12 +79,35 @@ def unpack_error( | |||
|     into a local ``RemoteActorError``. | ||||
| 
 | ||||
|     """ | ||||
|     tb_str = msg['error'].get('tb_str', '') | ||||
|     return err_type( | ||||
|         f"{chan.uid}\n" + tb_str, | ||||
|     error = msg['error'] | ||||
| 
 | ||||
|     tb_str = error.get('tb_str', '') | ||||
|     message = f"{chan.uid}\n" + tb_str | ||||
|     type_name = error['type_str'] | ||||
|     suberror_type: Type[BaseException] = Exception | ||||
| 
 | ||||
|     if type_name == 'ContextCancelled': | ||||
|         err_type = ContextCancelled | ||||
|         suberror_type = trio.Cancelled | ||||
| 
 | ||||
|     else:  # try to lookup a suitable local error type | ||||
|         for ns in [builtins, _this_mod, trio]: | ||||
|             try: | ||||
|                 suberror_type = getattr(ns, type_name) | ||||
|                 break | ||||
|             except AttributeError: | ||||
|                 continue | ||||
| 
 | ||||
|     exc = err_type( | ||||
|         message, | ||||
|         suberror_type=suberror_type, | ||||
| 
 | ||||
|         # unpack other fields into error type init | ||||
|         **msg['error'], | ||||
|     ) | ||||
| 
 | ||||
|     return exc | ||||
| 
 | ||||
| 
 | ||||
| def is_multi_cancelled(exc: BaseException) -> bool: | ||||
|     """Predicate to determine if a ``trio.MultiError`` contains only | ||||
|  |  | |||
|  | @ -17,7 +17,12 @@ from async_generator import asynccontextmanager | |||
| from ._state import current_actor | ||||
| from ._ipc import Channel | ||||
| from .log import get_logger | ||||
| from ._exceptions import unpack_error, NoResult, RemoteActorError | ||||
| from ._exceptions import ( | ||||
|     unpack_error, | ||||
|     NoResult, | ||||
|     RemoteActorError, | ||||
|     ContextCancelled, | ||||
| ) | ||||
| from ._streaming import Context, ReceiveMsgStream | ||||
| 
 | ||||
| 
 | ||||
|  | @ -84,7 +89,7 @@ class Portal: | |||
|         ns: str, | ||||
|         func: str, | ||||
|         kwargs, | ||||
|     ) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]: | ||||
|     ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]: | ||||
|         """Submit a function to be scheduled and run by actor, return the | ||||
|         associated caller id, response queue, response type str, | ||||
|         first message packet as a tuple. | ||||
|  | @ -312,13 +317,23 @@ class Portal: | |||
| 
 | ||||
|         ctx = Context(self.channel, cid, _portal=self) | ||||
|         try: | ||||
|             async with ReceiveMsgStream(ctx, recv_chan, self) as rchan: | ||||
|             # deliver receive only stream | ||||
|             async with ReceiveMsgStream(ctx, recv_chan) as rchan: | ||||
|                 self._streams.add(rchan) | ||||
|                 yield rchan | ||||
| 
 | ||||
|         finally: | ||||
| 
 | ||||
|             # cancel the far end task on consumer close | ||||
|             # NOTE: this is a special case since we assume that if using | ||||
|             # this ``.open_fream_from()`` api, the stream is one a one | ||||
|             # time use and we couple the far end tasks's lifetime to | ||||
|             # the consumer's scope; we don't ever send a `'stop'` | ||||
|             # message right now since there shouldn't be a reason to | ||||
|             # stop and restart the stream, right? | ||||
|             try: | ||||
|                 await ctx.cancel() | ||||
| 
 | ||||
|             except trio.ClosedResourceError: | ||||
|                 # if the far end terminates before we send a cancel the | ||||
|                 # underlying transport-channel may already be closed. | ||||
|  | @ -326,17 +341,92 @@ class Portal: | |||
| 
 | ||||
|             self._streams.remove(rchan) | ||||
| 
 | ||||
|     # @asynccontextmanager | ||||
|     # async def open_context( | ||||
|     #     self, | ||||
|     #     func: Callable, | ||||
|     #     **kwargs, | ||||
|     # ) -> Context: | ||||
|     #     # TODO | ||||
|     #     elif resptype == 'context':  # context manager style setup/teardown | ||||
|     #         # TODO likely not here though | ||||
|     #         raise NotImplementedError | ||||
|     @asynccontextmanager | ||||
|     async def open_context( | ||||
| 
 | ||||
|         self, | ||||
|         func: Callable, | ||||
|         cancel_on_exit: bool = False, | ||||
|         **kwargs, | ||||
| 
 | ||||
|     ) -> AsyncGenerator[Tuple[Context, Any], None]: | ||||
|         '''Open an inter-actor task context. | ||||
| 
 | ||||
|         This is a synchronous API which allows for deterministic | ||||
|         setup/teardown of a remote task. The yielded ``Context`` further | ||||
|         allows for opening bidirectional streams, explicit cancellation | ||||
|         and synchronized final result collection. See ``tractor.Context``. | ||||
| 
 | ||||
|         ''' | ||||
|         # conduct target func method structural checks | ||||
|         if not inspect.iscoroutinefunction(func) and ( | ||||
|             getattr(func, '_tractor_contex_function', False) | ||||
|         ): | ||||
|             raise TypeError( | ||||
|                 f'{func} must be an async generator function!') | ||||
| 
 | ||||
|         fn_mod_path, fn_name = func_deats(func) | ||||
| 
 | ||||
|         recv_chan: Optional[trio.MemoryReceiveChannel] = None | ||||
| 
 | ||||
|         try: | ||||
|             cid, recv_chan, functype, first_msg = await self._submit( | ||||
|                 fn_mod_path, fn_name, kwargs) | ||||
| 
 | ||||
|             assert functype == 'context' | ||||
|             msg = await recv_chan.receive() | ||||
| 
 | ||||
|             try: | ||||
|                 # the "first" value here is delivered by the callee's | ||||
|                 # ``Context.started()`` call. | ||||
|                 first = msg['started'] | ||||
| 
 | ||||
|             except KeyError: | ||||
|                 assert msg.get('cid'), ("Received internal error at context?") | ||||
| 
 | ||||
|                 if msg.get('error'): | ||||
|                     # raise the error message | ||||
|                     raise unpack_error(msg, self.channel) | ||||
|                 else: | ||||
|                     raise | ||||
| 
 | ||||
|             # deliver context instance and .started() msg value in open | ||||
|             # tuple. | ||||
|             ctx = Context( | ||||
|                 self.channel, | ||||
|                 cid, | ||||
|                 _portal=self, | ||||
|                 _recv_chan=recv_chan, | ||||
|             ) | ||||
| 
 | ||||
|             try: | ||||
|                 yield ctx, first | ||||
| 
 | ||||
|                 if cancel_on_exit: | ||||
|                     await ctx.cancel() | ||||
| 
 | ||||
|                 else: | ||||
|                     if not ctx._cancel_called: | ||||
|                         await ctx.result() | ||||
| 
 | ||||
|             except ContextCancelled: | ||||
|                 # if the context was cancelled by client code | ||||
|                 # then we don't need to raise since user code | ||||
|                 # is expecting this. | ||||
|                 if not ctx._cancel_called: | ||||
|                     raise | ||||
| 
 | ||||
|             except BaseException: | ||||
|                 # the context cancels itself on any deviation | ||||
|                 await ctx.cancel() | ||||
|                 raise | ||||
| 
 | ||||
|             finally: | ||||
|                 log.info(f'Context for {func.__name__} completed') | ||||
| 
 | ||||
|         finally: | ||||
|             if recv_chan is not None: | ||||
|                 await recv_chan.aclose() | ||||
| 
 | ||||
| @dataclass | ||||
| class LocalPortal: | ||||
|  |  | |||
|  | @ -1,38 +1,311 @@ | |||
| """ | ||||
| Message stream types and APIs. | ||||
| 
 | ||||
| """ | ||||
| import inspect | ||||
| from contextlib import contextmanager  # , asynccontextmanager | ||||
| from contextlib import contextmanager, asynccontextmanager | ||||
| from dataclasses import dataclass | ||||
| from typing import Any, Iterator, Optional | ||||
| from typing import ( | ||||
|     Any, Iterator, Optional, Callable, | ||||
|     AsyncGenerator, | ||||
| ) | ||||
| 
 | ||||
| import warnings | ||||
| 
 | ||||
| import trio | ||||
| 
 | ||||
| from ._ipc import Channel | ||||
| from ._exceptions import unpack_error | ||||
| from ._state import current_actor | ||||
| from .log import get_logger | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| @dataclass(frozen=True) | ||||
| class Context: | ||||
|     """An IAC (inter-actor communication) context. | ||||
| # TODO: generic typing like trio's receive channel | ||||
| # but with msgspec messages? | ||||
| # class ReceiveChannel(AsyncResource, Generic[ReceiveType]): | ||||
| 
 | ||||
|     Allows maintaining task or protocol specific state between communicating | ||||
|     actors. A unique context is created on the receiving end for every request | ||||
|     to a remote actor. | ||||
| 
 | ||||
|     A context can be cancelled and (eventually) restarted from | ||||
|     either side of the underlying IPC channel. | ||||
| class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||
|     """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with | ||||
|     special behaviour for signalling stream termination across an | ||||
|     inter-actor ``Channel``. This is the type returned to a local task | ||||
|     which invoked a remote streaming function using `Portal.run()`. | ||||
| 
 | ||||
|     A context can be used to open task oriented message streams. | ||||
|     Termination rules: | ||||
| 
 | ||||
|     - if the local task signals stop iteration a cancel signal is | ||||
|       relayed to the remote task indicating to stop streaming | ||||
|     - if the remote task signals the end of a stream, raise | ||||
|       a ``StopAsyncIteration`` to terminate the local ``async for`` | ||||
| 
 | ||||
|     """ | ||||
|     def __init__( | ||||
|         self, | ||||
|         ctx: 'Context',  # typing: ignore # noqa | ||||
|         rx_chan: trio.abc.ReceiveChannel, | ||||
|         shield: bool = False, | ||||
|     ) -> None: | ||||
|         self._ctx = ctx | ||||
|         self._rx_chan = rx_chan | ||||
|         self._shielded = shield | ||||
| 
 | ||||
|         # flag to denote end of stream | ||||
|         self._eoc: bool = False | ||||
| 
 | ||||
|     # delegate directly to underlying mem channel | ||||
|     def receive_nowait(self): | ||||
|         msg = self._rx_chan.receive_nowait() | ||||
|         return msg['yield'] | ||||
| 
 | ||||
|     async def receive(self): | ||||
|         # see ``.aclose()`` for notes on the old behaviour prior to | ||||
|         # introducing this | ||||
|         if self._eoc: | ||||
|             raise trio.EndOfChannel | ||||
| 
 | ||||
|         try: | ||||
|             msg = await self._rx_chan.receive() | ||||
|             return msg['yield'] | ||||
| 
 | ||||
|         except KeyError: | ||||
|             # internal error should never get here | ||||
|             assert msg.get('cid'), ("Received internal error at portal?") | ||||
| 
 | ||||
|             # TODO: handle 2 cases with 3.10 match syntax | ||||
|             # - 'stop' | ||||
|             # - 'error' | ||||
|             # possibly just handle msg['stop'] here! | ||||
| 
 | ||||
|             if msg.get('stop'): | ||||
|                 log.debug(f"{self} was stopped at remote end") | ||||
| 
 | ||||
|                 # # when the send is closed we assume the stream has | ||||
|                 # # terminated and signal this local iterator to stop | ||||
|                 # await self.aclose() | ||||
| 
 | ||||
|                 # XXX: this causes ``ReceiveChannel.__anext__()`` to | ||||
|                 # raise a ``StopAsyncIteration`` **and** in our catch | ||||
|                 # block below it will trigger ``.aclose()``. | ||||
|                 raise trio.EndOfChannel | ||||
| 
 | ||||
|             # TODO: test that shows stream raising an expected error!!! | ||||
|             elif msg.get('error'): | ||||
|                 # raise the error message | ||||
|                 raise unpack_error(msg, self._ctx.chan) | ||||
| 
 | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|         except ( | ||||
|             trio.ClosedResourceError,  # by self._rx_chan | ||||
|             trio.EndOfChannel,  # by self._rx_chan or `stop` msg from far end | ||||
|             trio.Cancelled,  # by local cancellation | ||||
|         ): | ||||
|             # XXX: we close the stream on any of these error conditions: | ||||
| 
 | ||||
|             # a ``ClosedResourceError`` indicates that the internal | ||||
|             # feeder memory receive channel was closed likely by the | ||||
|             # runtime after the associated transport-channel | ||||
|             # disconnected or broke. | ||||
| 
 | ||||
|             # an ``EndOfChannel`` indicates either the internal recv | ||||
|             # memchan exhausted **or** we raisesd it just above after | ||||
|             # receiving a `stop` message from the far end of the stream. | ||||
| 
 | ||||
|             # Previously this was triggered by calling ``.aclose()`` on | ||||
|             # the send side of the channel inside | ||||
|             # ``Actor._push_result()`` (should still be commented code | ||||
|             # there - which should eventually get removed), but now the | ||||
|             # 'stop' message handling has been put just above. | ||||
| 
 | ||||
|             # TODO: Locally, we want to close this stream gracefully, by | ||||
|             # terminating any local consumers tasks deterministically. | ||||
|             # One we have broadcast support, we **don't** want to be | ||||
|             # closing this stream and not flushing a final value to | ||||
|             # remaining (clone) consumers who may not have been | ||||
|             # scheduled to receive it yet. | ||||
| 
 | ||||
|             # when the send is closed we assume the stream has | ||||
|             # terminated and signal this local iterator to stop | ||||
|             await self.aclose() | ||||
| 
 | ||||
|             raise  # propagate | ||||
| 
 | ||||
|         # except trio.Cancelled: | ||||
|         #     # relay cancels to the remote task | ||||
|         #     await self.aclose() | ||||
|         #     raise | ||||
| 
 | ||||
|     @contextmanager | ||||
|     def shield( | ||||
|         self | ||||
|     ) -> Iterator['ReceiveMsgStream']:  # noqa | ||||
|         """Shield this stream's underlying channel such that a local consumer task | ||||
|         can be cancelled (and possibly restarted) using ``trio.Cancelled``. | ||||
| 
 | ||||
|         Note that here, "shielding" here guards against relaying | ||||
|         a ``'stop'`` message to the far end of the stream thus keeping | ||||
|         the stream machinery active and ready for further use, it does | ||||
|         not have anything to do with an internal ``trio.CancelScope``. | ||||
| 
 | ||||
|         """ | ||||
|         self._shielded = True | ||||
|         yield self | ||||
|         self._shielded = False | ||||
| 
 | ||||
|     async def aclose(self): | ||||
|         """Cancel associated remote actor task and local memory channel | ||||
|         on close. | ||||
| 
 | ||||
|         """ | ||||
|         # XXX: keep proper adherance to trio's `.aclose()` semantics: | ||||
|         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||
|         rx_chan = self._rx_chan | ||||
| 
 | ||||
|         if rx_chan._closed: | ||||
|             log.warning(f"{self} is already closed") | ||||
| 
 | ||||
|             # this stream has already been closed so silently succeed as | ||||
|             # per ``trio.AsyncResource`` semantics. | ||||
|             # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||
|             return | ||||
| 
 | ||||
|         # TODO: broadcasting to multiple consumers | ||||
|         # stats = rx_chan.statistics() | ||||
|         # if stats.open_receive_channels > 1: | ||||
|         #     # if we've been cloned don't kill the stream | ||||
|         #     log.debug( | ||||
|         #       "there are still consumers running keeping stream alive") | ||||
|         #     return | ||||
| 
 | ||||
|         if self._shielded: | ||||
|             log.warning(f"{self} is shielded, portal channel being kept alive") | ||||
|             return | ||||
| 
 | ||||
|         # XXX: This must be set **AFTER** the shielded test above! | ||||
|         self._eoc = True | ||||
| 
 | ||||
|         # NOTE: this is super subtle IPC messaging stuff: | ||||
|         # Relay stop iteration to far end **iff** we're | ||||
|         # in bidirectional mode. If we're only streaming | ||||
|         # *from* one side then that side **won't** have an | ||||
|         # entry in `Actor._cids2qs` (maybe it should though?). | ||||
|         # So any `yield` or `stop` msgs sent from the caller side | ||||
|         # will cause key errors on the callee side since there is | ||||
|         # no entry for a local feeder mem chan since the callee task | ||||
|         # isn't expecting messages to be sent by the caller. | ||||
|         # Thus, we must check that this context DOES NOT | ||||
|         # have a portal reference to ensure this is indeed the callee | ||||
|         # side and can relay a 'stop'. | ||||
| 
 | ||||
|         # In the bidirectional case, `Context.open_stream()` will create | ||||
|         # the `Actor._cids2qs` entry from a call to | ||||
|         # `Actor.get_memchans()` and will send the stop message in | ||||
|         # ``__aexit__()`` on teardown so it **does not** need to be | ||||
|         # called here. | ||||
|         if not self._ctx._portal: | ||||
|             try: | ||||
|                 # only for 2 way streams can we can send | ||||
|                 # stop from the caller side | ||||
|                 await self._ctx.send_stop() | ||||
| 
 | ||||
|             except trio.BrokenResourceError: | ||||
|                 # the underlying channel may already have been pulled | ||||
|                 # in which case our stop message is meaningless since | ||||
|                 # it can't traverse the transport. | ||||
|                 log.debug(f'Channel for {self} was already closed') | ||||
| 
 | ||||
|         # close the local mem chan ``self._rx_chan`` ??!? | ||||
| 
 | ||||
|         # DEFINITELY NOT if we're a bi-dir ``MsgStream``! | ||||
|         # BECAUSE this same core-msg-loop mem recv-chan is used to deliver | ||||
|         # the potential final result from the surrounding inter-actor | ||||
|         # `Context` so we don't want to close it until that context has | ||||
|         # run to completion. | ||||
| 
 | ||||
|         # XXX: Notes on old behaviour: | ||||
|         # await rx_chan.aclose() | ||||
| 
 | ||||
|         # In the receive-only case, ``Portal.open_stream_from()`` used | ||||
|         # to rely on this call explicitly on teardown such that a new | ||||
|         # call to ``.receive()`` after ``rx_chan`` had been closed, would | ||||
|         # result in us raising a ``trio.EndOfChannel`` (since we | ||||
|         # remapped the ``trio.ClosedResourceError`). However, now if for some | ||||
|         # reason the stream's consumer code tries to manually receive a new | ||||
|         # value before ``.aclose()`` is called **but** the far end has | ||||
|         # stopped `.receive()` **must** raise ``trio.EndofChannel`` in | ||||
|         # order to avoid an infinite hang on ``.__anext__()``; this is | ||||
|         # why we added ``self._eoc`` to denote stream closure indepedent | ||||
|         # of ``rx_chan``. | ||||
| 
 | ||||
|         # In theory we could still use this old method and close the | ||||
|         # underlying msg-loop mem chan as above and then **not** check | ||||
|         # for ``self._eoc`` in ``.receive()`` (if for some reason we | ||||
|         # think that check is a bottle neck - not likely) **but** then | ||||
|         # we would need to map the resulting | ||||
|         # ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in | ||||
|         # ``.receive()`` (as it originally was before bi-dir streaming | ||||
|         # support) in order to trigger stream closure. The old behaviour | ||||
|         # is arguably more confusing since we lose detection of the | ||||
|         # runtime's closure of ``rx_chan`` in the case where we may | ||||
|         # still need to consume msgs that are "in transit" from the far | ||||
|         # end (eg. for ``Context.result()``). | ||||
| 
 | ||||
|     # TODO: but make it broadcasting to consumers | ||||
|     # def clone(self): | ||||
|     #     """Clone this receive channel allowing for multi-task | ||||
|     #     consumption from the same channel. | ||||
| 
 | ||||
|     #     """ | ||||
|     #     return ReceiveStream( | ||||
|     #         self._cid, | ||||
|     #         self._rx_chan.clone(), | ||||
|     #         self._portal, | ||||
|     #     ) | ||||
| 
 | ||||
| 
 | ||||
| class MsgStream(ReceiveMsgStream, trio.abc.Channel): | ||||
|     """ | ||||
|     Bidirectional message stream for use within an inter-actor actor | ||||
|     ``Context```. | ||||
| 
 | ||||
|     """ | ||||
|     async def send( | ||||
|         self, | ||||
|         data: Any | ||||
|     ) -> None: | ||||
|         '''Send a message over this stream to the far end. | ||||
| 
 | ||||
|         ''' | ||||
|         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class Context: | ||||
|     '''An inter-actor task communication context. | ||||
| 
 | ||||
|     Allows maintaining task or protocol specific state between | ||||
|     2 communicating actor tasks. A unique context is created on the | ||||
|     callee side/end for every request to a remote actor from a portal. | ||||
| 
 | ||||
|     A context can be cancelled and (possibly eventually restarted) from | ||||
|     either side of the underlying IPC channel. | ||||
| 
 | ||||
|     A context can be used to open task oriented message streams and can | ||||
|     be thought of as an IPC aware inter-actor cancel scope. | ||||
| 
 | ||||
|     ''' | ||||
|     chan: Channel | ||||
|     cid: str | ||||
| 
 | ||||
|     # only set on the caller side | ||||
|     _portal: Optional['Portal'] = None    # type: ignore # noqa | ||||
|     _recv_chan: Optional[trio.MemoryReceiveChannel] = None | ||||
|     _result: Optional[Any] = False | ||||
|     _cancel_called: bool = False | ||||
| 
 | ||||
|     # only set on the callee side | ||||
|     _cancel_scope: Optional[trio.CancelScope] = None | ||||
|  | @ -51,52 +324,196 @@ class Context: | |||
|         await self.chan.send({'stop': True, 'cid': self.cid}) | ||||
| 
 | ||||
|     async def cancel(self) -> None: | ||||
|         """Cancel this inter-actor-task context. | ||||
|         '''Cancel this inter-actor-task context. | ||||
| 
 | ||||
|         Request that the far side cancel it's current linked context, | ||||
|         timeout quickly to sidestep 2-generals... | ||||
|         Timeout quickly in an attempt to sidestep 2-generals... | ||||
| 
 | ||||
|         """ | ||||
|         assert self._portal, ( | ||||
|             "No portal found, this is likely a callee side context") | ||||
|         ''' | ||||
|         self._cancel_called = True | ||||
| 
 | ||||
|         cid = self.cid | ||||
|         with trio.move_on_after(0.5) as cs: | ||||
|             cs.shield = True | ||||
|             log.warning( | ||||
|                 f"Cancelling stream {cid} to " | ||||
|                 f"{self._portal.channel.uid}") | ||||
|         if self._portal:  # caller side: | ||||
|             if not self._portal: | ||||
|                 raise RuntimeError( | ||||
|                     "No portal found, this is likely a callee side context" | ||||
|                 ) | ||||
| 
 | ||||
|             # NOTE: we're telling the far end actor to cancel a task | ||||
|             # corresponding to *this actor*. The far end local channel | ||||
|             # instance is passed to `Actor._cancel_task()` implicitly. | ||||
|             await self._portal.run_from_ns('self', '_cancel_task', cid=cid) | ||||
| 
 | ||||
|         if cs.cancelled_caught: | ||||
|             # XXX: there's no way to know if the remote task was indeed | ||||
|             # cancelled in the case where the connection is broken or | ||||
|             # some other network error occurred. | ||||
|             if not self._portal.channel.connected(): | ||||
|             cid = self.cid | ||||
|             with trio.move_on_after(0.5) as cs: | ||||
|                 cs.shield = True | ||||
|                 log.warning( | ||||
|                     "May have failed to cancel remote task " | ||||
|                     f"{cid} for {self._portal.channel.uid}") | ||||
|                     f"Cancelling stream {cid} to " | ||||
|                     f"{self._portal.channel.uid}") | ||||
| 
 | ||||
|                 # NOTE: we're telling the far end actor to cancel a task | ||||
|                 # corresponding to *this actor*. The far end local channel | ||||
|                 # instance is passed to `Actor._cancel_task()` implicitly. | ||||
|                 await self._portal.run_from_ns('self', '_cancel_task', cid=cid) | ||||
| 
 | ||||
|             if cs.cancelled_caught: | ||||
|                 # XXX: there's no way to know if the remote task was indeed | ||||
|                 # cancelled in the case where the connection is broken or | ||||
|                 # some other network error occurred. | ||||
|                 # if not self._portal.channel.connected(): | ||||
|                 if not self.chan.connected(): | ||||
|                     log.warning( | ||||
|                         "May have failed to cancel remote task " | ||||
|                         f"{cid} for {self._portal.channel.uid}") | ||||
|         else: | ||||
|             # ensure callee side | ||||
|             assert self._cancel_scope | ||||
|             # TODO: should we have an explicit cancel message | ||||
|             # or is relaying the local `trio.Cancelled` as an | ||||
|             # {'error': trio.Cancelled, cid: "blah"} enough? | ||||
|             # This probably gets into the discussion in | ||||
|             # https://github.com/goodboy/tractor/issues/36 | ||||
|             self._cancel_scope.cancel() | ||||
| 
 | ||||
|     @asynccontextmanager | ||||
|     async def open_stream( | ||||
| 
 | ||||
|         self, | ||||
|         shield: bool = False, | ||||
| 
 | ||||
|     ) -> AsyncGenerator[MsgStream, None]: | ||||
|         '''Open a ``MsgStream``, a bi-directional stream connected to the | ||||
|         cross-actor (far end) task for this ``Context``. | ||||
| 
 | ||||
|         This context manager must be entered on both the caller and | ||||
|         callee for the stream to logically be considered "connected". | ||||
| 
 | ||||
|         A ``MsgStream`` is currently "one-shot" use, meaning if you | ||||
|         close it you can not "re-open" it for streaming and instead you | ||||
|         must re-establish a new surrounding ``Context`` using | ||||
|         ``Portal.open_context()``.  In the future this may change but | ||||
|         currently there seems to be no obvious reason to support | ||||
|         "re-opening": | ||||
|             - pausing a stream can be done with a message. | ||||
|             - task errors will normally require a restart of the entire | ||||
|               scope of the inter-actor task context due to the nature of | ||||
|               ``trio``'s cancellation system. | ||||
| 
 | ||||
|         ''' | ||||
|         actor = current_actor() | ||||
| 
 | ||||
|         # here we create a mem chan that corresponds to the | ||||
|         # far end caller / callee. | ||||
| 
 | ||||
|         # NOTE: in one way streaming this only happens on the | ||||
|         # caller side inside `Actor.send_cmd()` so if you try | ||||
|         # to send a stop from the caller to the callee in the | ||||
|         # single-direction-stream case you'll get a lookup error | ||||
|         # currently. | ||||
|         _, recv_chan = actor.get_memchans( | ||||
|             self.chan.uid, | ||||
|             self.cid | ||||
|         ) | ||||
| 
 | ||||
|         # XXX: If the underlying receive mem chan has been closed then | ||||
|         # likely client code has already exited a ``.open_stream()`` | ||||
|         # block prior. we error here until such a time that we decide | ||||
|         # allowing streams to be "re-connected" is supported and/or | ||||
|         # a good idea. | ||||
|         if recv_chan._closed: | ||||
|             task = trio.lowlevel.current_task().name | ||||
|             raise trio.ClosedResourceError( | ||||
|                 f'stream for {actor.uid[0]}:{task} has already been closed.' | ||||
|                 '\nRe-opening a closed stream is not yet supported!' | ||||
|                 '\nConsider re-calling the containing `@tractor.context` func' | ||||
|             ) | ||||
| 
 | ||||
|         async with MsgStream( | ||||
|             ctx=self, | ||||
|             rx_chan=recv_chan, | ||||
|             shield=shield, | ||||
|         ) as rchan: | ||||
| 
 | ||||
|             if self._portal: | ||||
|                 self._portal._streams.add(rchan) | ||||
| 
 | ||||
|             try: | ||||
|                 # ensure we aren't cancelled before delivering | ||||
|                 # the stream | ||||
|                 # await trio.lowlevel.checkpoint() | ||||
|                 yield rchan | ||||
| 
 | ||||
|             except trio.EndOfChannel: | ||||
|                 # likely the far end sent us a 'stop' message to | ||||
|                 # terminate the stream. | ||||
|                 raise | ||||
| 
 | ||||
|             else: | ||||
|                 # XXX: Make the stream "one-shot use".  On exit, signal | ||||
|                 # ``trio.EndOfChannel``/``StopAsyncIteration`` to the | ||||
|                 # far end. | ||||
|                 await self.send_stop() | ||||
| 
 | ||||
|             finally: | ||||
|                 if self._portal: | ||||
|                     self._portal._streams.remove(rchan) | ||||
| 
 | ||||
|     async def result(self) -> Any: | ||||
|         '''From a caller side, wait for and return the final result from | ||||
|         the callee side task. | ||||
| 
 | ||||
|         ''' | ||||
|         assert self._portal, "Context.result() can not be called from callee!" | ||||
|         assert self._recv_chan | ||||
| 
 | ||||
|         if self._result is False: | ||||
| 
 | ||||
|             if not self._recv_chan._closed:  # type: ignore | ||||
| 
 | ||||
|                 # wait for a final context result consuming | ||||
|                 # and discarding any bi dir stream msgs still | ||||
|                 # in transit from the far end. | ||||
|                 while True: | ||||
| 
 | ||||
|                     msg = await self._recv_chan.receive() | ||||
|                     try: | ||||
|                         self._result = msg['return'] | ||||
|                         break | ||||
|                     except KeyError: | ||||
| 
 | ||||
|                         if 'yield' in msg: | ||||
|                             # far end task is still streaming to us.. | ||||
|                             log.warning(f'Remote stream deliverd {msg}') | ||||
|                             # do disard | ||||
|                             continue | ||||
| 
 | ||||
|                         elif 'stop' in msg: | ||||
|                             log.debug('Remote stream terminated') | ||||
|                             continue | ||||
| 
 | ||||
|                         # internal error should never get here | ||||
|                         assert msg.get('cid'), ( | ||||
|                             "Received internal error at portal?") | ||||
|                         raise unpack_error(msg, self._portal.channel) | ||||
| 
 | ||||
|         return self._result | ||||
| 
 | ||||
|     async def started(self, value: Optional[Any] = None) -> None: | ||||
| 
 | ||||
|         if self._portal: | ||||
|             raise RuntimeError( | ||||
|                 f"Caller side context {self} can not call started!") | ||||
| 
 | ||||
|         await self.chan.send({'started': value, 'cid': self.cid}) | ||||
| 
 | ||||
|     # TODO: do we need a restart api? | ||||
|     # async def restart(self) -> None: | ||||
|     #     # TODO | ||||
|     #     pass | ||||
| 
 | ||||
|     # @asynccontextmanager | ||||
|     # async def open_stream( | ||||
|     #     self, | ||||
|     # ) -> AsyncContextManager: | ||||
|     #     # TODO | ||||
|     #     pass | ||||
| 
 | ||||
| 
 | ||||
| def stream(func): | ||||
| def stream(func: Callable) -> Callable: | ||||
|     """Mark an async function as a streaming routine with ``@stream``. | ||||
| 
 | ||||
|     """ | ||||
|     func._tractor_stream_function = True | ||||
|     # annotate | ||||
|     # TODO: apply whatever solution ``mypy`` ends up picking for this: | ||||
|     # https://github.com/python/mypy/issues/2087#issuecomment-769266912 | ||||
|     func._tractor_stream_function = True  # type: ignore | ||||
| 
 | ||||
|     sig = inspect.signature(func) | ||||
|     params = sig.parameters | ||||
|     if 'stream' not in params and 'ctx' in params: | ||||
|  | @ -114,147 +531,26 @@ def stream(func): | |||
|     ): | ||||
|         raise TypeError( | ||||
|             "The first argument to the stream function " | ||||
|             f"{func.__name__} must be `ctx: tractor.Context`" | ||||
|             f"{func.__name__} must be `ctx: tractor.Context` " | ||||
|             "(Or ``to_trio`` if using ``asyncio`` in guest mode)." | ||||
|         ) | ||||
|     return func | ||||
| 
 | ||||
| 
 | ||||
| class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||
|     """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with | ||||
|     special behaviour for signalling stream termination across an | ||||
|     inter-actor ``Channel``. This is the type returned to a local task | ||||
|     which invoked a remote streaming function using `Portal.run()`. | ||||
| 
 | ||||
|     Termination rules: | ||||
|     - if the local task signals stop iteration a cancel signal is | ||||
|       relayed to the remote task indicating to stop streaming | ||||
|     - if the remote task signals the end of a stream, raise a | ||||
|       ``StopAsyncIteration`` to terminate the local ``async for`` | ||||
| def context(func: Callable) -> Callable: | ||||
|     """Mark an async function as a streaming routine with ``@context``. | ||||
| 
 | ||||
|     """ | ||||
|     def __init__( | ||||
|         self, | ||||
|         ctx: Context, | ||||
|         rx_chan: trio.abc.ReceiveChannel, | ||||
|         portal: 'Portal',  # type: ignore # noqa | ||||
|     ) -> None: | ||||
|         self._ctx = ctx | ||||
|         self._rx_chan = rx_chan | ||||
|         self._portal = portal | ||||
|         self._shielded = False | ||||
|     # annotate | ||||
|     # TODO: apply whatever solution ``mypy`` ends up picking for this: | ||||
|     # https://github.com/python/mypy/issues/2087#issuecomment-769266912 | ||||
|     func._tractor_context_function = True  # type: ignore | ||||
| 
 | ||||
|     # delegate directly to underlying mem channel | ||||
|     def receive_nowait(self): | ||||
|         return self._rx_chan.receive_nowait() | ||||
| 
 | ||||
|     async def receive(self): | ||||
|         try: | ||||
|             msg = await self._rx_chan.receive() | ||||
|             return msg['yield'] | ||||
| 
 | ||||
|         except KeyError: | ||||
|             # internal error should never get here | ||||
|             assert msg.get('cid'), ("Received internal error at portal?") | ||||
| 
 | ||||
|             # TODO: handle 2 cases with 3.10 match syntax | ||||
|             # - 'stop' | ||||
|             # - 'error' | ||||
|             # possibly just handle msg['stop'] here! | ||||
| 
 | ||||
|             # TODO: test that shows stream raising an expected error!!! | ||||
|             if msg.get('error'): | ||||
|                 # raise the error message | ||||
|                 raise unpack_error(msg, self._portal.channel) | ||||
| 
 | ||||
|         except (trio.ClosedResourceError, StopAsyncIteration): | ||||
|             # XXX: this indicates that a `stop` message was | ||||
|             # sent by the far side of the underlying channel. | ||||
|             # Currently this is triggered by calling ``.aclose()`` on | ||||
|             # the send side of the channel inside | ||||
|             # ``Actor._push_result()``, but maybe it should be put here? | ||||
|             # to avoid exposing the internal mem chan closing mechanism? | ||||
|             # in theory we could instead do some flushing of the channel | ||||
|             # if needed to ensure all consumers are complete before | ||||
|             # triggering closure too early? | ||||
| 
 | ||||
|             # Locally, we want to close this stream gracefully, by | ||||
|             # terminating any local consumers tasks deterministically. | ||||
|             # We **don't** want to be closing this send channel and not | ||||
|             # relaying a final value to remaining consumers who may not | ||||
|             # have been scheduled to receive it yet? | ||||
| 
 | ||||
|             # lots of testing to do here | ||||
| 
 | ||||
|             # when the send is closed we assume the stream has | ||||
|             # terminated and signal this local iterator to stop | ||||
|             await self.aclose() | ||||
|             raise StopAsyncIteration | ||||
| 
 | ||||
|         except trio.Cancelled: | ||||
|             # relay cancels to the remote task | ||||
|             await self.aclose() | ||||
|             raise | ||||
| 
 | ||||
|     @contextmanager | ||||
|     def shield( | ||||
|         self | ||||
|     ) -> Iterator['ReceiveMsgStream']:  # noqa | ||||
|         """Shield this stream's underlying channel such that a local consumer task | ||||
|         can be cancelled (and possibly restarted) using ``trio.Cancelled``. | ||||
| 
 | ||||
|         """ | ||||
|         self._shielded = True | ||||
|         yield self | ||||
|         self._shielded = False | ||||
| 
 | ||||
|     async def aclose(self): | ||||
|         """Cancel associated remote actor task and local memory channel | ||||
|         on close. | ||||
|         """ | ||||
|         rx_chan = self._rx_chan | ||||
| 
 | ||||
|         if rx_chan._closed: | ||||
|             log.warning(f"{self} is already closed") | ||||
|             return | ||||
| 
 | ||||
|         # stats = rx_chan.statistics() | ||||
|         # if stats.open_receive_channels > 1: | ||||
|         #     # if we've been cloned don't kill the stream | ||||
|         #     log.debug( | ||||
|         #       "there are still consumers running keeping stream alive") | ||||
|         #     return | ||||
| 
 | ||||
|         if self._shielded: | ||||
|             log.warning(f"{self} is shielded, portal channel being kept alive") | ||||
|             return | ||||
| 
 | ||||
|         # close the local mem chan | ||||
|         rx_chan.close() | ||||
| 
 | ||||
|         # cancel surrounding IPC context | ||||
|         await self._ctx.cancel() | ||||
| 
 | ||||
|     # TODO: but make it broadcasting to consumers | ||||
|     # def clone(self): | ||||
|     #     """Clone this receive channel allowing for multi-task | ||||
|     #     consumption from the same channel. | ||||
| 
 | ||||
|     #     """ | ||||
|     #     return ReceiveStream( | ||||
|     #         self._cid, | ||||
|     #         self._rx_chan.clone(), | ||||
|     #         self._portal, | ||||
|     #     ) | ||||
| 
 | ||||
| 
 | ||||
| # class MsgStream(ReceiveMsgStream, trio.abc.Channel): | ||||
| #     """ | ||||
| #     Bidirectional message stream for use within an inter-actor actor | ||||
| #     ``Context```. | ||||
| 
 | ||||
| #     """ | ||||
| #     async def send( | ||||
| #         self, | ||||
| #         data: Any | ||||
| #     ) -> None: | ||||
| #         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) | ||||
|     sig = inspect.signature(func) | ||||
|     params = sig.parameters | ||||
|     if 'ctx' not in params: | ||||
|         raise TypeError( | ||||
|             "The first argument to the context function " | ||||
|             f"{func.__name__} must be `ctx: tractor.Context`" | ||||
|         ) | ||||
|     return func | ||||
|  |  | |||
|  | @ -357,7 +357,8 @@ async def open_nursery( | |||
|     try: | ||||
|         if actor is None and is_main_process(): | ||||
| 
 | ||||
|             # if we are the parent process start the actor runtime implicitly | ||||
|             # if we are the parent process start the | ||||
|             # actor runtime implicitly | ||||
|             log.info("Starting actor runtime!") | ||||
| 
 | ||||
|             # mark us for teardown on exit | ||||
|  | @ -376,7 +377,6 @@ async def open_nursery( | |||
|             async with _open_and_supervise_one_cancels_all_nursery( | ||||
|                 actor | ||||
|             ) as anursery: | ||||
| 
 | ||||
|                 yield anursery | ||||
| 
 | ||||
|     finally: | ||||
|  |  | |||
|  | @ -1,9 +1,13 @@ | |||
| """ | ||||
| Messaging pattern APIs and helpers. | ||||
| 
 | ||||
| NOTE: this module is likely deprecated by the new bi-directional streaming | ||||
| support provided by ``tractor.Context.open_stream()`` and friends. | ||||
| 
 | ||||
| """ | ||||
| import inspect | ||||
| import typing | ||||
| from typing import Dict, Any, Set, Callable | ||||
| from typing import Dict, Any, Set, Callable, List, Tuple | ||||
| from functools import partial | ||||
| from async_generator import aclosing | ||||
| 
 | ||||
|  | @ -20,7 +24,7 @@ log = get_logger('messaging') | |||
| 
 | ||||
| async def fan_out_to_ctxs( | ||||
|     pub_async_gen_func: typing.Callable,  # it's an async gen ... gd mypy | ||||
|     topics2ctxs: Dict[str, set], | ||||
|     topics2ctxs: Dict[str, list], | ||||
|     packetizer: typing.Callable = None, | ||||
| ) -> None: | ||||
|     """Request and fan out quotes to each subscribed actor channel. | ||||
|  | @ -34,24 +38,27 @@ async def fan_out_to_ctxs( | |||
| 
 | ||||
|         async for published in pub_gen: | ||||
| 
 | ||||
|             ctx_payloads: Dict[str, Any] = {} | ||||
|             ctx_payloads: List[Tuple[Context, Any]] = [] | ||||
| 
 | ||||
|             for topic, data in published.items(): | ||||
|                 log.debug(f"publishing {topic, data}") | ||||
| 
 | ||||
|                 # build a new dict packet or invoke provided packetizer | ||||
|                 if packetizer is None: | ||||
|                     packet = {topic: data} | ||||
| 
 | ||||
|                 else: | ||||
|                     packet = packetizer(topic, data) | ||||
|                 for ctx in topics2ctxs.get(topic, set()): | ||||
|                     ctx_payloads.setdefault(ctx, {}).update(packet), | ||||
| 
 | ||||
|                 for ctx in topics2ctxs.get(topic, list()): | ||||
|                     ctx_payloads.append((ctx, packet)) | ||||
| 
 | ||||
|             if not ctx_payloads: | ||||
|                 log.debug(f"Unconsumed values:\n{published}") | ||||
| 
 | ||||
|             # deliver to each subscriber (fan out) | ||||
|             if ctx_payloads: | ||||
|                 for ctx, payload in ctx_payloads.items(): | ||||
|                 for ctx, payload in ctx_payloads: | ||||
|                     try: | ||||
|                         await ctx.send_yield(payload) | ||||
|                     except ( | ||||
|  | @ -60,15 +67,24 @@ async def fan_out_to_ctxs( | |||
|                         ConnectionRefusedError, | ||||
|                     ): | ||||
|                         log.warning(f"{ctx.chan} went down?") | ||||
|                         for ctx_set in topics2ctxs.values(): | ||||
|                             ctx_set.discard(ctx) | ||||
|                         for ctx_list in topics2ctxs.values(): | ||||
|                             try: | ||||
|                                 ctx_list.remove(ctx) | ||||
|                             except ValueError: | ||||
|                                 continue | ||||
| 
 | ||||
|             if not get_topics(): | ||||
|                 log.warning(f"No subscribers left for {pub_gen}") | ||||
|                 break | ||||
| 
 | ||||
| 
 | ||||
| def modify_subs(topics2ctxs, topics, ctx): | ||||
| def modify_subs( | ||||
| 
 | ||||
|     topics2ctxs: Dict[str, List[Context]], | ||||
|     topics: Set[str], | ||||
|     ctx: Context, | ||||
| 
 | ||||
| ) -> None: | ||||
|     """Absolute symbol subscription list for each quote stream. | ||||
| 
 | ||||
|     Effectively a symbol subscription api. | ||||
|  | @ -77,7 +93,7 @@ def modify_subs(topics2ctxs, topics, ctx): | |||
| 
 | ||||
|     # update map from each symbol to requesting client's chan | ||||
|     for topic in topics: | ||||
|         topics2ctxs.setdefault(topic, set()).add(ctx) | ||||
|         topics2ctxs.setdefault(topic, list()).append(ctx) | ||||
| 
 | ||||
|     # remove any existing symbol subscriptions if symbol is not | ||||
|     # found in ``symbols`` | ||||
|  | @ -85,10 +101,14 @@ def modify_subs(topics2ctxs, topics, ctx): | |||
|     for topic in filter( | ||||
|         lambda topic: topic not in topics, topics2ctxs.copy() | ||||
|     ): | ||||
|         ctx_set = topics2ctxs.get(topic) | ||||
|         ctx_set.discard(ctx) | ||||
|         ctx_list = topics2ctxs.get(topic) | ||||
|         if ctx_list: | ||||
|             try: | ||||
|                 ctx_list.remove(ctx) | ||||
|             except ValueError: | ||||
|                 pass | ||||
| 
 | ||||
|         if not ctx_set: | ||||
|         if not ctx_list: | ||||
|             # pop empty sets which will trigger bg quoter task termination | ||||
|             topics2ctxs.pop(topic) | ||||
| 
 | ||||
|  | @ -256,7 +276,7 @@ def pub( | |||
|                             respawn = True | ||||
|             finally: | ||||
|                 # remove all subs for this context | ||||
|                 modify_subs(topics2ctxs, (), ctx) | ||||
|                 modify_subs(topics2ctxs, set(), ctx) | ||||
| 
 | ||||
|                 # if there are truly no more subscriptions with this broker | ||||
|                 # drop from broker subs dict | ||||
|  |  | |||
|  | @ -78,7 +78,7 @@ def tractor_test(fn): | |||
| 
 | ||||
|         else: | ||||
|             # use implicit root actor start | ||||
|             main = partial(fn, *args, **kwargs), | ||||
|             main = partial(fn, *args, **kwargs) | ||||
| 
 | ||||
|         return trio.run(main) | ||||
|             # arbiter_addr=arb_addr, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue