From 304ee6ccd248d55f883347e0502195780380782e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 11 Dec 2022 19:50:41 -0500 Subject: [PATCH] Enable stream backpressure by default, add `MsgStream.ctx: Context` --- tractor/_streaming.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 11ff47d..261b756 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -69,7 +69,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): ''' def __init__( self, - ctx: 'Context', # typing: ignore # noqa + ctx: Context, # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, _broadcaster: Optional[BroadcastReceiver] = None, @@ -82,6 +82,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self._eoc: bool = False self._closed: bool = False + def ctx(self) -> Context: + return self._ctx + # delegate directly to underlying mem channel def receive_nowait(self): msg = self._rx_chan.receive_nowait() @@ -381,7 +384,7 @@ class Context: # only set on the callee side _scope_nursery: Optional[trio.Nursery] = None - _backpressure: bool = False + _backpressure: bool = True async def send_yield(self, data: Any) -> None: