From 414a7ecc4c80b727cb5cc42fa7c9c843dd90a2de Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 11 Dec 2022 19:50:41 -0500 Subject: [PATCH] Enable stream backpressure by default, add `MsgStream.ctx: Context` --- tractor/_streaming.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index bb99dc5..a4b4742 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -69,7 +69,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): ''' def __init__( self, - ctx: 'Context', # typing: ignore # noqa + ctx: Context, # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, _broadcaster: Optional[BroadcastReceiver] = None, @@ -82,6 +82,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self._eoc: bool = False self._closed: bool = False + def ctx(self) -> Context: + return self._ctx + # delegate directly to underlying mem channel def receive_nowait(self): msg = self._rx_chan.receive_nowait() @@ -380,7 +383,7 @@ class Context: # only set on the callee side _scope_nursery: Optional[trio.Nursery] = None - _backpressure: bool = False + _backpressure: bool = True async def send_yield(self, data: Any) -> None: