From 1e42f584781c9f44edcfa501d2b591ac3c1a1972 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 15 Aug 2021 18:14:09 -0400 Subject: [PATCH] Add pause/resume feed api, delegate to msg stream for broadcast api --- piker/data/feed.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 715cf9d1..cc9ea883 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -34,7 +34,7 @@ import trio from trio.abc import ReceiveChannel from trio_typing import TaskStatus import tractor -from tractor import _broadcast +# from tractor import _broadcast from pydantic import BaseModel from ..brokers import get_brokermod @@ -329,17 +329,22 @@ async def attach_feed_bus( subs.append(sub) try: + uid = ctx.chan.uid + fqsn = f'{symbol}.{brokername}' + async for msg in stream: if msg == 'pause': - log.info( - f'Pausing {symbol}.{brokername} feed for {ctx.chan.uid}') - subs.remove(sub) + if sub in subs: + log.info( + f'Pausing {fqsn} feed for {uid}') + subs.remove(sub) elif msg == 'resume': - log.info( - f'Resuming {symbol}.{brokername} feed for {ctx.chan.uid}') - subs.append(sub) + if sub not in subs: + log.info( + f'Resuming {fqsn} feed for {uid}') + subs.append(sub) else: raise ValueError(msg) finally: @@ -401,6 +406,12 @@ class Feed: else: yield self._index_stream + async def pause(self) -> None: + await self.stream.send('pause') + + async def resume(self) -> None: + await self.stream.send('resume') + def sym_to_shm_key( broker: str, @@ -493,16 +504,12 @@ async def open_feed( readonly=True, ) - bstream = _broadcast.broadcast_receiver( - stream, - 2**10, - ) feed = Feed( name=brokername, shm=shm, mod=mod, first_quote=first_quote, - stream=bstream, #brx_stream, + stream=stream, _brokerd_portal=portal, ) ohlc_sample_rates = [] @@ -573,4 +580,4 @@ async def maybe_open_feed( async with feed.stream.subscribe() as bstream: yield feed, bstream else: - yield feed, stream + yield feed, feed.stream