Cache sample step streams per actor

pause_feeds_on_sym_switch
Tyler Goodlet 2021-08-31 09:28:22 -04:00
parent bbcce0cab6
commit 1184a4d88e
1 changed files with 31 additions and 14 deletions

View File

@ -355,6 +355,31 @@ async def attach_feed_bus(
bus._subscribers[symbol].remove(sub)
@asynccontextmanager
async def open_sample_step_stream(
portal: tractor.Portal,
delay_s: int,
) -> tractor.ReceiveMsgStream:
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_ctx(
key=delay_s,
mngr=portal.open_stream_from(
iter_ohlc_periods,
delay_s=delay_s, # must be kwarg
),
) as (cache_hit, istream):
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with istream.subscribe() as bistream:
yield bistream
else:
yield istream
@dataclass
class Feed:
"""A data feed for client-side interaction with far-process# }}}
@ -371,7 +396,6 @@ class Feed:
stream: trio.abc.ReceiveChannel[dict[str, Any]]
_brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[int]] = None
_trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None
_max_sample_rate: int = 0
@ -391,20 +415,13 @@ class Feed:
) -> AsyncIterator[int]:
if not self._index_stream:
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with self._brokerd_portal.open_stream_from(
delay_s = delay_s or self._max_sample_rate
iter_ohlc_periods,
delay_s=delay_s or self._max_sample_rate,
) as self._index_stream:
yield self._index_stream
else:
yield self._index_stream
async with open_sample_step_stream(
self._brokerd_portal,
delay_s,
) as istream:
yield istream
async def pause(self) -> None:
await self.stream.send('pause')