From f7a1f3832f018ddd1f50339491ac1d2ee1fcfdec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 11 Dec 2022 19:50:41 -0500 Subject: [PATCH] Enable stream backpressure by default, add `MsgStream.ctx: Context` --- tractor/_streaming.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index b112956..ec15272 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -70,7 +70,7 @@ class MsgStream(trio.abc.Channel): ''' def __init__( self, - ctx: 'Context', # typing: ignore # noqa + ctx: Context, # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, _broadcaster: Optional[BroadcastReceiver] = None, @@ -83,6 +83,9 @@ class MsgStream(trio.abc.Channel): self._eoc: bool = False self._closed: bool = False + def ctx(self) -> Context: + return self._ctx + # delegate directly to underlying mem channel def receive_nowait(self): msg = self._rx_chan.receive_nowait() @@ -390,7 +393,7 @@ class Context: # only set on the callee side _scope_nursery: Optional[trio.Nursery] = None - _backpressure: bool = False + _backpressure: bool = True async def send_yield(self, data: Any) -> None: