Merge pull request #219 from goodboy/bi_streaming_no_debugger_stuff

Initial bi-directional streaming support!

commit 54d8c93f1b
				|  | @ -6,15 +6,19 @@ jobs: | |||
|   mypy: | ||||
|     name: 'MyPy' | ||||
|     runs-on: ubuntu-latest | ||||
| 
 | ||||
|     steps: | ||||
|       - name: Checkout | ||||
|         uses: actions/checkout@v2 | ||||
| 
 | ||||
|       - name: Setup python | ||||
|         uses: actions/setup-python@v2 | ||||
|         with: | ||||
|           python-version: '3.8' | ||||
|           python-version: '3.9' | ||||
| 
 | ||||
|       - name: Install dependencies | ||||
|         run: pip install -U . --upgrade-strategy eager | ||||
|         run: pip install -U . --upgrade-strategy eager -r requirements-test.txt | ||||
| 
 | ||||
|       - name: Run MyPy check | ||||
|         run: mypy tractor/ --ignore-missing-imports | ||||
| 
 | ||||
|  |  | |||
|  | @ -127,7 +127,8 @@ Zombie safe: self-destruct a process tree | |||
|             print('This process tree will self-destruct in 1 sec...') | ||||
|             await trio.sleep(1) | ||||
| 
 | ||||
|             # you could have done this yourself | ||||
|             # raise an error in root actor/process and trigger | ||||
|             # reaping of all minions | ||||
|             raise Exception('Self Destructed') | ||||
| 
 | ||||
| 
 | ||||
|  | @ -197,6 +198,98 @@ And, yes, there's a built-in crash handling mode B) | |||
| We're hoping to add a respawn-from-repl system soon! | ||||
| 
 | ||||
| 
 | ||||
| SC compatible bi-directional streaming | ||||
| -------------------------------------- | ||||
| Yes, you saw it here first; we provide 2-way streams | ||||
| with reliable, transitive setup/teardown semantics. | ||||
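
In practice a ``@tractor.context`` endpoint pairs with a caller-side
``portal.open_context()`` block; a condensed sketch of that shape is
shown here (``echo_server`` is an illustrative name and the caller
side assumes an already-spawned ``portal``; the full runnable example
follows below):

.. code:: python

    import tractor


    @tractor.context
    async def echo_server(ctx: tractor.Context) -> None:
        # handshake: signal readiness (optionally pass a first value)
        await ctx.started()

        async with ctx.open_stream() as stream:
            # echo until the far end stops or cancels
            async for msg in stream:
                await stream.send(msg)


    # caller side, given an already-spawned ``portal``:
    #
    #     async with portal.open_context(echo_server) as (ctx, first):
    #         async with ctx.open_stream() as stream:
    #             await stream.send('hi')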
| 
 | ||||
| Our nascent API is reminiscent of ``trio.Nursery.start()`` | ||||
| style invocation: | ||||
| 
 | ||||
| .. code:: python | ||||
| 
 | ||||
|     import trio | ||||
|     import tractor | ||||
| 
 | ||||
| 
 | ||||
|     @tractor.context | ||||
|     async def simple_rpc( | ||||
| 
 | ||||
|         ctx: tractor.Context, | ||||
|         data: int, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         '''Test a small ping-pong 2-way streaming server. | ||||
| 
 | ||||
|         ''' | ||||
|         # signal to parent that we're up much like | ||||
|         # ``trio_typing.TaskStatus.started()`` | ||||
|         await ctx.started(data + 1) | ||||
| 
 | ||||
|         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|             count = 0 | ||||
|             async for msg in stream: | ||||
| 
 | ||||
|                 assert msg == 'ping' | ||||
|                 await stream.send('pong') | ||||
|                 count += 1 | ||||
| 
 | ||||
|             else: | ||||
|                 assert count == 10 | ||||
| 
 | ||||
| 
 | ||||
|     async def main() -> None: | ||||
| 
 | ||||
|         async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|             portal = await n.start_actor( | ||||
|                 'rpc_server', | ||||
|                 enable_modules=[__name__], | ||||
|             ) | ||||
| 
 | ||||
|             # XXX: this syntax requires py3.9 | ||||
|             async with ( | ||||
| 
 | ||||
|                 portal.open_context( | ||||
|                     simple_rpc, | ||||
|                     data=10, | ||||
|                 ) as (ctx, sent), | ||||
| 
 | ||||
|                 ctx.open_stream() as stream, | ||||
|             ): | ||||
| 
 | ||||
|                 assert sent == 11 | ||||
| 
 | ||||
|                 count = 0 | ||||
|                 # receive msgs using async for style | ||||
|                 await stream.send('ping') | ||||
| 
 | ||||
|                 async for msg in stream: | ||||
|                     assert msg == 'pong' | ||||
|                     await stream.send('ping') | ||||
|                     count += 1 | ||||
| 
 | ||||
|                     if count >= 9: | ||||
|                         break | ||||
| 
 | ||||
| 
 | ||||
|             # explicitly teardown the daemon-actor | ||||
|             await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
|     if __name__ == '__main__': | ||||
|         trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| See original proposal and discussion in `#53`_ as well | ||||
| as follow up improvements in `#223`_ that we'd love to | ||||
| hear your thoughts on! | ||||
| 
 | ||||
| .. _#53: https://github.com/goodboy/tractor/issues/53 | ||||
| .. _#223: https://github.com/goodboy/tractor/issues/223 | ||||
| 
 | ||||
| 
 | ||||
| Worker poolz are easy peasy | ||||
| --------------------------- | ||||
| The initial ask from most new users is *"how do I make a worker | ||||
|  |  | |||
|  | @ -0,0 +1,72 @@ | |||
| import trio | ||||
| import tractor | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def simple_rpc( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
|     data: int, | ||||
| 
 | ||||
| ) -> None: | ||||
|     '''Test a small ping-pong 2-way streaming server. | ||||
| 
 | ||||
|     ''' | ||||
|     # signal to parent that we're up much like | ||||
|     # ``trio_typing.TaskStatus.started()`` | ||||
|     await ctx.started(data + 1) | ||||
| 
 | ||||
|     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|         count = 0 | ||||
|         async for msg in stream: | ||||
| 
 | ||||
|             assert msg == 'ping' | ||||
|             await stream.send('pong') | ||||
|             count += 1 | ||||
| 
 | ||||
|         else: | ||||
|             assert count == 10 | ||||
| 
 | ||||
| 
 | ||||
| async def main() -> None: | ||||
| 
 | ||||
|     async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|         portal = await n.start_actor( | ||||
|             'rpc_server', | ||||
|             enable_modules=[__name__], | ||||
|         ) | ||||
| 
 | ||||
|         # XXX: syntax requires py3.9 | ||||
|         async with ( | ||||
| 
 | ||||
|             portal.open_context( | ||||
|                 simple_rpc, | ||||
|                 data=10, | ||||
| 
 | ||||
|             ) as (ctx, sent), | ||||
| 
 | ||||
|             ctx.open_stream() as stream, | ||||
|         ): | ||||
| 
 | ||||
|             assert sent == 11 | ||||
| 
 | ||||
|             count = 0 | ||||
|             # receive msgs using async for style | ||||
|             await stream.send('ping') | ||||
| 
 | ||||
|             async for msg in stream: | ||||
|                 assert msg == 'pong' | ||||
|                 await stream.send('ping') | ||||
|                 count += 1 | ||||
| 
 | ||||
|                 if count >= 9: | ||||
|                     break | ||||
| 
 | ||||
|         # explicitly teardown the daemon-actor | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     trio.run(main) | ||||
|  | @ -0,0 +1,498 @@ | |||
| """ | ||||
| Bidirectional streaming and context API. | ||||
| 
 | ||||
| """ | ||||
| import pytest | ||||
| import trio | ||||
| import tractor | ||||
| 
 | ||||
| from conftest import tractor_test | ||||
| 
 | ||||
| # the general stream semantics are | ||||
| # - normal termination: far end relays a stop message which | ||||
| # terminates an ongoing ``MsgStream`` iteration | ||||
| # - cancel termination: context is cancelled on either side cancelling | ||||
| #  the "linked" inter-actor task context | ||||
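#
# a condensed sketch of how those two modes surface to a consumer
# (illustrative only; see the actual tests below for real usage):
#
#   async with portal.open_context(target) as (ctx, first):
#       async with ctx.open_stream() as stream:
#           async for msg in stream:
#               ...  # loop exits cleanly when the far end sends 'stop'
#
#   # whereas a cancel on either side is raised out of the
#   # ``open_context()`` block as a ``tractor.ContextCancelled``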
| 
 | ||||
| 
 | ||||
| _state: bool = False | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def simple_setup_teardown( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
|     data: int, | ||||
|     block_forever: bool = False, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     # startup phase | ||||
|     global _state | ||||
|     _state = True | ||||
| 
 | ||||
|     # signal to parent that we're up | ||||
|     await ctx.started(data + 1) | ||||
| 
 | ||||
|     try: | ||||
|         if block_forever: | ||||
|             # block until cancelled | ||||
|             await trio.sleep_forever() | ||||
|         else: | ||||
|             return 'yo' | ||||
|     finally: | ||||
|         _state = False | ||||
| 
 | ||||
| 
 | ||||
| async def assert_state(value: bool): | ||||
|     global _state | ||||
|     assert _state == value | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'error_parent', | ||||
|     [False, True], | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'callee_blocks_forever', | ||||
|     [False, True], | ||||
| ) | ||||
| def test_simple_context( | ||||
|     error_parent, | ||||
|     callee_blocks_forever, | ||||
| ): | ||||
| 
 | ||||
|     async def main(): | ||||
| 
 | ||||
|         async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|             portal = await n.start_actor( | ||||
|                 'simple_context', | ||||
|                 enable_modules=[__name__], | ||||
|             ) | ||||
| 
 | ||||
|             async with portal.open_context( | ||||
|                 simple_setup_teardown, | ||||
|                 data=10, | ||||
|                 block_forever=callee_blocks_forever, | ||||
|             ) as (ctx, sent): | ||||
| 
 | ||||
|                 assert sent == 11 | ||||
| 
 | ||||
|                 if callee_blocks_forever: | ||||
|                     await portal.run(assert_state, value=True) | ||||
|                     await ctx.cancel() | ||||
|                 else: | ||||
|                     assert await ctx.result() == 'yo' | ||||
| 
 | ||||
|             # after cancellation | ||||
|             await portal.run(assert_state, value=False) | ||||
| 
 | ||||
|             if error_parent: | ||||
|                 raise ValueError | ||||
| 
 | ||||
|             # shut down daemon | ||||
|             await portal.cancel_actor() | ||||
| 
 | ||||
|     if error_parent: | ||||
|         try: | ||||
|             trio.run(main) | ||||
|         except ValueError: | ||||
|             pass | ||||
|     else: | ||||
|         trio.run(main) | ||||
| 
 | ||||
| 
 | ||||
| # basic stream terminations: | ||||
| # - callee context closes without using stream | ||||
| # - caller context closes without using stream | ||||
| # - caller context calls `Context.cancel()` while streaming | ||||
| #   is ongoing resulting in callee being cancelled | ||||
| # - callee calls `Context.cancel()` while streaming and caller | ||||
| #   sees stream terminated in `RemoteActorError` | ||||
| 
 | ||||
| # TODO: future possible features | ||||
| # - restart request: far end raises `ContextRestart` | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def close_ctx_immediately( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     await ctx.started() | ||||
|     global _state | ||||
| 
 | ||||
|     async with ctx.open_stream(): | ||||
|         pass | ||||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_callee_closes_ctx_after_stream_open(): | ||||
|     'callee context closes without using stream' | ||||
| 
 | ||||
|     async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|         portal = await n.start_actor( | ||||
|             'fast_stream_closer', | ||||
|             enable_modules=[__name__], | ||||
|         ) | ||||
| 
 | ||||
|         async with portal.open_context( | ||||
|             close_ctx_immediately, | ||||
| 
 | ||||
|             # flag to avoid waiting the final result | ||||
|             # cancel_on_exit=True, | ||||
| 
 | ||||
|         ) as (ctx, sent): | ||||
| 
 | ||||
|             assert sent is None | ||||
| 
 | ||||
|             with trio.fail_after(0.5): | ||||
|                 async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                     # should fall through since ``StopAsyncIteration`` | ||||
|                     # should be raised through translation of | ||||
|                     # a ``trio.EndOfChannel`` by | ||||
|                     # ``trio.abc.ReceiveChannel.__anext__()`` | ||||
|                     async for _ in stream: | ||||
|                         assert 0 | ||||
|                     else: | ||||
| 
 | ||||
|                         # verify stream is now closed | ||||
|                         try: | ||||
|                             await stream.receive() | ||||
|                         except trio.EndOfChannel: | ||||
|                             pass | ||||
| 
 | ||||
|             # TODO: should we just raise the closed resource err | ||||
|             # directly here to enforce not allowing a re-open | ||||
|             # of a stream to the context (at least until such time | ||||
|             # as we decide that's a good idea)? | ||||
|             try: | ||||
|                 async with ctx.open_stream() as stream: | ||||
|                     pass | ||||
|             except trio.ClosedResourceError: | ||||
|                 pass | ||||
| 
 | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def expect_cancelled( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> None: | ||||
|     global _state | ||||
|     _state = True | ||||
| 
 | ||||
|     await ctx.started() | ||||
| 
 | ||||
|     try: | ||||
|         async with ctx.open_stream() as stream: | ||||
|             async for msg in stream: | ||||
|                 await stream.send(msg)  # echo server | ||||
| 
 | ||||
|     except trio.Cancelled: | ||||
|         # expected case | ||||
|         _state = False | ||||
|         raise | ||||
| 
 | ||||
|     else: | ||||
|         assert 0, "Wasn't cancelled!?" | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'use_ctx_cancel_method', | ||||
|     [False, True], | ||||
| ) | ||||
| @tractor_test | ||||
| async def test_caller_closes_ctx_after_callee_opens_stream( | ||||
|     use_ctx_cancel_method: bool, | ||||
| ): | ||||
|     'caller context closes without using stream' | ||||
| 
 | ||||
|     async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|         portal = await n.start_actor( | ||||
|             'ctx_cancelled', | ||||
|             enable_modules=[__name__], | ||||
|         ) | ||||
| 
 | ||||
|         async with portal.open_context( | ||||
|             expect_cancelled, | ||||
|         ) as (ctx, sent): | ||||
|             await portal.run(assert_state, value=True) | ||||
| 
 | ||||
|             assert sent is None | ||||
| 
 | ||||
|             # call cancel explicitly | ||||
|             if use_ctx_cancel_method: | ||||
| 
 | ||||
|                 await ctx.cancel() | ||||
| 
 | ||||
|                 try: | ||||
|                     async with ctx.open_stream() as stream: | ||||
|                         async for msg in stream: | ||||
|                             pass | ||||
| 
 | ||||
|                 except tractor.ContextCancelled: | ||||
|                     raise  # XXX: must be propagated to __aexit__ | ||||
| 
 | ||||
|                 else: | ||||
|                     assert 0, "Should have context cancelled?" | ||||
| 
 | ||||
|                 # channel should still be up | ||||
|                 assert portal.channel.connected() | ||||
| 
 | ||||
|                 # ctx is closed here | ||||
|                 await portal.run(assert_state, value=False) | ||||
| 
 | ||||
|             else: | ||||
|                 try: | ||||
|                     with trio.fail_after(0.2): | ||||
|                         await ctx.result() | ||||
|                         assert 0, "Callee should have blocked!?" | ||||
|                 except trio.TooSlowError: | ||||
|                     await ctx.cancel() | ||||
|         try: | ||||
|             async with ctx.open_stream() as stream: | ||||
|                 async for msg in stream: | ||||
|                     pass | ||||
|         except tractor.ContextCancelled: | ||||
|             pass | ||||
|         else: | ||||
|             assert 0, "Should have received closed resource error?" | ||||
| 
 | ||||
|         # ctx is closed here | ||||
|         await portal.run(assert_state, value=False) | ||||
| 
 | ||||
|         # channel should not have been destroyed yet, only the | ||||
|         # inter-actor-task context | ||||
|         assert portal.channel.connected() | ||||
| 
 | ||||
|         # teardown the actor | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_multitask_caller_cancels_from_nonroot_task(): | ||||
| 
 | ||||
|     async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|         portal = await n.start_actor( | ||||
|             'ctx_cancelled', | ||||
|             enable_modules=[__name__], | ||||
|         ) | ||||
| 
 | ||||
|         async with portal.open_context( | ||||
|             expect_cancelled, | ||||
|         ) as (ctx, sent): | ||||
| 
 | ||||
|             await portal.run(assert_state, value=True) | ||||
|             assert sent is None | ||||
| 
 | ||||
|             async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                 async def send_msg_then_cancel(): | ||||
|                     await stream.send('yo') | ||||
|                     await portal.run(assert_state, value=True) | ||||
|                     await ctx.cancel() | ||||
|                     await portal.run(assert_state, value=False) | ||||
| 
 | ||||
|                 async with trio.open_nursery() as n: | ||||
|                     n.start_soon(send_msg_then_cancel) | ||||
| 
 | ||||
|                     try: | ||||
|                         async for msg in stream: | ||||
|                             assert msg == 'yo' | ||||
| 
 | ||||
|                     except tractor.ContextCancelled: | ||||
|                         raise  # XXX: must be propagated to __aexit__ | ||||
| 
 | ||||
|                 # channel should still be up | ||||
|                 assert portal.channel.connected() | ||||
| 
 | ||||
|                 # ctx is closed here | ||||
|                 await portal.run(assert_state, value=False) | ||||
| 
 | ||||
|         # channel should not have been destroyed yet, only the | ||||
|         # inter-actor-task context | ||||
|         assert portal.channel.connected() | ||||
| 
 | ||||
|         # teardown the actor | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def cancel_self( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> None: | ||||
|     global _state | ||||
|     _state = True | ||||
| 
 | ||||
|     await ctx.cancel() | ||||
|     try: | ||||
|         with trio.fail_after(0.1): | ||||
|             await trio.sleep_forever() | ||||
| 
 | ||||
|     except trio.Cancelled: | ||||
|         raise | ||||
| 
 | ||||
|     except trio.TooSlowError: | ||||
|         # should never get here | ||||
|         assert 0 | ||||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_callee_cancels_before_started(): | ||||
|     '''callee calls `Context.cancel()` while streaming and caller | ||||
|     sees stream terminated in `ContextCancelled`. | ||||
| 
 | ||||
|     ''' | ||||
|     async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|         portal = await n.start_actor( | ||||
|             'cancels_self', | ||||
|             enable_modules=[__name__], | ||||
|         ) | ||||
|         try: | ||||
| 
 | ||||
|             async with portal.open_context( | ||||
|                 cancel_self, | ||||
|             ) as (ctx, sent): | ||||
|                 async with ctx.open_stream(): | ||||
| 
 | ||||
|                     await trio.sleep_forever() | ||||
| 
 | ||||
|         # raises a special cancel signal | ||||
|         except tractor.ContextCancelled as ce: | ||||
|             assert ce.type == trio.Cancelled | ||||
| 
 | ||||
|         # teardown the actor | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def simple_rpc( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
|     data: int, | ||||
| 
 | ||||
| ) -> None: | ||||
|     """Test a small ping-pong server. | ||||
| 
 | ||||
|     """ | ||||
|     # signal to parent that we're up | ||||
|     await ctx.started(data + 1) | ||||
| 
 | ||||
|     print('opening stream in callee') | ||||
|     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|         count = 0 | ||||
|         while True: | ||||
|             try: | ||||
|                 assert await stream.receive() == 'ping' | ||||
|             except trio.EndOfChannel: | ||||
|                 assert count == 10 | ||||
|                 break | ||||
|             else: | ||||
|                 print('pong') | ||||
|                 await stream.send('pong') | ||||
|                 count += 1 | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def simple_rpc_with_forloop( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
|     data: int, | ||||
| 
 | ||||
| ) -> None: | ||||
|     """Same as previous test but using ``async for`` syntax/api. | ||||
| 
 | ||||
|     """ | ||||
| 
 | ||||
|     # signal to parent that we're up | ||||
|     await ctx.started(data + 1) | ||||
| 
 | ||||
|     print('opening stream in callee') | ||||
|     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|         count = 0 | ||||
|         async for msg in stream: | ||||
| 
 | ||||
|             assert msg == 'ping' | ||||
|             print('pong') | ||||
|             await stream.send('pong') | ||||
|             count += 1 | ||||
| 
 | ||||
|         else: | ||||
|             assert count == 10 | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'use_async_for', | ||||
|     [True, False], | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'server_func', | ||||
|     [simple_rpc, simple_rpc_with_forloop], | ||||
| ) | ||||
| def test_simple_rpc(server_func, use_async_for): | ||||
|     """The simplest request response pattern. | ||||
| 
 | ||||
|     """ | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|             portal = await n.start_actor( | ||||
|                 'rpc_server', | ||||
|                 enable_modules=[__name__], | ||||
|             ) | ||||
| 
 | ||||
|             async with portal.open_context( | ||||
|                 server_func,  # taken from pytest parameterization | ||||
|                 data=10, | ||||
|             ) as (ctx, sent): | ||||
| 
 | ||||
|                 assert sent == 11 | ||||
| 
 | ||||
|                 async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                     if use_async_for: | ||||
| 
 | ||||
|                         count = 0 | ||||
|                         # receive msgs using async for style | ||||
|                         print('ping') | ||||
|                         await stream.send('ping') | ||||
| 
 | ||||
|                         async for msg in stream: | ||||
|                             assert msg == 'pong' | ||||
|                             print('ping') | ||||
|                             await stream.send('ping') | ||||
|                             count += 1 | ||||
| 
 | ||||
|                             if count >= 9: | ||||
|                                 break | ||||
| 
 | ||||
|                     else: | ||||
|                         # classic send/receive style | ||||
|                         for _ in range(10): | ||||
| 
 | ||||
|                             print('ping') | ||||
|                             await stream.send('ping') | ||||
|                             assert await stream.receive() == 'pong' | ||||
| 
 | ||||
|                 # stream should terminate here | ||||
| 
 | ||||
|             # final context result(s) should be consumed here in __aexit__() | ||||
| 
 | ||||
|             await portal.cancel_actor() | ||||
| 
 | ||||
|     trio.run(main) | ||||
|  | @ -0,0 +1,220 @@ | |||
| """ | ||||
| Advanced streaming patterns using bidirectional streams and contexts. | ||||
| 
 | ||||
| """ | ||||
| import itertools | ||||
| from typing import Set, Dict, List | ||||
| 
 | ||||
| import trio | ||||
| import tractor | ||||
| 
 | ||||
| 
 | ||||
| _registry: Dict[str, Set[tractor.ReceiveMsgStream]] = { | ||||
|     'even': set(), | ||||
|     'odd': set(), | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| async def publisher( | ||||
| 
 | ||||
|     seed: int = 0, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     global _registry | ||||
| 
 | ||||
|     def is_even(i): | ||||
|         return i % 2 == 0 | ||||
| 
 | ||||
|     for val in itertools.count(seed): | ||||
| 
 | ||||
|         sub = 'even' if is_even(val) else 'odd' | ||||
| 
 | ||||
|         for sub_stream in _registry[sub].copy(): | ||||
|             await sub_stream.send(val) | ||||
| 
 | ||||
|         # throttle send rate to ~1kHz | ||||
|         # making it readable to a human user | ||||
|         await trio.sleep(1/1000) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def subscribe( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     global _registry | ||||
| 
 | ||||
|     # sync with the caller | ||||
|     await ctx.started(None) | ||||
| 
 | ||||
|     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|         # update subs list as consumer requests | ||||
|         async for new_subs in stream: | ||||
| 
 | ||||
|             new_subs = set(new_subs) | ||||
|             remove = _registry.keys() - new_subs | ||||
| 
 | ||||
|             print(f'setting sub to {new_subs} for {ctx.chan.uid}') | ||||
| 
 | ||||
|             # remove old subs | ||||
|             for sub in remove: | ||||
|                 _registry[sub].discard(stream) | ||||
| 
 | ||||
|             # add new subs for consumer | ||||
|             for sub in new_subs: | ||||
|                 _registry[sub].add(stream) | ||||
| 
 | ||||
| 
 | ||||
| async def consumer( | ||||
| 
 | ||||
|     subs: List[str], | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     uid = tractor.current_actor().uid | ||||
| 
 | ||||
|     async with tractor.wait_for_actor('publisher') as portal: | ||||
|         async with portal.open_context(subscribe) as (ctx, first): | ||||
|             async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                 # flip between the provided subs dynamically | ||||
|                 if len(subs) > 1: | ||||
| 
 | ||||
|                     for sub in itertools.cycle(subs): | ||||
|                         print(f'setting dynamic sub to {sub}') | ||||
|                         await stream.send([sub]) | ||||
| 
 | ||||
|                         count = 0 | ||||
|                         async for value in stream: | ||||
|                             print(f'{uid} got: {value}') | ||||
|                             if count > 5: | ||||
|                                 break | ||||
|                             count += 1 | ||||
| 
 | ||||
|                 else:  # static sub | ||||
| 
 | ||||
|                     await stream.send(subs) | ||||
|                     async for value in stream: | ||||
|                         print(f'{uid} got: {value}') | ||||
| 
 | ||||
| 
 | ||||
| def test_dynamic_pub_sub(): | ||||
| 
 | ||||
|     global _registry | ||||
| 
 | ||||
|     from multiprocessing import cpu_count | ||||
|     cpus = cpu_count() | ||||
| 
 | ||||
|     async def main(): | ||||
|         async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|             # name of this actor will be same as target func | ||||
|             await n.run_in_actor(publisher) | ||||
| 
 | ||||
|             for i, sub in zip( | ||||
|                 range(cpus - 2), | ||||
|                 itertools.cycle(_registry.keys()) | ||||
|             ): | ||||
|                 await n.run_in_actor( | ||||
|                     consumer, | ||||
|                     name=f'consumer_{sub}', | ||||
|                     subs=[sub], | ||||
|                 ) | ||||
| 
 | ||||
|             # make one dynamic subscriber | ||||
|             await n.run_in_actor( | ||||
|                 consumer, | ||||
|                 name='consumer_dynamic', | ||||
|                 subs=list(_registry.keys()), | ||||
|             ) | ||||
| 
 | ||||
|             # block until cancelled by user | ||||
|             with trio.fail_after(3): | ||||
|                 await trio.sleep_forever() | ||||
| 
 | ||||
|     try: | ||||
|         trio.run(main) | ||||
|     except trio.TooSlowError: | ||||
|         pass | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def one_task_streams_and_one_handles_reqresp( | ||||
| 
 | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     await ctx.started() | ||||
| 
 | ||||
|     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|         async def pingpong(): | ||||
|             '''Run a simple req/response service. | ||||
| 
 | ||||
|             ''' | ||||
|             async for msg in stream: | ||||
|                 print('rpc server ping') | ||||
|                 assert msg == 'ping' | ||||
|                 print('rpc server pong') | ||||
|                 await stream.send('pong') | ||||
| 
 | ||||
|         async with trio.open_nursery() as n: | ||||
|             n.start_soon(pingpong) | ||||
| 
 | ||||
|             for _ in itertools.count(): | ||||
|                 await stream.send('yo') | ||||
|                 await trio.sleep(0.01) | ||||
| 
 | ||||
| 
 | ||||
| def test_reqresp_ontopof_streaming(): | ||||
|     '''Test a subactor that both streams with one task and | ||||
|     spawns another which handles a small requests-response | ||||
|     dialogue over the same bidir-stream. | ||||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
| 
 | ||||
|         with trio.move_on_after(2): | ||||
|             async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|                 # name of this actor will be same as target func | ||||
|                 portal = await n.start_actor( | ||||
|                     'dual_tasks', | ||||
|                     enable_modules=[__name__] | ||||
|                 ) | ||||
| 
 | ||||
|                 # flag to make sure we get at least one pong | ||||
|                 got_pong: bool = False | ||||
| 
 | ||||
|                 async with portal.open_context( | ||||
|                     one_task_streams_and_one_handles_reqresp, | ||||
| 
 | ||||
|                 ) as (ctx, first): | ||||
| 
 | ||||
|                     assert first is None | ||||
| 
 | ||||
|                     async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                         await stream.send('ping') | ||||
| 
 | ||||
|                         async for msg in stream: | ||||
|                             print(f'client received: {msg}') | ||||
| 
 | ||||
|                             assert msg in {'pong', 'yo'} | ||||
| 
 | ||||
|                             if msg == 'pong': | ||||
|                                 got_pong = True | ||||
|                                 await stream.send('ping') | ||||
|                                 print('client sent ping') | ||||
| 
 | ||||
|         assert got_pong | ||||
| 
 | ||||
|     try: | ||||
|         trio.run(main) | ||||
|     except trio.TooSlowError: | ||||
|         pass | ||||
|  | @ -8,6 +8,7 @@ TODO: None of these tests have been run successfully on windows yet. | |||
| """ | ||||
| import time | ||||
| from os import path | ||||
| import platform | ||||
| 
 | ||||
| import pytest | ||||
| import pexpect | ||||
|  | @ -25,6 +26,13 @@ from conftest import repodir | |||
| # - recurrent root errors | ||||
| 
 | ||||
| 
 | ||||
| if platform.system() == 'Windows': | ||||
|     pytest.skip( | ||||
|         'Debugger tests have no windows support (yet)', | ||||
|         allow_module_level=True, | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def examples_dir(): | ||||
|     """Return the abspath to the examples directory. | ||||
|     """ | ||||
|  |  | |||
|  | @ -84,8 +84,8 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): | |||
| 
 | ||||
|         if '__' not in f | ||||
|         and f[0] != '_' | ||||
|         and 'debugging' not in p[0] | ||||
|     ], | ||||
|         and 'debugging' not in p[0]], | ||||
| 
 | ||||
|     ids=lambda t: t[1], | ||||
| ) | ||||
| def test_example(run_example_in_subproc, example_script): | ||||
|  | @ -98,6 +98,10 @@ def test_example(run_example_in_subproc, example_script): | |||
|     test_example``. | ||||
|     """ | ||||
|     ex_file = os.path.join(*example_script) | ||||
| 
 | ||||
|     if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9): | ||||
|         pytest.skip("2-way streaming example requires py3.9 async with syntax") | ||||
| 
 | ||||
|     with open(ex_file, 'r') as ex: | ||||
|         code = ex.read() | ||||
| 
 | ||||
|  |  | |||
|  | @ -32,13 +32,16 @@ async def async_gen_stream(sequence): | |||
| 
 | ||||
|     # block indefinitely waiting to be cancelled by ``aclose()`` call | ||||
|     with trio.CancelScope() as cs: | ||||
|         await trio.sleep(float('inf')) | ||||
|         await trio.sleep_forever() | ||||
|         assert 0 | ||||
|     assert cs.cancelled_caught | ||||
| 
 | ||||
| 
 | ||||
| @tractor.stream | ||||
| async def context_stream(ctx, sequence): | ||||
| async def context_stream( | ||||
|     ctx: tractor.Context, | ||||
|     sequence | ||||
| ): | ||||
|     for i in sequence: | ||||
|         await ctx.send_yield(i) | ||||
|         await trio.sleep(0.1) | ||||
|  | @ -338,6 +341,8 @@ async def test_respawn_consumer_task( | |||
|                         print("all values streamed, BREAKING") | ||||
|                         break | ||||
| 
 | ||||
|                 cs.cancel() | ||||
| 
 | ||||
|         # TODO: this is justification for a | ||||
|         # ``ActorNursery.stream_from_actor()`` helper? | ||||
|         await portal.cancel_actor() | ||||
|  |  | |||
|  | @ -5,11 +5,21 @@ tractor: An actor model micro-framework built on | |||
| from trio import MultiError | ||||
| 
 | ||||
| from ._ipc import Channel | ||||
| from ._streaming import Context, stream | ||||
| from ._streaming import ( | ||||
|     Context, | ||||
|     ReceiveMsgStream, | ||||
|     MsgStream, | ||||
|     stream, | ||||
|     context, | ||||
| ) | ||||
| from ._discovery import get_arbiter, find_actor, wait_for_actor | ||||
| from ._trionics import open_nursery | ||||
| from ._state import current_actor, is_root_process | ||||
| from ._exceptions import RemoteActorError, ModuleNotExposed | ||||
| from ._exceptions import ( | ||||
|     RemoteActorError, | ||||
|     ModuleNotExposed, | ||||
|     ContextCancelled, | ||||
| ) | ||||
| from ._debug import breakpoint, post_mortem | ||||
| from . import msg | ||||
| from ._root import run, run_daemon, open_root_actor | ||||
|  | @ -21,6 +31,7 @@ __all__ = [ | |||
|     'ModuleNotExposed', | ||||
|     'MultiError', | ||||
|     'RemoteActorError', | ||||
|     'ContextCancelled', | ||||
|     'breakpoint', | ||||
|     'current_actor', | ||||
|     'find_actor', | ||||
|  | @ -33,7 +44,9 @@ __all__ = [ | |||
|     'run', | ||||
|     'run_daemon', | ||||
|     'stream', | ||||
|     'wait_for_actor', | ||||
|     'context', | ||||
|     'ReceiveMsgStream', | ||||
|     'MsgStream', | ||||
|     'to_asyncio', | ||||
|     'wait_for_actor', | ||||
| ] | ||||
|  |  | |||
|  | @ -14,6 +14,7 @@ from types import ModuleType | |||
| import sys | ||||
| import os | ||||
| from contextlib import ExitStack | ||||
| import warnings | ||||
| 
 | ||||
| import trio  # type: ignore | ||||
| from trio_typing import TaskStatus | ||||
|  | @ -27,6 +28,7 @@ from ._exceptions import ( | |||
|     unpack_error, | ||||
|     ModuleNotExposed, | ||||
|     is_multi_cancelled, | ||||
|     ContextCancelled, | ||||
|     TransportClosed, | ||||
| ) | ||||
| from . import _debug | ||||
|  | @ -44,6 +46,7 @@ class ActorFailure(Exception): | |||
| 
 | ||||
| 
 | ||||
| async def _invoke( | ||||
| 
 | ||||
|     actor: 'Actor', | ||||
|     cid: str, | ||||
|     chan: Channel, | ||||
|  | @ -56,15 +59,44 @@ async def _invoke( | |||
|     """Invoke local func and deliver result(s) over provided channel. | ||||
|     """ | ||||
|     treat_as_gen = False | ||||
|     cs = None | ||||
| 
 | ||||
|     # possibly a traceback (not sure what the typing is for this..) | ||||
|     tb = None | ||||
| 
 | ||||
|     cancel_scope = trio.CancelScope() | ||||
|     ctx = Context(chan, cid, cancel_scope) | ||||
|     cs: Optional[trio.CancelScope] = None | ||||
| 
 | ||||
|     ctx = Context(chan, cid) | ||||
|     context: bool = False | ||||
| 
 | ||||
|     if getattr(func, '_tractor_stream_function', False): | ||||
|         # handle decorated ``@tractor.stream`` async functions | ||||
|         sig = inspect.signature(func) | ||||
|         params = sig.parameters | ||||
| 
 | ||||
|         # compat with old api | ||||
|         kwargs['ctx'] = ctx | ||||
| 
 | ||||
|         if 'ctx' in params: | ||||
|             warnings.warn( | ||||
|                 "`@tractor.stream` decorated funcs should now declare " | ||||
|                 "a `stream` arg; `ctx` is now designated for use with " | ||||
|                 "`@tractor.context`", | ||||
|                 DeprecationWarning, | ||||
|                 stacklevel=2, | ||||
|             ) | ||||
| 
 | ||||
|         elif 'stream' in params: | ||||
|             assert 'stream' in params | ||||
|             kwargs['stream'] = ctx | ||||
| 
 | ||||
|         treat_as_gen = True | ||||
| 
 | ||||
|     elif getattr(func, '_tractor_context_function', False): | ||||
|         # handle decorated ``@tractor.context`` async function | ||||
|         kwargs['ctx'] = ctx | ||||
|         context = True | ||||
| 
 | ||||
|     # errors raised inside this block are propagated back to caller | ||||
|     try: | ||||
|         if not ( | ||||
|  | @ -102,52 +134,106 @@ async def _invoke( | |||
|             # `StopAsyncIteration` system here for returning a final | ||||
|             # value if desired | ||||
|             await chan.send({'stop': True, 'cid': cid}) | ||||
|         else: | ||||
|             if treat_as_gen: | ||||
|                 await chan.send({'functype': 'asyncgen', 'cid': cid}) | ||||
|                 # XXX: the async-func may spawn further tasks which push | ||||
|                 # back values like an async-generator would but must | ||||
|                 # manualy construct the response dict-packet-responses as | ||||
|                 # above | ||||
|                 with cancel_scope as cs: | ||||
|                     task_status.started(cs) | ||||
|                     await coro | ||||
|                 if not cs.cancelled_caught: | ||||
|                     # task was not cancelled so we can instruct the | ||||
|                     # far end async gen to tear down | ||||
|                     await chan.send({'stop': True, 'cid': cid}) | ||||
|             else: | ||||
|                 # regular async function | ||||
|                 await chan.send({'functype': 'asyncfunc', 'cid': cid}) | ||||
|                 with cancel_scope as cs: | ||||
|                     task_status.started(cs) | ||||
| 
 | ||||
|         # one way @stream func that gets treated like an async gen | ||||
|         elif treat_as_gen: | ||||
|             await chan.send({'functype': 'asyncgen', 'cid': cid}) | ||||
|             # XXX: the async-func may spawn further tasks which push | ||||
|             # back values like an async-generator would but must | ||||
|             # manually construct the response dict-packet-responses as | ||||
|             # above | ||||
|             with cancel_scope as cs: | ||||
|                 task_status.started(cs) | ||||
|                 await coro | ||||
| 
 | ||||
|             if not cs.cancelled_caught: | ||||
|                 # task was not cancelled so we can instruct the | ||||
|                 # far end async gen to tear down | ||||
|                 await chan.send({'stop': True, 'cid': cid}) | ||||
| 
 | ||||
|         elif context: | ||||
|             # context func with support for bi-dir streaming | ||||
|             await chan.send({'functype': 'context', 'cid': cid}) | ||||
| 
 | ||||
|             async with trio.open_nursery() as scope_nursery: | ||||
|                 ctx._scope_nursery = scope_nursery | ||||
|                 cs = scope_nursery.cancel_scope | ||||
|                 task_status.started(cs) | ||||
|                 try: | ||||
|                     await chan.send({'return': await coro, 'cid': cid}) | ||||
|                 except trio.Cancelled as err: | ||||
|                     tb = err.__traceback__ | ||||
| 
 | ||||
|             if cs.cancelled_caught: | ||||
| 
 | ||||
|                 # TODO: pack in ``trio.Cancelled.__traceback__`` here | ||||
|                 # so they can be unwrapped and displayed on the caller | ||||
|                 # side! | ||||
| 
 | ||||
|                 fname = func.__name__ | ||||
|                 if ctx._cancel_called: | ||||
|                     msg = f'{fname} cancelled itself' | ||||
| 
 | ||||
|                 elif cs.cancel_called: | ||||
|                     msg = ( | ||||
|                         f'{fname} was remotely cancelled by its caller ' | ||||
|                         f'{ctx.chan.uid}' | ||||
|                     ) | ||||
| 
 | ||||
|                 # task-context was cancelled so relay the cancel to the caller | ||||
|                 raise ContextCancelled( | ||||
|                     msg, | ||||
|                     suberror_type=trio.Cancelled, | ||||
|                 ) | ||||
| 
 | ||||
|         else: | ||||
|             # regular async function | ||||
|             await chan.send({'functype': 'asyncfunc', 'cid': cid}) | ||||
|             with cancel_scope as cs: | ||||
|                 task_status.started(cs) | ||||
|                 await chan.send({'return': await coro, 'cid': cid}) | ||||
| 
 | ||||
|     except (Exception, trio.MultiError) as err: | ||||
| 
 | ||||
|         # TODO: maybe we'll want differnet "levels" of debugging | ||||
|         # eventualy such as ('app', 'supervisory', 'runtime') ? | ||||
|         if not isinstance(err, trio.ClosedResourceError) and ( | ||||
|             not is_multi_cancelled(err) | ||||
|         ): | ||||
|             # XXX: is there any case where we'll want to debug IPC | ||||
|             # disconnects? I can't think of a reason that inspecting | ||||
|             # this type of failure will be useful for respawns or | ||||
|             # recovery logic - the only case is some kind of strange bug | ||||
|             # in `trio` itself? | ||||
|             entered = await _debug._maybe_enter_pm(err) | ||||
|             if not entered: | ||||
|         if not is_multi_cancelled(err): | ||||
| 
 | ||||
|             log.exception("Actor crashed:") | ||||
| 
 | ||||
|             # TODO: maybe we'll want different "levels" of debugging | ||||
|             # eventually such as ('app', 'supervisory', 'runtime') ? | ||||
| 
 | ||||
|             # if not isinstance(err, trio.ClosedResourceError) and ( | ||||
|             # if not is_multi_cancelled(err) and ( | ||||
| 
 | ||||
|             entered_debug: bool = False | ||||
|             if not isinstance(err, ContextCancelled) or ( | ||||
|                 isinstance(err, ContextCancelled) and ctx._cancel_called | ||||
|             ): | ||||
|                 # XXX: is there any case where we'll want to debug IPC | ||||
|                 # disconnects as a default? | ||||
|                 # | ||||
|                 # I can't think of a reason that inspecting | ||||
|                 # this type of failure will be useful for respawns or | ||||
|                 # recovery logic - the only case is some kind of strange bug | ||||
|                 # in our transport layer itself? Going to keep this | ||||
|                 # open ended for now. | ||||
| 
 | ||||
|                 entered_debug = await _debug._maybe_enter_pm(err) | ||||
| 
 | ||||
|             if not entered_debug: | ||||
|                 log.exception("Actor crashed:") | ||||
| 
 | ||||
|         # always ship errors back to caller | ||||
|         err_msg = pack_error(err) | ||||
|         err_msg = pack_error(err, tb=tb) | ||||
|         err_msg['cid'] = cid | ||||
|         try: | ||||
|             await chan.send(err_msg) | ||||
| 
 | ||||
|         except trio.ClosedResourceError: | ||||
|             log.warning( | ||||
|                 f"Failed to ship error to caller @ {chan.uid}") | ||||
|             # if we can't propagate the error that's a big boo boo | ||||
|             log.error( | ||||
|                 f"Failed to ship error to caller @ {chan.uid} !?" | ||||
|             ) | ||||
| 
 | ||||
|         if cs is None: | ||||
|             # error is from above code not from rpc invocation | ||||
|  | @ -165,7 +251,7 @@ async def _invoke( | |||
|                 f"Task {func} likely errored or cancelled before it started") | ||||
|         finally: | ||||
|             if not actor._rpc_tasks: | ||||
|                 log.info("All RPC tasks have completed") | ||||
|                 log.runtime("All RPC tasks have completed") | ||||
|                 actor._ongoing_rpc_tasks.set() | ||||
| 
 | ||||
| 
 | ||||
|  | @ -180,10 +266,10 @@ _lifetime_stack: ExitStack = ExitStack() | |||
| class Actor: | ||||
|     """The fundamental concurrency primitive. | ||||
| 
 | ||||
|     An *actor* is the combination of a regular Python or | ||||
|     ``multiprocessing.Process`` executing a ``trio`` task tree, communicating | ||||
|     An *actor* is the combination of a regular Python process | ||||
|     executing a ``trio`` task tree, communicating | ||||
|     with other actors through "portals" which provide a native async API | ||||
|     around "channels". | ||||
|     around various IPC transport "channels". | ||||
|     """ | ||||
|     is_arbiter: bool = False | ||||
| 
 | ||||
|  | @ -327,14 +413,18 @@ class Actor: | |||
|             raise mne | ||||
| 
 | ||||
|     async def _stream_handler( | ||||
| 
 | ||||
|         self, | ||||
|         stream: trio.SocketStream, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         """Entry point for new inbound connections to the channel server. | ||||
| 
 | ||||
|         """ | ||||
|         self._no_more_peers = trio.Event()  # unset | ||||
| 
 | ||||
|         chan = Channel(stream=stream) | ||||
|         log.info(f"New connection to us {chan}") | ||||
|         log.runtime(f"New connection to us {chan}") | ||||
| 
 | ||||
|         # send/receive initial handshake response | ||||
|         try: | ||||
|  | @ -365,11 +455,16 @@ class Actor: | |||
|             event.set() | ||||
| 
 | ||||
|         chans = self._peers[uid] | ||||
| 
 | ||||
|         # TODO: re-use channels for new connections instead | ||||
|         # of always new ones; will require changing all the | ||||
|         # discovery funcs | ||||
|         if chans: | ||||
|             log.warning( | ||||
|             log.runtime( | ||||
|                 f"already have channel(s) for {uid}:{chans}?" | ||||
|             ) | ||||
|         log.trace(f"Registered {chan} for {uid}")  # type: ignore | ||||
| 
 | ||||
|         log.runtime(f"Registered {chan} for {uid}")  # type: ignore | ||||
|         # append new channel | ||||
|         self._peers[uid].append(chan) | ||||
| 
 | ||||
|  | @ -378,10 +473,24 @@ class Actor: | |||
|         try: | ||||
|             await self._process_messages(chan) | ||||
|         finally: | ||||
| 
 | ||||
|             # channel cleanup sequence | ||||
| 
 | ||||
|             # for (channel, cid) in self._rpc_tasks.copy(): | ||||
|             #     if channel is chan: | ||||
|             #         with trio.CancelScope(shield=True): | ||||
|             #             await self._cancel_task(cid, channel) | ||||
| 
 | ||||
|             #             # close all consumer side task mem chans | ||||
|             #             send_chan, _ = self._cids2qs[(chan.uid, cid)] | ||||
|             #             assert send_chan.cid == cid  # type: ignore | ||||
|             #             await send_chan.aclose() | ||||
| 
 | ||||
|             # Drop ref to channel so it can be gc-ed and disconnected | ||||
|             log.debug(f"Releasing channel {chan} from {chan.uid}") | ||||
|             chans = self._peers.get(chan.uid) | ||||
|             chans.remove(chan) | ||||
| 
 | ||||
|             if not chans: | ||||
|                 log.debug(f"No more channels for {chan.uid}") | ||||
|                 self._peers.pop(chan.uid, None) | ||||
|  | @ -394,14 +503,22 @@ class Actor: | |||
| 
 | ||||
|             # # XXX: is this necessary (GC should do it?) | ||||
|             if chan.connected(): | ||||
|                 # if the channel is still connected it may mean the far | ||||
|                 # end has not closed and we may have gotten here due to | ||||
|                 # an error and so we should at least try to terminate | ||||
|                 # the channel from this end gracefully. | ||||
| 
 | ||||
|                 log.debug(f"Disconnecting channel {chan}") | ||||
|                 try: | ||||
|                     # send our msg loop terminate sentinel | ||||
|                     # send a msg loop terminate sentinel | ||||
|                     await chan.send(None) | ||||
| 
 | ||||
|                     # XXX: do we want this? | ||||
|                     # causes "[104] connection reset by peer" on other end | ||||
|                     # await chan.aclose() | ||||
| 
 | ||||
|                 except trio.BrokenResourceError: | ||||
|                     log.exception( | ||||
|                         f"Channel for {chan.uid} was already zonked..") | ||||
|                     log.warning(f"Channel for {chan.uid} was already closed") | ||||
| 
 | ||||
|     async def _push_result( | ||||
|         self, | ||||
|  | @ -411,22 +528,32 @@ class Actor: | |||
|     ) -> None: | ||||
|         """Push an RPC result to the local consumer's queue. | ||||
|         """ | ||||
|         actorid = chan.uid | ||||
|         assert actorid, f"`actorid` can't be {actorid}" | ||||
|         send_chan, recv_chan = self._cids2qs[(actorid, cid)] | ||||
|         # actorid = chan.uid | ||||
|         assert chan.uid, f"`chan.uid` can't be {chan.uid}" | ||||
|         send_chan, recv_chan = self._cids2qs[(chan.uid, cid)] | ||||
|         assert send_chan.cid == cid  # type: ignore | ||||
| 
 | ||||
|         if 'stop' in msg: | ||||
|             log.debug(f"{send_chan} was terminated at remote end") | ||||
|             # indicate to consumer that far end has stopped | ||||
|             return await send_chan.aclose() | ||||
|         # if 'error' in msg: | ||||
|         #     ctx = getattr(recv_chan, '_ctx', None) | ||||
|         #     if ctx: | ||||
|         #         ctx._error_from_remote_msg(msg) | ||||
| 
 | ||||
|         #     log.debug(f"{send_chan} was terminated at remote end") | ||||
|         #     # indicate to consumer that far end has stopped | ||||
|         #     return await send_chan.aclose() | ||||
| 
 | ||||
|         try: | ||||
|             log.debug(f"Delivering {msg} from {actorid} to caller {cid}") | ||||
|             log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}") | ||||
|             # maintain backpressure | ||||
|             await send_chan.send(msg) | ||||
| 
 | ||||
|         except trio.BrokenResourceError: | ||||
|             # TODO: what is the right way to handle the case where the | ||||
|             # local task has already sent a 'stop' / StopAsyncInteration | ||||
|             # to the other side but and possibly has closed the local | ||||
|             # feeder mem chan? Do we wait for some kind of ack or just | ||||
|             # let this fail silently and bubble up (currently)? | ||||
| 
 | ||||
|             # XXX: local consumer has closed their side | ||||
|             # so cancel the far end streaming task | ||||
|             log.warning(f"{send_chan} consumer is already closed") | ||||
|  | @ -435,7 +562,9 @@ class Actor: | |||
|         self, | ||||
|         actorid: Tuple[str, str], | ||||
|         cid: str | ||||
| 
 | ||||
|     ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: | ||||
| 
 | ||||
|         log.debug(f"Getting result queue for {actorid} cid {cid}") | ||||
|         try: | ||||
|             send_chan, recv_chan = self._cids2qs[(actorid, cid)] | ||||
|  | @ -489,23 +618,28 @@ class Actor: | |||
|                 task_status.started(loop_cs) | ||||
|                 async for msg in chan: | ||||
|                     if msg is None:  # loop terminate sentinel | ||||
| 
 | ||||
|                         log.debug( | ||||
|                             f"Cancelling all tasks for {chan} from {chan.uid}") | ||||
|                         for (channel, cid) in self._rpc_tasks: | ||||
| 
 | ||||
|                         for (channel, cid) in self._rpc_tasks.copy(): | ||||
|                             if channel is chan: | ||||
|                                 await self._cancel_task(cid, channel) | ||||
| 
 | ||||
|                         log.debug( | ||||
|                                 f"Msg loop signalled to terminate for" | ||||
|                                 f" {chan} from {chan.uid}") | ||||
| 
 | ||||
|                         break | ||||
| 
 | ||||
|                     log.trace(   # type: ignore | ||||
|                     log.transport(   # type: ignore | ||||
|                         f"Received msg {msg} from {chan.uid}") | ||||
| 
 | ||||
|                     cid = msg.get('cid') | ||||
|                     if cid: | ||||
|                         # deliver response to local caller/waiter | ||||
|                         await self._push_result(chan, cid, msg) | ||||
| 
 | ||||
|                         log.debug( | ||||
|                             f"Waiting on next msg for {chan} from {chan.uid}") | ||||
|                         continue | ||||
|  | @ -566,7 +700,7 @@ class Actor: | |||
|                         else: | ||||
|                             # mark that we have ongoing rpc tasks | ||||
|                             self._ongoing_rpc_tasks = trio.Event() | ||||
|                             log.info(f"RPC func is {func}") | ||||
|                             log.runtime(f"RPC func is {func}") | ||||
|                             # store cancel scope such that the rpc task can be | ||||
|                             # cancelled gracefully if requested | ||||
|                             self._rpc_tasks[(chan, cid)] = ( | ||||
|  | @ -575,7 +709,7 @@ class Actor: | |||
|                         # self.cancel() was called so kill this msg loop | ||||
|                         # and break out into ``_async_main()`` | ||||
|                         log.warning( | ||||
|                             f"{self.uid} was remotely cancelled; " | ||||
|                             f"Actor {self.uid} was remotely cancelled; " | ||||
|                             "waiting on cancellation completion..") | ||||
|                         await self._cancel_complete.wait() | ||||
|                         loop_cs.cancel() | ||||
|  | @ -1043,7 +1177,7 @@ class Actor: | |||
|             raise ValueError(f"{uid} is not a valid uid?!") | ||||
| 
 | ||||
|         chan.uid = uid | ||||
|         log.info(f"Handshake with actor {uid}@{chan.raddr} complete") | ||||
|         log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") | ||||
|         return uid | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -102,7 +102,7 @@ class PdbwTeardown(pdbpp.Pdb): | |||
| 
 | ||||
| #     async with aclosing(async_stdin): | ||||
| #         async for msg in async_stdin: | ||||
| #             log.trace(f"Stdin input:\n{msg}") | ||||
| #             log.runtime(f"Stdin input:\n{msg}") | ||||
| #             # encode to bytes | ||||
| #             bmsg = str.encode(msg) | ||||
| 
 | ||||
|  | @ -276,7 +276,7 @@ def _set_trace(actor=None): | |||
|     pdb = _mk_pdb() | ||||
| 
 | ||||
|     if actor is not None: | ||||
|         log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n") | ||||
|         log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") | ||||
| 
 | ||||
|         pdb.set_trace( | ||||
|             # start 2 levels up in user code | ||||
|  | @ -306,7 +306,7 @@ breakpoint = partial( | |||
| 
 | ||||
| 
 | ||||
| def _post_mortem(actor): | ||||
|     log.runtime(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") | ||||
|     log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") | ||||
|     pdb = _mk_pdb() | ||||
| 
 | ||||
|     # custom Pdb post-mortem entry | ||||
|  |  | |||
|  | @ -16,12 +16,14 @@ from ._state import current_actor, _runtime_vars | |||
| 
 | ||||
| @asynccontextmanager | ||||
| async def get_arbiter( | ||||
| 
 | ||||
|     host: str, | ||||
|     port: int, | ||||
| 
 | ||||
| ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: | ||||
|     """Return a portal instance connected to a local or remote | ||||
|     '''Return a portal instance connected to a local or remote | ||||
|     arbiter. | ||||
|     """ | ||||
|     ''' | ||||
|     actor = current_actor() | ||||
| 
 | ||||
|     if not actor: | ||||
|  | @ -33,16 +35,20 @@ async def get_arbiter( | |||
|         yield LocalPortal(actor, Channel((host, port))) | ||||
|     else: | ||||
|         async with _connect_chan(host, port) as chan: | ||||
| 
 | ||||
|             async with open_portal(chan) as arb_portal: | ||||
| 
 | ||||
|                 yield arb_portal | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| async def get_root( | ||||
| **kwargs, | ||||
|     **kwargs, | ||||
| ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: | ||||
| 
 | ||||
|     host, port = _runtime_vars['_root_mailbox'] | ||||
|     assert host is not None | ||||
| 
 | ||||
|     async with _connect_chan(host, port) as chan: | ||||
|         async with open_portal(chan, **kwargs) as portal: | ||||
|             yield portal | ||||
|  | @ -60,12 +66,16 @@ async def find_actor( | |||
|     """ | ||||
|     actor = current_actor() | ||||
|     async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: | ||||
| 
 | ||||
|         sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name) | ||||
| 
 | ||||
|         # TODO: return portals to all available actors - for now just | ||||
|         # the last one that registered | ||||
|         if name == 'arbiter' and actor.is_arbiter: | ||||
|             raise RuntimeError("The current actor is the arbiter") | ||||
| 
 | ||||
|         elif sockaddr: | ||||
| 
 | ||||
|             async with _connect_chan(*sockaddr) as chan: | ||||
|                 async with open_portal(chan) as portal: | ||||
|                     yield portal | ||||
|  | @ -83,9 +93,12 @@ async def wait_for_actor( | |||
|     A portal to the first registered actor is returned. | ||||
|     """ | ||||
|     actor = current_actor() | ||||
| 
 | ||||
|     async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: | ||||
| 
 | ||||
|         sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name) | ||||
|         sockaddr = sockaddrs[-1] | ||||
| 
 | ||||
|         async with _connect_chan(*sockaddr) as chan: | ||||
|             async with open_portal(chan) as portal: | ||||
|                 yield portal | ||||
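The discovery helpers in this module are part of ``tractor``'s public API. Below is a minimal sketch of looking up a peer actor by name; the actor name ``'service'`` is purely illustrative and the coroutine is assumed to run inside an already-opened ``tractor`` runtime:

.. code:: python

    import tractor


    async def lookup_and_cancel() -> None:
        # yields a portal if an actor is registered under the
        # (hypothetical) name 'service', otherwise ``None``
        async with tractor.find_actor('service') as portal:

            if portal is None:
                print('no actor registered under that name')

            else:
                # politely ask the remote actor to shut down
                await portal.cancel_actor()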
|  |  | |||
|  | @ -1,7 +1,7 @@ | |||
| """ | ||||
| Our classy exception set. | ||||
| """ | ||||
| from typing import Dict, Any | ||||
| from typing import Dict, Any, Optional, Type | ||||
| import importlib | ||||
| import builtins | ||||
| import traceback | ||||
|  | @ -15,17 +15,16 @@ _this_mod = importlib.import_module(__name__) | |||
| class RemoteActorError(Exception): | ||||
|     # TODO: local reconstruction of remote exception details | ||||
|     "Remote actor exception bundled locally" | ||||
|     def __init__(self, message, type_str, **msgdata) -> None: | ||||
|         super().__init__(message) | ||||
|         for ns in [builtins, _this_mod, trio]: | ||||
|             try: | ||||
|                 self.type = getattr(ns, type_str) | ||||
|                 break | ||||
|             except AttributeError: | ||||
|                 continue | ||||
|         else: | ||||
|             self.type = Exception | ||||
|     def __init__( | ||||
|         self, | ||||
|         message: str, | ||||
|         suberror_type: Optional[Type[BaseException]] = None, | ||||
|         **msgdata | ||||
| 
 | ||||
|     ) -> None: | ||||
|         super().__init__(message) | ||||
| 
 | ||||
|         self.type = suberror_type | ||||
|         self.msgdata = msgdata | ||||
| 
 | ||||
|     # TODO: a trio.MultiError.catch like context manager | ||||
|  | @ -41,6 +40,9 @@ class InternalActorError(RemoteActorError): | |||
| class TransportClosed(trio.ClosedResourceError): | ||||
|     "Underlying channel transport was closed prior to use" | ||||
| 
 | ||||
| class ContextCancelled(RemoteActorError): | ||||
|     "Inter-actor task context cancelled itself on the callee side." | ||||
| 
 | ||||
| 
 | ||||
| class NoResult(RuntimeError): | ||||
|     "No final result is expected for this actor" | ||||
|  | @ -54,13 +56,22 @@ class NoRuntime(RuntimeError): | |||
|     "The root actor has not been initialized yet" | ||||
| 
 | ||||
| 
 | ||||
| def pack_error(exc: BaseException) -> Dict[str, Any]: | ||||
| def pack_error( | ||||
|     exc: BaseException, | ||||
|     tb = None, | ||||
| 
 | ||||
| ) -> Dict[str, Any]: | ||||
|     """Create an "error message" for transmission over | ||||
|     a channel (aka the wire). | ||||
|     """ | ||||
|     if tb: | ||||
|         tb_str = ''.join(traceback.format_tb(tb)) | ||||
|     else: | ||||
|         tb_str = traceback.format_exc() | ||||
| 
 | ||||
|     return { | ||||
|         'error': { | ||||
|             'tb_str': traceback.format_exc(), | ||||
|             'tb_str': tb_str, | ||||
|             'type_str': type(exc).__name__, | ||||
|         } | ||||
|     } | ||||
|  | @ -77,12 +88,35 @@ def unpack_error( | |||
|     into a local ``RemoteActorError``. | ||||
| 
 | ||||
|     """ | ||||
|     tb_str = msg['error'].get('tb_str', '') | ||||
|     return err_type( | ||||
|         f"{chan.uid}\n" + tb_str, | ||||
|     error = msg['error'] | ||||
| 
 | ||||
|     tb_str = error.get('tb_str', '') | ||||
|     message = f"{chan.uid}\n" + tb_str | ||||
|     type_name = error['type_str'] | ||||
|     suberror_type: Type[BaseException] = Exception | ||||
| 
 | ||||
|     if type_name == 'ContextCancelled': | ||||
|         err_type = ContextCancelled | ||||
|         suberror_type = trio.Cancelled | ||||
| 
 | ||||
|     else:  # try to lookup a suitable local error type | ||||
|         for ns in [builtins, _this_mod, trio]: | ||||
|             try: | ||||
|                 suberror_type = getattr(ns, type_name) | ||||
|                 break | ||||
|             except AttributeError: | ||||
|                 continue | ||||
| 
 | ||||
|     exc = err_type( | ||||
|         message, | ||||
|         suberror_type=suberror_type, | ||||
| 
 | ||||
|         # unpack other fields into error type init | ||||
|         **msg['error'], | ||||
|     ) | ||||
| 
 | ||||
|     return exc | ||||
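As a rough sketch of the round trip these two helpers implement (the channel is faked with a stand-in object since only its ``.uid`` is read here, and the default ``err_type`` of ``RemoteActorError`` is assumed):

.. code:: python

    from tractor._exceptions import (
        pack_error,
        unpack_error,
        RemoteActorError,
    )


    class FakeChan:
        # stand-in for a real ``Channel``; only ``.uid`` is used
        uid = ('some_actor', 'some-uuid')


    try:
        raise ValueError('boom')
    except ValueError as err:
        # serialize the traceback + type name for the wire
        msg = pack_error(err)

    assert msg['error']['type_str'] == 'ValueError'

    # rebuild it on the receiving side as a ``RemoteActorError``
    # whose ``.type`` is resolved back to the builtin ``ValueError``
    exc = unpack_error(msg, FakeChan())
    assert isinstance(exc, RemoteActorError)
    assert exc.type is ValueError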
| 
 | ||||
| 
 | ||||
| def is_multi_cancelled(exc: BaseException) -> bool: | ||||
|     """Predicate to determine if a ``trio.MultiError`` contains only | ||||
|  |  | |||
|  | @ -1,5 +1,6 @@ | |||
| """ | ||||
| Inter-process comms abstractions | ||||
| 
 | ||||
| """ | ||||
| import platform | ||||
| import typing | ||||
|  | @ -61,7 +62,6 @@ class MsgpackTCPStream: | |||
|             use_list=False, | ||||
|         ) | ||||
|         while True: | ||||
| 
 | ||||
|             try: | ||||
|                 data = await self.stream.receive_some(2**10) | ||||
| 
 | ||||
|  | @ -88,7 +88,7 @@ class MsgpackTCPStream: | |||
|                 else: | ||||
|                     raise | ||||
| 
 | ||||
|             log.trace(f"received {data}")  # type: ignore | ||||
|             log.transport(f"received {data}")  # type: ignore | ||||
| 
 | ||||
|             if data == b'': | ||||
|                 raise TransportClosed( | ||||
|  | @ -169,6 +169,7 @@ class Channel: | |||
|         return self.msgstream.raddr if self.msgstream else None | ||||
| 
 | ||||
|     async def connect( | ||||
| 
 | ||||
|         self, | ||||
|         destaddr: Tuple[Any, ...] = None, | ||||
|         **kwargs | ||||
|  | @ -180,13 +181,21 @@ class Channel: | |||
| 
 | ||||
|         destaddr = destaddr or self._destaddr | ||||
|         assert isinstance(destaddr, tuple) | ||||
|         stream = await trio.open_tcp_stream(*destaddr, **kwargs) | ||||
| 
 | ||||
|         stream = await trio.open_tcp_stream( | ||||
|             *destaddr, | ||||
|             **kwargs | ||||
|         ) | ||||
|         self.msgstream = MsgpackTCPStream(stream) | ||||
| 
 | ||||
|         log.transport( | ||||
|             f'Opened channel to peer {self.laddr} -> {self.raddr}' | ||||
|         ) | ||||
|         return stream | ||||
| 
 | ||||
|     async def send(self, item: Any) -> None: | ||||
| 
 | ||||
|         log.trace(f"send `{item}`")  # type: ignore | ||||
|         log.transport(f"send `{item}`")  # type: ignore | ||||
|         assert self.msgstream | ||||
| 
 | ||||
|         await self.msgstream.send(item) | ||||
|  | @ -205,7 +214,8 @@ class Channel: | |||
|             raise | ||||
| 
 | ||||
|     async def aclose(self) -> None: | ||||
|         log.debug( | ||||
| 
 | ||||
|         log.transport( | ||||
|             f'Closing channel to {self.uid} ' | ||||
|             f'{self.laddr} -> {self.raddr}' | ||||
|         ) | ||||
|  | @ -234,11 +244,11 @@ class Channel: | |||
|                     await self.connect() | ||||
|                 cancelled = cancel_scope.cancelled_caught | ||||
|                 if cancelled: | ||||
|                     log.warning( | ||||
|                     log.transport( | ||||
|                         "Reconnect timed out after 3 seconds, retrying...") | ||||
|                     continue | ||||
|                 else: | ||||
|                     log.warning("Stream connection re-established!") | ||||
|                     log.transport("Stream connection re-established!") | ||||
|                     # run any reconnection sequence | ||||
|                     on_recon = self._recon_seq | ||||
|                     if on_recon: | ||||
|  | @ -247,7 +257,7 @@ class Channel: | |||
|             except (OSError, ConnectionRefusedError): | ||||
|                 if not down: | ||||
|                     down = True | ||||
|                     log.warning( | ||||
|                     log.transport( | ||||
|                         f"Connection to {self.raddr} went down, waiting" | ||||
|                         " for re-establishment") | ||||
|                 await trio.sleep(1) | ||||
|  |  | |||
|  | @ -17,7 +17,12 @@ from async_generator import asynccontextmanager | |||
| from ._state import current_actor | ||||
| from ._ipc import Channel | ||||
| from .log import get_logger | ||||
| from ._exceptions import unpack_error, NoResult, RemoteActorError | ||||
| from ._exceptions import ( | ||||
|     unpack_error, | ||||
|     NoResult, | ||||
|     RemoteActorError, | ||||
|     ContextCancelled, | ||||
| ) | ||||
| from ._streaming import Context, ReceiveMsgStream | ||||
| 
 | ||||
| 
 | ||||
|  | @ -84,7 +89,7 @@ class Portal: | |||
|         ns: str, | ||||
|         func: str, | ||||
|         kwargs, | ||||
|     ) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]: | ||||
|     ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]: | ||||
|         """Submit a function to be scheduled and run by actor, return the | ||||
|         associated caller id, response queue, response type str, | ||||
|         first message packet as a tuple. | ||||
|  | @ -172,6 +177,7 @@ class Portal: | |||
|                 f"Cancelling all streams with {self.channel.uid}") | ||||
|             for stream in self._streams.copy(): | ||||
|                 try: | ||||
|                     # with trio.CancelScope(shield=True): | ||||
|                     await stream.aclose() | ||||
|                 except trio.ClosedResourceError: | ||||
|                     # don't error the stream having already been closed | ||||
|  | @ -289,6 +295,7 @@ class Portal: | |||
|         self, | ||||
|         async_gen_func: Callable,  # typing: ignore | ||||
|         **kwargs, | ||||
| 
 | ||||
|     ) -> AsyncGenerator[ReceiveMsgStream, None]: | ||||
| 
 | ||||
|         if not inspect.isasyncgenfunction(async_gen_func): | ||||
|  | @ -312,13 +319,23 @@ class Portal: | |||
| 
 | ||||
|         ctx = Context(self.channel, cid, _portal=self) | ||||
|         try: | ||||
|             async with ReceiveMsgStream(ctx, recv_chan, self) as rchan: | ||||
|             # deliver receive only stream | ||||
|             async with ReceiveMsgStream(ctx, recv_chan) as rchan: | ||||
|                 self._streams.add(rchan) | ||||
|                 yield rchan | ||||
| 
 | ||||
|         finally: | ||||
| 
 | ||||
|             # cancel the far end task on consumer close | ||||
|             # NOTE: this is a special case since we assume that if using | ||||
|             # this ``.open_stream_from()`` api, the stream is a one | ||||
|             # time use and we couple the far end task's lifetime to | ||||
|             # the consumer's scope; we don't ever send a `'stop'` | ||||
|             # message right now since there shouldn't be a reason to | ||||
|             # stop and restart the stream, right? | ||||
|             try: | ||||
|                 await ctx.cancel() | ||||
| 
 | ||||
|             except trio.ClosedResourceError: | ||||
|                 # if the far end terminates before we send a cancel the | ||||
|                 # underlying transport-channel may already be closed. | ||||
|  | @ -326,16 +343,123 @@ class Portal: | |||
| 
 | ||||
|             self._streams.remove(rchan) | ||||
| 
 | ||||
|     # @asynccontextmanager | ||||
|     # async def open_context( | ||||
|     #     self, | ||||
|     #     func: Callable, | ||||
|     #     **kwargs, | ||||
|     # ) -> Context: | ||||
|     #     # TODO | ||||
|     #     elif resptype == 'context':  # context manager style setup/teardown | ||||
|     #         # TODO likely not here though | ||||
|     #         raise NotImplementedError | ||||
|     @asynccontextmanager | ||||
|     async def open_context( | ||||
| 
 | ||||
|         self, | ||||
|         func: Callable, | ||||
|         **kwargs, | ||||
| 
 | ||||
|     ) -> AsyncGenerator[Tuple[Context, Any], None]: | ||||
|         '''Open an inter-actor task context. | ||||
| 
 | ||||
|         This is a synchronous API which allows for deterministic | ||||
|         setup/teardown of a remote task. The yielded ``Context`` further | ||||
|         allows for opening bidirectional streams, explicit cancellation | ||||
|         and synchronized final result collection. See ``tractor.Context``. | ||||
| 
 | ||||
|         ''' | ||||
| 
 | ||||
|         # conduct target func method structural checks | ||||
|         if not inspect.iscoroutinefunction(func) and ( | ||||
|             getattr(func, '_tractor_context_function', False) | ||||
|         ): | ||||
|             raise TypeError( | ||||
|                 f'{func} must be an async function!') | ||||
| 
 | ||||
|         fn_mod_path, fn_name = func_deats(func) | ||||
| 
 | ||||
|         recv_chan: Optional[trio.MemoryReceiveChannel] = None | ||||
| 
 | ||||
|         cid, recv_chan, functype, first_msg = await self._submit( | ||||
|             fn_mod_path, fn_name, kwargs) | ||||
| 
 | ||||
|         assert functype == 'context' | ||||
|         msg = await recv_chan.receive() | ||||
| 
 | ||||
|         try: | ||||
|             # the "first" value here is delivered by the callee's | ||||
|             # ``Context.started()`` call. | ||||
|             first = msg['started'] | ||||
| 
 | ||||
|         except KeyError: | ||||
|             assert msg.get('cid'), ("Received internal error at context?") | ||||
| 
 | ||||
|             if msg.get('error'): | ||||
|                 # raise the error message | ||||
|                 raise unpack_error(msg, self.channel) | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|         _err: Optional[BaseException] = None | ||||
|         # deliver context instance and .started() msg value in open tuple. | ||||
|         try: | ||||
|             async with trio.open_nursery() as scope_nursery: | ||||
|                 ctx = Context( | ||||
|                     self.channel, | ||||
|                     cid, | ||||
|                     _portal=self, | ||||
|                     _recv_chan=recv_chan, | ||||
|                     _scope_nursery=scope_nursery, | ||||
|                 ) | ||||
| 
 | ||||
|                 # pairs with handling in ``Actor._push_result()`` | ||||
|                 # recv_chan._ctx = ctx | ||||
| 
 | ||||
|                 # await trio.lowlevel.checkpoint() | ||||
|                 yield ctx, first | ||||
| 
 | ||||
|         except ContextCancelled as err: | ||||
|             _err = err | ||||
|             if not ctx._cancel_called: | ||||
|                 # context was cancelled at the far end but was | ||||
|                 # not part of this end requesting that cancel | ||||
|                 # so raise for the local task to respond and handle. | ||||
|                 raise | ||||
| 
 | ||||
|             # if the context was cancelled by client code | ||||
|             # then we don't need to raise since user code | ||||
|             # is expecting this and the block should exit. | ||||
|             else: | ||||
|                 log.debug(f'Context {ctx} cancelled gracefully') | ||||
| 
 | ||||
|         except ( | ||||
|             trio.Cancelled, | ||||
|             trio.MultiError, | ||||
|             Exception, | ||||
|         ) as err: | ||||
|             _err = err | ||||
|             # the context cancels itself on any cancel | ||||
|             # causing error. | ||||
|             log.error(f'Context {ctx} sending cancel to far end') | ||||
|             with trio.CancelScope(shield=True): | ||||
|                 await ctx.cancel() | ||||
|             raise | ||||
| 
 | ||||
|         finally: | ||||
|             result = await ctx.result() | ||||
| 
 | ||||
|             # though it should be impossible for any tasks | ||||
|             # operating *in* this scope to have survived | ||||
|             # we tear down the runtime feeder chan last | ||||
|             # to avoid premature stream clobbers. | ||||
|             if recv_chan is not None: | ||||
|                 await recv_chan.aclose() | ||||
| 
 | ||||
|             if _err: | ||||
|                 if ctx._cancel_called: | ||||
|                     log.warning( | ||||
|                         f'Context {fn_name} cancelled by caller with\n{_err}' | ||||
|                     ) | ||||
|                 elif _err is not None: | ||||
|                     log.warning( | ||||
|                         f'Context {fn_name} cancelled by callee with\n{_err}' | ||||
|                     ) | ||||
|             else: | ||||
|                 log.info( | ||||
|                     f'Context {fn_name} returned ' | ||||
|                     f'value from callee `{result}`' | ||||
|                 ) | ||||
| 
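A small caller-side sketch of the cancellation path handled above; the ``worker`` function and actor names are made up for illustration:

.. code:: python

    import trio
    import tractor


    @tractor.context
    async def worker(ctx: tractor.Context) -> None:
        # hypothetical callee: signal readiness then idle until cancelled
        await ctx.started('ready')
        await trio.sleep_forever()


    async def main() -> None:
        async with tractor.open_nursery() as n:
            portal = await n.start_actor('worker', enable_modules=[__name__])

            async with portal.open_context(worker) as (ctx, first):
                assert first == 'ready'

                # request cancellation of the far end task; since *this*
                # side asked for it, the handler above treats the relayed
                # ``ContextCancelled`` as a graceful teardown.
                await ctx.cancel()

            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)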
 | ||||
| 
 | ||||
| @dataclass | ||||
|  | @ -360,10 +484,12 @@ class LocalPortal: | |||
| 
 | ||||
| @asynccontextmanager | ||||
| async def open_portal( | ||||
| 
 | ||||
|     channel: Channel, | ||||
|     nursery: Optional[trio.Nursery] = None, | ||||
|     start_msg_loop: bool = True, | ||||
|     shield: bool = False, | ||||
| 
 | ||||
| ) -> AsyncGenerator[Portal, None]: | ||||
|     """Open a ``Portal`` through the provided ``channel``. | ||||
| 
 | ||||
|  | @ -374,6 +500,7 @@ async def open_portal( | |||
|     was_connected = False | ||||
| 
 | ||||
|     async with maybe_open_nursery(nursery, shield=shield) as nursery: | ||||
| 
 | ||||
|         if not channel.connected(): | ||||
|             await channel.connect() | ||||
|             was_connected = True | ||||
|  | @ -395,12 +522,14 @@ async def open_portal( | |||
|         portal = Portal(channel) | ||||
|         try: | ||||
|             yield portal | ||||
| 
 | ||||
|         finally: | ||||
|             await portal.aclose() | ||||
| 
 | ||||
|             if was_connected: | ||||
|                 # cancel remote channel-msg loop | ||||
|                 # gracefully signal remote channel-msg loop | ||||
|                 await channel.send(None) | ||||
|                 # await channel.aclose() | ||||
| 
 | ||||
|             # cancel background msg loop task | ||||
|             if msg_loop_cs: | ||||
|  |  | |||
|  | @ -86,6 +86,9 @@ async def open_root_actor( | |||
|         # for use of ``await tractor.breakpoint()`` | ||||
|         enable_modules.append('tractor._debug') | ||||
| 
 | ||||
|         if loglevel is None: | ||||
|             loglevel = 'pdb' | ||||
| 
 | ||||
|     elif debug_mode: | ||||
|         raise RuntimeError( | ||||
|             "Debug mode is only supported for the `trio` backend!" | ||||
|  | @ -179,8 +182,7 @@ async def open_root_actor( | |||
| 
 | ||||
|             finally: | ||||
|                 logger.info("Shutting down root actor") | ||||
|                 with trio.CancelScope(shield=True): | ||||
|                     await actor.cancel() | ||||
|                 await actor.cancel() | ||||
|     finally: | ||||
|         _state._current_actor = None | ||||
|         logger.info("Root actor terminated") | ||||
|  |  | |||
|  | @ -22,7 +22,10 @@ from multiprocessing import forkserver  # type: ignore | |||
| from typing import Tuple | ||||
| 
 | ||||
| from . import _forkserver_override | ||||
| from ._state import current_actor, is_main_process | ||||
| from ._state import ( | ||||
|     current_actor, | ||||
|     is_main_process, | ||||
| ) | ||||
| from .log import get_logger | ||||
| from ._portal import Portal | ||||
| from ._actor import Actor, ActorFailure | ||||
|  | @ -149,6 +152,27 @@ async def cancel_on_completion( | |||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| async def do_hard_kill( | ||||
|     proc: trio.Process, | ||||
| ) -> None: | ||||
|     # NOTE: this timeout used to do nothing since we were shielding | ||||
|     # the ``.wait()`` inside ``new_proc()`` which will pretty much | ||||
|     # never release until the process exits, now it acts as | ||||
|     # a hard-kill time ultimatum. | ||||
|     with trio.move_on_after(3) as cs: | ||||
| 
 | ||||
|         # NOTE: This ``__aexit__()`` shields internally. | ||||
|         async with proc:  # calls ``trio.Process.aclose()`` | ||||
|             log.debug(f"Terminating {proc}") | ||||
| 
 | ||||
|     if cs.cancelled_caught: | ||||
|         # XXX: should pretty much never get here unless we have | ||||
|         # to move the bits from ``proc.__aexit__()`` out and | ||||
|         # into here. | ||||
|         log.critical(f"HARD KILLING {proc}") | ||||
|         proc.kill() | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| async def spawn_subactor( | ||||
|     subactor: 'Actor', | ||||
|  | @ -180,26 +204,15 @@ async def spawn_subactor( | |||
|     proc = await trio.open_process(spawn_cmd) | ||||
|     try: | ||||
|         yield proc | ||||
| 
 | ||||
|     finally: | ||||
| 
 | ||||
|         # XXX: do this **after** cancellation/teardown | ||||
|         # to avoid killing the process too early | ||||
|         # since trio does this internally on ``__aexit__()`` | ||||
| 
 | ||||
|         # NOTE: we always "shield" join sub procs in | ||||
|         # the outer scope since no actor zombies are | ||||
|         # ever allowed. This ``__aexit__()`` also shields | ||||
|         # internally. | ||||
|         log.debug(f"Attempting to kill {proc}") | ||||
| 
 | ||||
|         # NOTE: this timeout effectively does nothing right now since | ||||
|         # we are shielding the ``.wait()`` inside ``new_proc()`` which | ||||
|         # will pretty much never release until the process exits. | ||||
|         with trio.move_on_after(3) as cs: | ||||
|             async with proc: | ||||
|                 log.debug(f"Terminating {proc}") | ||||
|         if cs.cancelled_caught: | ||||
|             log.critical(f"HARD KILLING {proc}") | ||||
|             proc.kill() | ||||
|         await do_hard_kill(proc) | ||||
| 
 | ||||
| 
 | ||||
| async def new_proc( | ||||
|  | @ -212,7 +225,6 @@ async def new_proc( | |||
|     parent_addr: Tuple[str, int], | ||||
|     _runtime_vars: Dict[str, Any],  # serialized and sent to _child | ||||
|     *, | ||||
|     use_trio_run_in_process: bool = False, | ||||
|     task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED | ||||
| ) -> None: | ||||
|     """Create a new ``multiprocessing.Process`` using the | ||||
|  | @ -223,7 +235,7 @@ async def new_proc( | |||
|     # mark the new actor with the global spawn method | ||||
|     subactor._spawn_method = _spawn_method | ||||
| 
 | ||||
|     if use_trio_run_in_process or _spawn_method == 'trio': | ||||
|     if _spawn_method == 'trio': | ||||
|         async with trio.open_nursery() as nursery: | ||||
|             async with spawn_subactor( | ||||
|                 subactor, | ||||
|  | @ -277,9 +289,14 @@ async def new_proc( | |||
|                 # reaping more stringently without the shield | ||||
|                 # we used to have below... | ||||
| 
 | ||||
|                 # always "hard" join sub procs: | ||||
|                 # no actor zombies allowed | ||||
|                 # with trio.CancelScope(shield=True): | ||||
|                 # async with proc: | ||||
| 
 | ||||
|                 # Always "hard" join sub procs since no actor zombies | ||||
|                 # are allowed! | ||||
| 
 | ||||
|                 # this is a "light" (cancellable) join, the hard join is | ||||
|                 # in the enclosing scope (see above). | ||||
|                 await proc.wait() | ||||
| 
 | ||||
|             log.debug(f"Joined {proc}") | ||||
|  | @ -320,7 +337,6 @@ async def mp_new_proc( | |||
|     parent_addr: Tuple[str, int], | ||||
|     _runtime_vars: Dict[str, Any],  # serialized and sent to _child | ||||
|     *, | ||||
|     use_trio_run_in_process: bool = False, | ||||
|     task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED | ||||
| 
 | ||||
| ) -> None: | ||||
|  |  | |||
|  | @ -1,41 +1,312 @@ | |||
| """ | ||||
| Message stream types and APIs. | ||||
| 
 | ||||
| """ | ||||
| import inspect | ||||
| from contextlib import contextmanager  # , asynccontextmanager | ||||
| from contextlib import contextmanager, asynccontextmanager | ||||
| from dataclasses import dataclass | ||||
| from typing import Any, Iterator, Optional | ||||
| from typing import ( | ||||
|     Any, Iterator, Optional, Callable, | ||||
|     AsyncGenerator, Dict, | ||||
| ) | ||||
| 
 | ||||
| import warnings | ||||
| 
 | ||||
| import trio | ||||
| 
 | ||||
| from ._ipc import Channel | ||||
| from ._exceptions import unpack_error | ||||
| from ._exceptions import unpack_error, ContextCancelled | ||||
| from ._state import current_actor | ||||
| from .log import get_logger | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| @dataclass(frozen=True) | ||||
| class Context: | ||||
|     """An IAC (inter-actor communication) context. | ||||
| # TODO: generic typing like trio's receive channel | ||||
| # but with msgspec messages? | ||||
| # class ReceiveChannel(AsyncResource, Generic[ReceiveType]): | ||||
| 
 | ||||
|     Allows maintaining task or protocol specific state between communicating | ||||
|     actors. A unique context is created on the receiving end for every request | ||||
|     to a remote actor. | ||||
| 
 | ||||
|     A context can be cancelled and (eventually) restarted from | ||||
|     either side of the underlying IPC channel. | ||||
| class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||
|     """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with | ||||
|     special behaviour for signalling stream termination across an | ||||
|     inter-actor ``Channel``. This is the type returned to a local task | ||||
|     which invoked a remote streaming function using `Portal.run()`. | ||||
| 
 | ||||
|     A context can be used to open task oriented message streams. | ||||
|     Termination rules: | ||||
| 
 | ||||
|     - if the local task signals stop iteration a cancel signal is | ||||
|       relayed to the remote task indicating to stop streaming | ||||
|     - if the remote task signals the end of a stream, raise | ||||
|       a ``StopAsyncIteration`` to terminate the local ``async for`` | ||||
| 
 | ||||
|     """ | ||||
|     def __init__( | ||||
|         self, | ||||
|         ctx: 'Context',  # typing: ignore # noqa | ||||
|         rx_chan: trio.abc.ReceiveChannel, | ||||
|         shield: bool = False, | ||||
|     ) -> None: | ||||
|         self._ctx = ctx | ||||
|         self._rx_chan = rx_chan | ||||
|         self._shielded = shield | ||||
| 
 | ||||
|         # flag to denote end of stream | ||||
|         self._eoc: bool = False | ||||
| 
 | ||||
|     # delegate directly to underlying mem channel | ||||
|     def receive_nowait(self): | ||||
|         msg = self._rx_chan.receive_nowait() | ||||
|         return msg['yield'] | ||||
| 
 | ||||
|     async def receive(self): | ||||
|         # see ``.aclose()`` for notes on the old behaviour prior to | ||||
|         # introducing this | ||||
|         if self._eoc: | ||||
|             raise trio.EndOfChannel | ||||
| 
 | ||||
|         try: | ||||
| 
 | ||||
|             msg = await self._rx_chan.receive() | ||||
|             return msg['yield'] | ||||
| 
 | ||||
|         except KeyError: | ||||
|             # internal error should never get here | ||||
|             assert msg.get('cid'), ("Received internal error at portal?") | ||||
| 
 | ||||
|             # TODO: handle 2 cases with 3.10 match syntax | ||||
|             # - 'stop' | ||||
|             # - 'error' | ||||
|             # possibly just handle msg['stop'] here! | ||||
| 
 | ||||
|             if msg.get('stop'): | ||||
|                 log.debug(f"{self} was stopped at remote end") | ||||
| 
 | ||||
|                 # # when the send is closed we assume the stream has | ||||
|                 # # terminated and signal this local iterator to stop | ||||
|                 # await self.aclose() | ||||
| 
 | ||||
|                 # XXX: this causes ``ReceiveChannel.__anext__()`` to | ||||
|                 # raise a ``StopAsyncIteration`` **and** in our catch | ||||
|                 # block below it will trigger ``.aclose()``. | ||||
|                 raise trio.EndOfChannel | ||||
| 
 | ||||
|             # TODO: test that shows stream raising an expected error!!! | ||||
|             elif msg.get('error'): | ||||
|                 # raise the error message | ||||
|                 raise unpack_error(msg, self._ctx.chan) | ||||
| 
 | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|         except ( | ||||
|             trio.ClosedResourceError,  # by self._rx_chan | ||||
|             trio.EndOfChannel,  # by self._rx_chan or `stop` msg from far end | ||||
|             trio.Cancelled,  # by local cancellation | ||||
|         ): | ||||
|             # XXX: we close the stream on any of these error conditions: | ||||
| 
 | ||||
|             # a ``ClosedResourceError`` indicates that the internal | ||||
|             # feeder memory receive channel was closed likely by the | ||||
|             # runtime after the associated transport-channel | ||||
|             # disconnected or broke. | ||||
| 
 | ||||
|             # an ``EndOfChannel`` indicates either the internal recv | ||||
|             # memchan exhausted **or** we raised it just above after | ||||
|             # receiving a `stop` message from the far end of the stream. | ||||
| 
 | ||||
|             # Previously this was triggered by calling ``.aclose()`` on | ||||
|             # the send side of the channel inside | ||||
|             # ``Actor._push_result()`` (should still be commented code | ||||
|             # there - which should eventually get removed), but now the | ||||
|             # 'stop' message handling has been put just above. | ||||
| 
 | ||||
|             # TODO: Locally, we want to close this stream gracefully, by | ||||
|             # terminating any local consumers tasks deterministically. | ||||
|             # One we have broadcast support, we **don't** want to be | ||||
|             # closing this stream and not flushing a final value to | ||||
|             # remaining (clone) consumers who may not have been | ||||
|             # scheduled to receive it yet. | ||||
| 
 | ||||
|             # when the send is closed we assume the stream has | ||||
|             # terminated and signal this local iterator to stop | ||||
|             await self.aclose() | ||||
| 
 | ||||
|             raise  # propagate | ||||
| 
 | ||||
|     @contextmanager | ||||
|     def shield( | ||||
|         self | ||||
|     ) -> Iterator['ReceiveMsgStream']:  # noqa | ||||
|         """Shield this stream's underlying channel such that a local consumer task | ||||
|         can be cancelled (and possibly restarted) using ``trio.Cancelled``. | ||||
| 
 | ||||
|         Note that "shielding" here guards against relaying | ||||
|         a ``'stop'`` message to the far end of the stream thus keeping | ||||
|         the stream machinery active and ready for further use, it does | ||||
|         not have anything to do with an internal ``trio.CancelScope``. | ||||
| 
 | ||||
|         """ | ||||
|         self._shielded = True | ||||
|         yield self | ||||
|         self._shielded = False | ||||
| 
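A minimal sketch of how a consumer task might use ``.shield()``; the ``stream`` argument and the surrounding task management are assumed:

.. code:: python

    import trio


    async def consume_until_cancelled(stream) -> None:
        with stream.shield():
            try:
                async for msg in stream:
                    print(msg)
            except trio.Cancelled:
                # this task was cancelled but the stream's ``aclose()``
                # will no-op thanks to the shield, so another consumer
                # task can re-attach to the same stream later.
                raise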
 | ||||
|     async def aclose(self): | ||||
|         """Cancel associated remote actor task and local memory channel | ||||
|         on close. | ||||
| 
 | ||||
|         """ | ||||
|         # XXX: keep proper adherence to trio's `.aclose()` semantics: | ||||
|         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||
|         rx_chan = self._rx_chan | ||||
| 
 | ||||
|         if rx_chan._closed: | ||||
|             log.warning(f"{self} is already closed") | ||||
| 
 | ||||
|             # this stream has already been closed so silently succeed as | ||||
|             # per ``trio.AsyncResource`` semantics. | ||||
|             # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||
|             return | ||||
| 
 | ||||
|         # TODO: broadcasting to multiple consumers | ||||
|         # stats = rx_chan.statistics() | ||||
|         # if stats.open_receive_channels > 1: | ||||
|         #     # if we've been cloned don't kill the stream | ||||
|         #     log.debug( | ||||
|         #       "there are still consumers running keeping stream alive") | ||||
|         #     return | ||||
| 
 | ||||
|         if self._shielded: | ||||
|             log.warning(f"{self} is shielded, portal channel being kept alive") | ||||
|             return | ||||
| 
 | ||||
|         # XXX: This must be set **AFTER** the shielded test above! | ||||
|         self._eoc = True | ||||
| 
 | ||||
|         # NOTE: this is super subtle IPC messaging stuff: | ||||
|         # Relay stop iteration to far end **iff** we're | ||||
|         # in bidirectional mode. If we're only streaming | ||||
|         # *from* one side then that side **won't** have an | ||||
|         # entry in `Actor._cids2qs` (maybe it should though?). | ||||
|         # So any `yield` or `stop` msgs sent from the caller side | ||||
|         # will cause key errors on the callee side since there is | ||||
|         # no entry for a local feeder mem chan since the callee task | ||||
|         # isn't expecting messages to be sent by the caller. | ||||
|         # Thus, we must check that this context DOES NOT | ||||
|         # have a portal reference to ensure this is indeed the callee | ||||
|         # side and can relay a 'stop'. | ||||
| 
 | ||||
|         # In the bidirectional case, `Context.open_stream()` will create | ||||
|         # the `Actor._cids2qs` entry from a call to | ||||
|         # `Actor.get_memchans()` and will send the stop message in | ||||
|         # ``__aexit__()`` on teardown so it **does not** need to be | ||||
|         # called here. | ||||
|         if not self._ctx._portal: | ||||
|             try: | ||||
|                 # only for 2 way streams can we send | ||||
|                 # stop from the caller side | ||||
|                 await self._ctx.send_stop() | ||||
| 
 | ||||
|             except ( | ||||
|                 trio.BrokenResourceError, | ||||
|                 trio.ClosedResourceError | ||||
|             ): | ||||
|                 # the underlying channel may already have been pulled | ||||
|                 # in which case our stop message is meaningless since | ||||
|                 # it can't traverse the transport. | ||||
|                 log.debug(f'Channel for {self} was already closed') | ||||
| 
 | ||||
|         # close the local mem chan ``self._rx_chan`` ??!? | ||||
| 
 | ||||
|         # DEFINITELY NOT if we're a bi-dir ``MsgStream``! | ||||
|         # BECAUSE this same core-msg-loop mem recv-chan is used to deliver | ||||
|         # the potential final result from the surrounding inter-actor | ||||
|         # `Context` so we don't want to close it until that context has | ||||
|         # run to completion. | ||||
| 
 | ||||
|         # XXX: Notes on old behaviour: | ||||
|         # await rx_chan.aclose() | ||||
| 
 | ||||
|         # In the receive-only case, ``Portal.open_stream_from()`` used | ||||
|         # to rely on this call explicitly on teardown such that a new | ||||
|         # call to ``.receive()`` after ``rx_chan`` had been closed, would | ||||
|         # result in us raising a ``trio.EndOfChannel`` (since we | ||||
|         # remapped the ``trio.ClosedResourceError``). However, now if for some | ||||
|         # reason the stream's consumer code tries to manually receive a new | ||||
|         # value before ``.aclose()`` is called **but** the far end has | ||||
|         # stopped, ``.receive()`` **must** raise ``trio.EndOfChannel`` in | ||||
|         # order to avoid an infinite hang on ``.__anext__()``; this is | ||||
|         # why we added ``self._eoc`` to denote stream closure independent | ||||
|         # of ``rx_chan``. | ||||
| 
 | ||||
|         # In theory we could still use this old method and close the | ||||
|         # underlying msg-loop mem chan as above and then **not** check | ||||
|         # for ``self._eoc`` in ``.receive()`` (if for some reason we | ||||
|         # think that check is a bottleneck - not likely) **but** then | ||||
|         # we would need to map the resulting | ||||
|         # ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in | ||||
|         # ``.receive()`` (as it originally was before bi-dir streaming | ||||
|         # support) in order to trigger stream closure. The old behaviour | ||||
|         # is arguably more confusing since we lose detection of the | ||||
|         # runtime's closure of ``rx_chan`` in the case where we may | ||||
|         # still need to consume msgs that are "in transit" from the far | ||||
|         # end (eg. for ``Context.result()``). | ||||
| 
 | ||||
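For the receive-only case a ``ReceiveMsgStream`` is what ``Portal.open_stream_from()`` yields; a small sketch of consuming one (module and actor names are illustrative):

.. code:: python

    import trio
    import tractor


    async def counter(limit: int):
        # plain async generator exposed by the child actor
        for i in range(limit):
            yield i
            await trio.sleep(0.1)


    async def main() -> None:
        async with tractor.open_nursery() as n:
            portal = await n.start_actor('streamer', enable_modules=[__name__])

            # exiting the block cancels the far end task: the stream is
            # "one-shot" use as described in the teardown notes above.
            async with portal.open_stream_from(counter, limit=5) as stream:
                async for value in stream:
                    print(value)

            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)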
| 
 | ||||
| class MsgStream(ReceiveMsgStream, trio.abc.Channel): | ||||
|     """ | ||||
|     Bidirectional message stream for use within an inter-actor | ||||
|     ``Context``. | ||||
| 
 | ||||
|     """ | ||||
|     async def send( | ||||
|         self, | ||||
|         data: Any | ||||
|     ) -> None: | ||||
|         '''Send a message over this stream to the far end. | ||||
| 
 | ||||
|         ''' | ||||
|         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) | ||||
| 
 | ||||
|     # TODO: but make it broadcasting to consumers | ||||
|     def clone(self): | ||||
|         """Clone this receive channel allowing for multi-task | ||||
|         consumption from the same channel. | ||||
| 
 | ||||
|         """ | ||||
|         return MsgStream( | ||||
|             self._ctx, | ||||
|             self._rx_chan.clone(), | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class Context: | ||||
|     '''An inter-actor task communication context. | ||||
| 
 | ||||
|     Allows maintaining task or protocol specific state between | ||||
|     2 communicating actor tasks. A unique context is created on the | ||||
|     callee side/end for every request to a remote actor from a portal. | ||||
| 
 | ||||
|     A context can be cancelled and (possibly eventually restarted) from | ||||
|     either side of the underlying IPC channel. | ||||
| 
 | ||||
|     A context can be used to open task oriented message streams and can | ||||
|     be thought of as an IPC aware inter-actor cancel scope. | ||||
| 
 | ||||
|     ''' | ||||
|     chan: Channel | ||||
|     cid: str | ||||
| 
 | ||||
|     # only set on the caller side | ||||
|     _portal: Optional['Portal'] = None    # type: ignore # noqa | ||||
|     _recv_chan: Optional[trio.MemoryReceiveChannel] = None | ||||
|     _result: Optional[Any] = False | ||||
|     _cancel_called: bool = False | ||||
| 
 | ||||
|     # only set on the callee side | ||||
|     _cancel_scope: Optional[trio.CancelScope] = None | ||||
|     _scope_nursery: Optional[trio.Nursery] = None | ||||
| 
 | ||||
|     async def send_yield(self, data: Any) -> None: | ||||
| 
 | ||||
|  | @ -50,53 +321,229 @@ class Context: | |||
|     async def send_stop(self) -> None: | ||||
|         await self.chan.send({'stop': True, 'cid': self.cid}) | ||||
| 
 | ||||
|     def _error_from_remote_msg( | ||||
|         self, | ||||
|         msg: Dict[str, Any], | ||||
| 
 | ||||
|     ) -> None: | ||||
|         '''Unpack and raise a msg error into the local scope | ||||
|         nursery for this context. | ||||
| 
 | ||||
|         Acts as a form of "relay" for a remote error raised | ||||
|         in the corresponding remote callee task. | ||||
|         ''' | ||||
|         assert self._scope_nursery | ||||
| 
 | ||||
|         async def raiser(): | ||||
|             raise unpack_error(msg, self.chan) | ||||
| 
 | ||||
|         self._scope_nursery.start_soon(raiser) | ||||
| 
 | ||||
|     async def cancel(self) -> None: | ||||
|         """Cancel this inter-actor-task context. | ||||
|         '''Cancel this inter-actor-task context. | ||||
| 
 | ||||
|         Request that the far side cancel its current linked context, | ||||
|         timeout quickly to sidestep 2-generals... | ||||
|         Timeout quickly in an attempt to sidestep 2-generals... | ||||
| 
 | ||||
|         """ | ||||
|         assert self._portal, ( | ||||
|             "No portal found, this is likely a callee side context") | ||||
|         ''' | ||||
|         side = 'caller' if self._portal else 'callee' | ||||
| 
 | ||||
|         cid = self.cid | ||||
|         with trio.move_on_after(0.5) as cs: | ||||
|             cs.shield = True | ||||
|             log.warning( | ||||
|                 f"Cancelling stream {cid} to " | ||||
|                 f"{self._portal.channel.uid}") | ||||
|         log.warning(f'Cancelling {side} side of context to {self.chan}') | ||||
| 
 | ||||
|             # NOTE: we're telling the far end actor to cancel a task | ||||
|             # corresponding to *this actor*. The far end local channel | ||||
|             # instance is passed to `Actor._cancel_task()` implicitly. | ||||
|             await self._portal.run_from_ns('self', '_cancel_task', cid=cid) | ||||
|         self._cancel_called = True | ||||
| 
 | ||||
|         if cs.cancelled_caught: | ||||
|             # XXX: there's no way to know if the remote task was indeed | ||||
|             # cancelled in the case where the connection is broken or | ||||
|             # some other network error occurred. | ||||
|             if not self._portal.channel.connected(): | ||||
|         if side == 'caller': | ||||
|             if not self._portal: | ||||
|                 raise RuntimeError( | ||||
|                     "No portal found, this is likely a callee side context" | ||||
|                 ) | ||||
| 
 | ||||
|             cid = self.cid | ||||
|             with trio.move_on_after(0.5) as cs: | ||||
|                 cs.shield = True | ||||
|                 log.warning( | ||||
|                     "May have failed to cancel remote task " | ||||
|                     f"{cid} for {self._portal.channel.uid}") | ||||
|                     f"Cancelling stream {cid} to " | ||||
|                     f"{self._portal.channel.uid}") | ||||
| 
 | ||||
|                 # NOTE: we're telling the far end actor to cancel a task | ||||
|                 # corresponding to *this actor*. The far end local channel | ||||
|                 # instance is passed to `Actor._cancel_task()` implicitly. | ||||
|                 await self._portal.run_from_ns('self', '_cancel_task', cid=cid) | ||||
| 
 | ||||
|             if cs.cancelled_caught: | ||||
|                 # XXX: there's no way to know if the remote task was indeed | ||||
|                 # cancelled in the case where the connection is broken or | ||||
|                 # some other network error occurred. | ||||
|                 # if not self._portal.channel.connected(): | ||||
|                 if not self.chan.connected(): | ||||
|                     log.warning( | ||||
|                         "May have failed to cancel remote task " | ||||
|                         f"{cid} for {self._portal.channel.uid}") | ||||
|         else: | ||||
|             # callee side remote task | ||||
| 
 | ||||
|             # TODO: should we have an explicit cancel message | ||||
|             # or is relaying the local `trio.Cancelled` as an | ||||
|             # {'error': trio.Cancelled, cid: "blah"} enough? | ||||
|             # This probably gets into the discussion in | ||||
|             # https://github.com/goodboy/tractor/issues/36 | ||||
|             assert self._scope_nursery | ||||
|             self._scope_nursery.cancel_scope.cancel() | ||||
| 
 | ||||
|         if self._recv_chan: | ||||
|             await self._recv_chan.aclose() | ||||
| 
 | ||||
|     @asynccontextmanager | ||||
|     async def open_stream( | ||||
| 
 | ||||
|         self, | ||||
|         shield: bool = False, | ||||
| 
 | ||||
|     ) -> AsyncGenerator[MsgStream, None]: | ||||
|         '''Open a ``MsgStream``, a bi-directional stream connected to the | ||||
|         cross-actor (far end) task for this ``Context``. | ||||
| 
 | ||||
|         This context manager must be entered on both the caller and | ||||
|         callee for the stream to logically be considered "connected". | ||||
| 
 | ||||
|         A ``MsgStream`` is currently "one-shot" use, meaning if you | ||||
|         close it you can not "re-open" it for streaming and instead you | ||||
|         must re-establish a new surrounding ``Context`` using | ||||
|         ``Portal.open_context()``.  In the future this may change but | ||||
|         currently there seems to be no obvious reason to support | ||||
|         "re-opening": | ||||
|             - pausing a stream can be done with a message. | ||||
|             - task errors will normally require a restart of the entire | ||||
|               scope of the inter-actor task context due to the nature of | ||||
|               ``trio``'s cancellation system. | ||||
| 
 | ||||
|         ''' | ||||
|         actor = current_actor() | ||||
| 
 | ||||
|         # here we create a mem chan that corresponds to the | ||||
|         # far end caller / callee. | ||||
| 
 | ||||
|         # NOTE: in one way streaming this only happens on the | ||||
|         # caller side inside `Actor.send_cmd()` so if you try | ||||
|         # to send a stop from the caller to the callee in the | ||||
|         # single-direction-stream case you'll get a lookup error | ||||
|         # currently. | ||||
|         _, recv_chan = actor.get_memchans( | ||||
|             self.chan.uid, | ||||
|             self.cid | ||||
|         ) | ||||
| 
 | ||||
|         # Likewise if the surrounding context has been cancelled we error here | ||||
|         # since it likely means the surrounding block was exited or | ||||
|         # killed | ||||
| 
 | ||||
|         if self._cancel_called: | ||||
|             task = trio.lowlevel.current_task().name | ||||
|             raise ContextCancelled( | ||||
|                 f'Context around {actor.uid[0]}:{task} was already cancelled!' | ||||
|             ) | ||||
| 
 | ||||
|         # XXX: If the underlying channel feeder receive mem chan has | ||||
|         # been closed then likely client code has already exited | ||||
|         # a ``.open_stream()`` block prior or there was some other | ||||
|         # unanticipated error or cancellation from ``trio``. | ||||
| 
 | ||||
|         if recv_chan._closed: | ||||
|             raise trio.ClosedResourceError( | ||||
|                 'The underlying channel for this stream was already closed!?') | ||||
| 
 | ||||
|         async with MsgStream( | ||||
|             ctx=self, | ||||
|             rx_chan=recv_chan, | ||||
|             shield=shield, | ||||
|         ) as rchan: | ||||
| 
 | ||||
|             if self._portal: | ||||
|                 self._portal._streams.add(rchan) | ||||
| 
 | ||||
|             try: | ||||
|                 # ensure we aren't cancelled before delivering | ||||
|                 # the stream | ||||
|                 # await trio.lowlevel.checkpoint() | ||||
|                 yield rchan | ||||
| 
 | ||||
|             except trio.EndOfChannel: | ||||
|                 # likely the far end sent us a 'stop' message to | ||||
|                 # terminate the stream. | ||||
|                 raise | ||||
| 
 | ||||
|             else: | ||||
|                 # XXX: Make the stream "one-shot use".  On exit, signal | ||||
|                 # ``trio.EndOfChannel``/``StopAsyncIteration`` to the | ||||
|                 # far end. | ||||
|                 await self.send_stop() | ||||
| 
 | ||||
|             finally: | ||||
|                 if self._portal: | ||||
|                     self._portal._streams.remove(rchan) | ||||
| 
 | ||||
|     async def result(self) -> Any: | ||||
|         '''From a caller side, wait for and return the final result from | ||||
|         the callee side task. | ||||
| 
 | ||||
|         ''' | ||||
|         assert self._portal, "Context.result() can not be called from callee!" | ||||
|         assert self._recv_chan | ||||
| 
 | ||||
|         if self._result is False: | ||||
| 
 | ||||
|             if not self._recv_chan._closed:  # type: ignore | ||||
| 
 | ||||
|                 # wait for a final context result consuming | ||||
|                 # and discarding any bi dir stream msgs still | ||||
|                 # in transit from the far end. | ||||
|                 while True: | ||||
| 
 | ||||
|                     msg = await self._recv_chan.receive() | ||||
|                     try: | ||||
|                         self._result = msg['return'] | ||||
|                         break | ||||
|                     except KeyError: | ||||
| 
 | ||||
|                         if 'yield' in msg: | ||||
|                             # far end task is still streaming to us.. | ||||
|                             log.warning(f'Remote stream delivered {msg}') | ||||
|                             # so discard it | ||||
|                             continue | ||||
| 
 | ||||
|                         elif 'stop' in msg: | ||||
|                             log.debug('Remote stream terminated') | ||||
|                             continue | ||||
| 
 | ||||
|                         # internal error should never get here | ||||
|                         assert msg.get('cid'), ( | ||||
|                             "Received internal error at portal?") | ||||
|                         raise unpack_error(msg, self._portal.channel) | ||||
| 
 | ||||
|         return self._result | ||||
| 
 | ||||
|     async def started(self, value: Optional[Any] = None) -> None: | ||||
| 
 | ||||
|         if self._portal: | ||||
|             raise RuntimeError( | ||||
|                 f"Caller side context {self} can not call started!") | ||||
| 
 | ||||
|         await self.chan.send({'started': value, 'cid': self.cid}) | ||||
| 
 | ||||
|     # TODO: do we need a restart api? | ||||
|     # async def restart(self) -> None: | ||||
|     #     # TODO | ||||
|     #     pass | ||||
| 
 | ||||
|     # @asynccontextmanager | ||||
|     # async def open_stream( | ||||
|     #     self, | ||||
|     # ) -> AsyncContextManager: | ||||
|     #     # TODO | ||||
|     #     pass | ||||
| 
 | ||||
| 
 | ||||
| def stream(func): | ||||
| def stream(func: Callable) -> Callable: | ||||
|     """Mark an async function as a streaming routine with ``@stream``. | ||||
| 
 | ||||
|     """ | ||||
|     func._tractor_stream_function = True | ||||
|     # annotate | ||||
|     # TODO: apply whatever solution ``mypy`` ends up picking for this: | ||||
|     # https://github.com/python/mypy/issues/2087#issuecomment-769266912 | ||||
|     func._tractor_stream_function = True  # type: ignore | ||||
| 
 | ||||
|     sig = inspect.signature(func) | ||||
|     params = sig.parameters | ||||
|     if 'stream' not in params and 'ctx' in params: | ||||
|  | @ -114,147 +561,26 @@ def stream(func): | |||
|     ): | ||||
|         raise TypeError( | ||||
|             "The first argument to the stream function " | ||||
|             f"{func.__name__} must be `ctx: tractor.Context`" | ||||
|             f"{func.__name__} must be `ctx: tractor.Context` " | ||||
|             "(Or ``to_trio`` if using ``asyncio`` in guest mode)." | ||||
|         ) | ||||
|     return func | ||||
| 
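A rough sketch of a function satisfying the structural check above; it assumes (as with prior one-way streaming usage) that the decorated target is an async generator whose first argument is the context:

.. code:: python

    import trio
    import tractor


    @tractor.stream
    async def stream_squares(ctx: tractor.Context, limit: int):
        # the first argument must be named ``ctx`` (or ``to_trio``) per
        # the check above; each yielded value is relayed to the caller.
        for i in range(limit):
            yield i ** 2
            await trio.sleep(0)  # checkpoint so the msg loop can run

Such a function could then be consumed from another actor via something like ``portal.open_stream_from(stream_squares, limit=10)``.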
 | ||||
| 
 | ||||
| class ReceiveMsgStream(trio.abc.ReceiveChannel): | ||||
|     """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with | ||||
|     special behaviour for signalling stream termination across an | ||||
|     inter-actor ``Channel``. This is the type returned to a local task | ||||
|     which invoked a remote streaming function using `Portal.run()`. | ||||
| 
 | ||||
|     Termination rules: | ||||
|     - if the local task signals stop iteration a cancel signal is | ||||
|       relayed to the remote task indicating to stop streaming | ||||
|     - if the remote task signals the end of a stream, raise a | ||||
|       ``StopAsyncIteration`` to terminate the local ``async for`` | ||||
| def context(func: Callable) -> Callable: | ||||
|     """Mark an async function as a streaming routine with ``@context``. | ||||
| 
 | ||||
|     """ | ||||
|     def __init__( | ||||
|         self, | ||||
|         ctx: Context, | ||||
|         rx_chan: trio.abc.ReceiveChannel, | ||||
|         portal: 'Portal',  # type: ignore # noqa | ||||
|     ) -> None: | ||||
|         self._ctx = ctx | ||||
|         self._rx_chan = rx_chan | ||||
|         self._portal = portal | ||||
|         self._shielded = False | ||||
|     # annotate | ||||
|     # TODO: apply whatever solution ``mypy`` ends up picking for this: | ||||
|     # https://github.com/python/mypy/issues/2087#issuecomment-769266912 | ||||
|     func._tractor_context_function = True  # type: ignore | ||||
| 
 | ||||
|     # delegate directly to underlying mem channel | ||||
|     def receive_nowait(self): | ||||
|         return self._rx_chan.receive_nowait() | ||||
| 
 | ||||
|     async def receive(self): | ||||
|         try: | ||||
|             msg = await self._rx_chan.receive() | ||||
|             return msg['yield'] | ||||
| 
 | ||||
|         except KeyError: | ||||
|             # internal error should never get here | ||||
|             assert msg.get('cid'), ("Received internal error at portal?") | ||||
| 
 | ||||
|             # TODO: handle 2 cases with 3.10 match syntax | ||||
|             # - 'stop' | ||||
|             # - 'error' | ||||
|             # possibly just handle msg['stop'] here! | ||||
| 
 | ||||
|             # TODO: test that shows stream raising an expected error!!! | ||||
|             if msg.get('error'): | ||||
|                 # raise the error message | ||||
|                 raise unpack_error(msg, self._portal.channel) | ||||
| 
 | ||||
|         except (trio.ClosedResourceError, StopAsyncIteration): | ||||
|             # XXX: this indicates that a `stop` message was | ||||
|             # sent by the far side of the underlying channel. | ||||
|             # Currently this is triggered by calling ``.aclose()`` on | ||||
|             # the send side of the channel inside | ||||
|             # ``Actor._push_result()``, but maybe it should be put here? | ||||
|             # to avoid exposing the internal mem chan closing mechanism? | ||||
|             # in theory we could instead do some flushing of the channel | ||||
|             # if needed to ensure all consumers are complete before | ||||
|             # triggering closure too early? | ||||
| 
 | ||||
|             # Locally, we want to close this stream gracefully, by | ||||
|             # terminating any local consumers tasks deterministically. | ||||
|             # We **don't** want to be closing this send channel and not | ||||
|             # relaying a final value to remaining consumers who may not | ||||
|             # have been scheduled to receive it yet? | ||||
| 
 | ||||
|             # lots of testing to do here | ||||
| 
 | ||||
|             # when the send is closed we assume the stream has | ||||
|             # terminated and signal this local iterator to stop | ||||
|             await self.aclose() | ||||
|             raise StopAsyncIteration | ||||
| 
 | ||||
|         except trio.Cancelled: | ||||
|             # relay cancels to the remote task | ||||
|             await self.aclose() | ||||
|             raise | ||||
| 
 | ||||
|     @contextmanager | ||||
|     def shield( | ||||
|         self | ||||
|     ) -> Iterator['ReceiveMsgStream']:  # noqa | ||||
|         """Shield this stream's underlying channel such that a local consumer task | ||||
|         can be cancelled (and possibly restarted) using ``trio.Cancelled``. | ||||
| 
 | ||||
|         """ | ||||
|         self._shielded = True | ||||
|         yield self | ||||
|         self._shielded = False | ||||
| 
 | ||||
|     async def aclose(self): | ||||
|         """Cancel associated remote actor task and local memory channel | ||||
|         on close. | ||||
|         """ | ||||
|         rx_chan = self._rx_chan | ||||
| 
 | ||||
|         if rx_chan._closed: | ||||
|             log.warning(f"{self} is already closed") | ||||
|             return | ||||
| 
 | ||||
|         # stats = rx_chan.statistics() | ||||
|         # if stats.open_receive_channels > 1: | ||||
|         #     # if we've been cloned don't kill the stream | ||||
|         #     log.debug( | ||||
|         #       "there are still consumers running keeping stream alive") | ||||
|         #     return | ||||
| 
 | ||||
|         if self._shielded: | ||||
|             log.warning(f"{self} is shielded, portal channel being kept alive") | ||||
|             return | ||||
| 
 | ||||
|         # close the local mem chan | ||||
|         rx_chan.close() | ||||
| 
 | ||||
|         # cancel surrounding IPC context | ||||
|         await self._ctx.cancel() | ||||
| 
 | ||||
|     # TODO: but make it broadcasting to consumers | ||||
|     # def clone(self): | ||||
|     #     """Clone this receive channel allowing for multi-task | ||||
|     #     consumption from the same channel. | ||||
| 
 | ||||
|     #     """ | ||||
|     #     return ReceiveStream( | ||||
|     #         self._cid, | ||||
|     #         self._rx_chan.clone(), | ||||
|     #         self._portal, | ||||
|     #     ) | ||||
| 
 | ||||
| 
 | ||||
| # class MsgStream(ReceiveMsgStream, trio.abc.Channel): | ||||
| #     """ | ||||
| #     Bidirectional message stream for use within an inter-actor | ||||
| #     ``Context``. | ||||
| 
 | ||||
| #     """ | ||||
| #     async def send( | ||||
| #         self, | ||||
| #         data: Any | ||||
| #     ) -> None: | ||||
| #         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) | ||||
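The ``shield()`` manager above works by flipping a flag that short-circuits ``aclose()``. A minimal stand-in of that flag pattern (purely illustrative, not tractor code, and kept synchronous for brevity) behaves like this:

.. code:: python

    from contextlib import contextmanager
    from typing import Iterator


    class ShieldedChannel:
        # stand-in mimicking the ``_shielded`` flag semantics: while
        # shielded, close requests are ignored so the channel survives
        # a local consumer task's cancellation
        def __init__(self) -> None:
            self._shielded = False
            self.closed = False

        @contextmanager
        def shield(self) -> Iterator['ShieldedChannel']:
            self._shielded = True
            yield self
            self._shielded = False

        def aclose(self) -> None:
            if self._shielded:
                # mirrors the "portal channel being kept alive" branch
                return
            self.closed = True


    chan = ShieldedChannel()
    with chan.shield():
        chan.aclose()          # ignored while shielded
    assert not chan.closed

    chan.aclose()              # closes once the shield is released
    assert chan.closed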
|     sig = inspect.signature(func) | ||||
|     params = sig.parameters | ||||
|     if 'ctx' not in params: | ||||
|         raise TypeError( | ||||
|             "The first argument to the context function " | ||||
|             f"{func.__name__} must be `ctx: tractor.Context`" | ||||
|         ) | ||||
|     return func | ||||
|  |  | |||
|  | @ -252,6 +252,12 @@ async def _open_and_supervise_one_cancels_all_nursery( | |||
|                         f"Waiting on subactors {anursery._children} " | ||||
|                         "to complete" | ||||
|                     ) | ||||
| 
 | ||||
|                     # Last bit before first nursery block ends in the case | ||||
|                     # where we didn't error in the caller's scope | ||||
|                     log.debug("Waiting on all subactors to complete") | ||||
|                     anursery._join_procs.set() | ||||
| 
 | ||||
|                 except BaseException as err: | ||||
|                     # if the caller's scope errored then we activate our | ||||
|                     # one-cancels-all supervisor strategy (don't | ||||
|  | @ -292,11 +298,6 @@ async def _open_and_supervise_one_cancels_all_nursery( | |||
|                     else: | ||||
|                         raise | ||||
| 
 | ||||
|                 # Last bit before first nursery block ends in the case | ||||
|                 # where we didn't error in the caller's scope | ||||
|                 log.debug("Waiting on all subactors to complete") | ||||
|                 anursery._join_procs.set() | ||||
| 
 | ||||
|                 # ria_nursery scope end | ||||
| 
 | ||||
|         # XXX: do we need a `trio.Cancelled` catch here as well? | ||||
|  | @ -357,7 +358,8 @@ async def open_nursery( | |||
|     try: | ||||
|         if actor is None and is_main_process(): | ||||
| 
 | ||||
|             # if we are the parent process start the actor runtime implicitly | ||||
|             # if we are the parent process start the | ||||
|             # actor runtime implicitly | ||||
|             log.info("Starting actor runtime!") | ||||
| 
 | ||||
|             # mark us for teardown on exit | ||||
|  | @ -376,7 +378,6 @@ async def open_nursery( | |||
|             async with _open_and_supervise_one_cancels_all_nursery( | ||||
|                 actor | ||||
|             ) as anursery: | ||||
| 
 | ||||
|                 yield anursery | ||||
| 
 | ||||
|     finally: | ||||
|  |  | |||
|  | @ -26,35 +26,62 @@ LOG_FORMAT = ( | |||
|     " {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}" | ||||
|     " {reset}{bold_white}{thin_white}{message}" | ||||
| ) | ||||
| 
 | ||||
| DATE_FORMAT = '%b %d %H:%M:%S' | ||||
| 
 | ||||
| LEVELS = { | ||||
|     'GARBAGE': 1, | ||||
|     'TRACE': 5, | ||||
|     'PROFILE': 15, | ||||
|     'RUNTIME': 500, | ||||
|     'QUIET': 1000, | ||||
|     'TRANSPORT': 5, | ||||
|     'RUNTIME': 15, | ||||
|     'PDB': 500, | ||||
| } | ||||
| 
 | ||||
| STD_PALETTE = { | ||||
|     'CRITICAL': 'red', | ||||
|     'ERROR': 'red', | ||||
|     'RUNTIME': 'white', | ||||
|     'PDB': 'white', | ||||
|     'WARNING': 'yellow', | ||||
|     'INFO': 'green', | ||||
|     'RUNTIME': 'white', | ||||
|     'DEBUG': 'white', | ||||
|     'TRACE': 'cyan', | ||||
|     'GARBAGE': 'blue', | ||||
|     'TRANSPORT': 'cyan', | ||||
| } | ||||
| 
 | ||||
| BOLD_PALETTE = { | ||||
|     'bold': { | ||||
|         level: f"bold_{color}" for level, color in STD_PALETTE.items()} | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| class StackLevelAdapter(logging.LoggerAdapter): | ||||
| 
 | ||||
|     def transport( | ||||
|         self, | ||||
|         msg: str, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         return self.log(5, msg) | ||||
| 
 | ||||
|     def runtime( | ||||
|         self, | ||||
|         msg: str, | ||||
|     ) -> None: | ||||
|         return self.log(15, msg) | ||||
| 
 | ||||
|     def pdb( | ||||
|         self, | ||||
|         msg: str, | ||||
|     ) -> None: | ||||
|         return self.log(500, msg) | ||||
| 
 | ||||
| 
 | ||||
| def get_logger( | ||||
| 
 | ||||
|     name: str = None, | ||||
|     _root_name: str = _proj_name, | ||||
| ) -> logging.LoggerAdapter: | ||||
|     '''Return the package log or a sub-log for `name` if provided. | ||||
| 
 | ||||
| ) -> StackLevelAdapter: | ||||
|     '''Return the package log or a sub-logger for ``name`` if provided. | ||||
| 
 | ||||
|     ''' | ||||
|     log = rlog = logging.getLogger(_root_name) | ||||
| 
 | ||||
|  | @ -71,13 +98,14 @@ def get_logger( | |||
| 
 | ||||
|     # add our actor-task aware adapter which will dynamically look up | ||||
|     # the actor and task names at each log emit | ||||
|     logger = logging.LoggerAdapter(log, ActorContextInfo()) | ||||
|     logger = StackLevelAdapter(log, ActorContextInfo()) | ||||
| 
 | ||||
|     # additional levels | ||||
|     for name, val in LEVELS.items(): | ||||
|         logging.addLevelName(val, name) | ||||
|         # ex. create ``logger.transport()`` | ||||
|         setattr(logger, name.lower(), partial(logger.log, val)) | ||||
| 
 | ||||
|         # ensure custom levels exist as methods | ||||
|         assert getattr(logger, name.lower()), f'Logger does not define {name}' | ||||
| 
 | ||||
|     return logger | ||||
| 
 | ||||
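Putting the logging pieces together, a consumer of the new adapter gets the custom levels as plain methods. A small sketch follows; the subsystem name is an illustrative assumption, the submodule path ``tractor.log`` is assumed importable as shown, and output only appears once a handler is configured (here via ``logging.basicConfig``):

.. code:: python

    import logging

    import tractor

    # returns the ``StackLevelAdapter`` defined above
    log = tractor.log.get_logger('my_subsystem')

    # attach a basic root handler so the low custom levels are visible
    logging.basicConfig(level=5)

    log.transport('low level IPC chatter')        # level 5
    log.runtime('actor runtime lifecycle event')  # level 15
    log.pdb('dropping into the debugger')         # level 500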
|  |  | |||
|  | @ -1,9 +1,13 @@ | |||
| """ | ||||
| Messaging pattern APIs and helpers. | ||||
| 
 | ||||
| NOTE: this module is likely deprecated by the new bi-directional streaming | ||||
| support provided by ``tractor.Context.open_stream()`` and friends. | ||||
| 
 | ||||
| """ | ||||
| import inspect | ||||
| import typing | ||||
| from typing import Dict, Any, Set, Callable | ||||
| from typing import Dict, Any, Set, Callable, List, Tuple | ||||
| from functools import partial | ||||
| from async_generator import aclosing | ||||
| 
 | ||||
|  | @ -20,7 +24,7 @@ log = get_logger('messaging') | |||
| 
 | ||||
| async def fan_out_to_ctxs( | ||||
|     pub_async_gen_func: typing.Callable,  # it's an async gen ... gd mypy | ||||
|     topics2ctxs: Dict[str, set], | ||||
|     topics2ctxs: Dict[str, list], | ||||
|     packetizer: typing.Callable = None, | ||||
| ) -> None: | ||||
|     """Request and fan out quotes to each subscribed actor channel. | ||||
|  | @ -34,24 +38,27 @@ async def fan_out_to_ctxs( | |||
| 
 | ||||
|         async for published in pub_gen: | ||||
| 
 | ||||
|             ctx_payloads: Dict[str, Any] = {} | ||||
|             ctx_payloads: List[Tuple[Context, Any]] = [] | ||||
| 
 | ||||
|             for topic, data in published.items(): | ||||
|                 log.debug(f"publishing {topic, data}") | ||||
| 
 | ||||
|                 # build a new dict packet or invoke provided packetizer | ||||
|                 if packetizer is None: | ||||
|                     packet = {topic: data} | ||||
| 
 | ||||
|                 else: | ||||
|                     packet = packetizer(topic, data) | ||||
|                 for ctx in topics2ctxs.get(topic, set()): | ||||
|                     ctx_payloads.setdefault(ctx, {}).update(packet), | ||||
| 
 | ||||
|                 for ctx in topics2ctxs.get(topic, list()): | ||||
|                     ctx_payloads.append((ctx, packet)) | ||||
| 
 | ||||
|             if not ctx_payloads: | ||||
|                 log.debug(f"Unconsumed values:\n{published}") | ||||
| 
 | ||||
|             # deliver to each subscriber (fan out) | ||||
|             if ctx_payloads: | ||||
|                 for ctx, payload in ctx_payloads.items(): | ||||
|                 for ctx, payload in ctx_payloads: | ||||
|                     try: | ||||
|                         await ctx.send_yield(payload) | ||||
|                     except ( | ||||
|  | @ -60,15 +67,24 @@ async def fan_out_to_ctxs( | |||
|                         ConnectionRefusedError, | ||||
|                     ): | ||||
|                         log.warning(f"{ctx.chan} went down?") | ||||
|                         for ctx_set in topics2ctxs.values(): | ||||
|                             ctx_set.discard(ctx) | ||||
|                         for ctx_list in topics2ctxs.values(): | ||||
|                             try: | ||||
|                                 ctx_list.remove(ctx) | ||||
|                             except ValueError: | ||||
|                                 continue | ||||
| 
 | ||||
|             if not get_topics(): | ||||
|                 log.warning(f"No subscribers left for {pub_gen}") | ||||
|                 break | ||||
| 
 | ||||
| 
 | ||||
| def modify_subs(topics2ctxs, topics, ctx): | ||||
| def modify_subs( | ||||
| 
 | ||||
|     topics2ctxs: Dict[str, List[Context]], | ||||
|     topics: Set[str], | ||||
|     ctx: Context, | ||||
| 
 | ||||
| ) -> None: | ||||
|     """Absolute symbol subscription list for each quote stream. | ||||
| 
 | ||||
|     Effectively a symbol subscription api. | ||||
|  | @ -77,7 +93,7 @@ def modify_subs(topics2ctxs, topics, ctx): | |||
| 
 | ||||
|     # update map from each symbol to requesting client's chan | ||||
|     for topic in topics: | ||||
|         topics2ctxs.setdefault(topic, set()).add(ctx) | ||||
|         topics2ctxs.setdefault(topic, list()).append(ctx) | ||||
| 
 | ||||
|     # remove any existing symbol subscriptions if symbol is not | ||||
|     # found in ``symbols`` | ||||
|  | @ -85,10 +101,14 @@ def modify_subs(topics2ctxs, topics, ctx): | |||
|     for topic in filter( | ||||
|         lambda topic: topic not in topics, topics2ctxs.copy() | ||||
|     ): | ||||
|         ctx_set = topics2ctxs.get(topic) | ||||
|         ctx_set.discard(ctx) | ||||
|         ctx_list = topics2ctxs.get(topic) | ||||
|         if ctx_list: | ||||
|             try: | ||||
|                 ctx_list.remove(ctx) | ||||
|             except ValueError: | ||||
|                 pass | ||||
| 
 | ||||
|         if not ctx_set: | ||||
|         if not ctx_list: | ||||
|             # pop empty lists which will trigger bg quoter task termination | ||||
|             topics2ctxs.pop(topic) | ||||
| 
 | ||||
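To make the set-to-list change above concrete, here is a self-contained re-statement of the registry semantics with plain strings standing in for ``Context`` objects; the function name, topics, and channel labels are illustrative only:

.. code:: python

    from typing import Any, Dict, List, Set


    def update_subs(
        topics2ctxs: Dict[str, List[Any]],
        topics: Set[str],
        ctx: Any,
    ) -> None:
        # append to each requested topic's subscriber list ...
        for topic in topics:
            topics2ctxs.setdefault(topic, []).append(ctx)

        # ... then drop this ctx from topics it no longer wants,
        # popping entries that end up empty
        for topic in [t for t in topics2ctxs if t not in topics]:
            ctx_list = topics2ctxs[topic]
            if ctx in ctx_list:
                ctx_list.remove(ctx)
            if not ctx_list:
                topics2ctxs.pop(topic)


    registry: Dict[str, List[Any]] = {}
    update_subs(registry, {'btcusd'}, ctx='chan-1')
    update_subs(registry, {'btcusd'}, ctx='chan-2')
    assert registry == {'btcusd': ['chan-1', 'chan-2']}

    # unsubscribing everything removes each ctx and the empty topic
    update_subs(registry, set(), ctx='chan-1')
    update_subs(registry, set(), ctx='chan-2')
    assert registry == {}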
|  | @ -256,7 +276,7 @@ def pub( | |||
|                             respawn = True | ||||
|             finally: | ||||
|                 # remove all subs for this context | ||||
|                 modify_subs(topics2ctxs, (), ctx) | ||||
|                 modify_subs(topics2ctxs, set(), ctx) | ||||
| 
 | ||||
|                 # if there are truly no more subscriptions with this broker | ||||
|                 # drop from broker subs dict | ||||
|  |  | |||
|  | @ -78,7 +78,7 @@ def tractor_test(fn): | |||
| 
 | ||||
|         else: | ||||
|             # use implicit root actor start | ||||
|             main = partial(fn, *args, **kwargs), | ||||
|             main = partial(fn, *args, **kwargs) | ||||
| 
 | ||||
|         return trio.run(main) | ||||
|             # arbiter_addr=arb_addr, | ||||
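The one-character fix above matters because a trailing comma builds a one-element tuple rather than a callable, which ``trio.run()`` would then reject; a quick, self-contained illustration:

.. code:: python

    from functools import partial


    def fn() -> int:
        return 42


    broken = partial(fn),   # note the comma: this binds a ``tuple``
    fixed = partial(fn)

    assert isinstance(broken, tuple) and not callable(broken)
    assert callable(fixed) and fixed() == 42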
|  |  | |||