Compare commits

...

2 Commits

Author SHA1 Message Date
Tyler Goodlet 414a7ecc4c Enable stream backpressure by default, add `MsgStream.ctx: Context` 2022-12-12 18:57:32 -05:00
Tyler Goodlet 55ec4d2a8b Don't unset actor global on root teardown 2022-12-12 15:33:02 -05:00
2 changed files with 5 additions and 3 deletions

View File

@ -243,7 +243,6 @@ async def open_root_actor(
logger.cancel("Shutting down root actor") logger.cancel("Shutting down root actor")
await actor.cancel() await actor.cancel()
finally: finally:
_state._current_actor = None
logger.runtime("Root actor terminated") logger.runtime("Root actor terminated")

View File

@ -69,7 +69,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
''' '''
def __init__( def __init__(
self, self,
ctx: 'Context', # typing: ignore # noqa ctx: Context, # typing: ignore # noqa
rx_chan: trio.MemoryReceiveChannel, rx_chan: trio.MemoryReceiveChannel,
_broadcaster: Optional[BroadcastReceiver] = None, _broadcaster: Optional[BroadcastReceiver] = None,
@ -82,6 +82,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self._eoc: bool = False self._eoc: bool = False
self._closed: bool = False self._closed: bool = False
def ctx(self) -> Context:
return self._ctx
# delegate directly to underlying mem channel # delegate directly to underlying mem channel
def receive_nowait(self): def receive_nowait(self):
msg = self._rx_chan.receive_nowait() msg = self._rx_chan.receive_nowait()
@ -380,7 +383,7 @@ class Context:
# only set on the callee side # only set on the callee side
_scope_nursery: Optional[trio.Nursery] = None _scope_nursery: Optional[trio.Nursery] = None
_backpressure: bool = False _backpressure: bool = True
async def send_yield(self, data: Any) -> None: async def send_yield(self, data: Any) -> None: