diff --git a/piker/data/feed.py b/piker/data/feed.py
index 82b7b59b..d26c7f37 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -32,6 +32,7 @@ from typing import (
     Callable,
     Optional,
     Awaitable,
+    Sequence,
     TYPE_CHECKING,
     Union,
 )
@@ -627,26 +628,31 @@ async def tsdb_backfill(
             field_map=marketstore.ohlc_key_map,
         )
 
+        tsdb_last_frame_start = tsdb_history['Epoch'][0]
+
+        # load as much from storage into shm as possible (depends on
+        # user's shm size settings).
         while (
             shm._first.value > 0
         ):
-            # load as much from storage into shm as space will
-            # allow according to user's shm size settings.
-            tsdb_last_frame_start = tsdb_history['Epoch'][0]
 
             tsdb_history = await storage.read_ohlcv(
                 fqsn,
                 end=tsdb_last_frame_start,
                 timeframe=timeframe,
             )
+
+            next_start = tsdb_history['Epoch'][0]
             if (
                 not len(tsdb_history)  # empty query
 
                 # no earlier data detected
-                or tsdb_history['Epoch'][0] >= tsdb_last_frame_start
+                or next_start >= tsdb_last_frame_start
             ):
                 break
+            else:
+                tsdb_last_frame_start = next_start
 
             prepend_start = shm._first.value
             to_push = tsdb_history[-prepend_start:]
@@ -868,6 +874,9 @@ class Flume(Struct):
     izero_hist: int = 0
     izero_rt: int = 0
     throttle_rate: int | None = None
+
+    # TODO: do we need this really if we can pull the `Portal` from
+    # ``tractor``'s internals?
    feed: Feed | None = None
 
     @property
@@ -905,12 +914,15 @@ class Flume(Struct):
         if not self.feed:
             raise RuntimeError('This flume is not part of any ``Feed``?')
 
+        # TODO: maybe a public (property) API for this in ``tractor``?
+        portal = self.stream._ctx._portal
+
         # XXX: this should be singleton on a host,
         # a lone broker-daemon per provider should be
         # created for all practical purposes
         async with maybe_open_context(
             acm_func=partial(
-                self.feed.portal.open_context,
+                portal.open_context,
                 iter_ohlc_periods,
             ),
             kwargs={'delay_s': delay_s},
@@ -1384,24 +1396,66 @@ class Feed(Struct):
     similarly allocated shm arrays.
 
     '''
-    mod: ModuleType
-    _portal: tractor.Portal
+    mods: dict[str, ModuleType] = {}
+    portals: dict[ModuleType, tractor.Portal] = {}
     flumes: dict[str, Flume] = {}
     streams: dict[
         str,
         trio.abc.ReceiveChannel[dict[str, Any]],
     ] = {}
-    status: dict[str, Any]
+
+    # used for UI to show remote state
+    status: dict[str, Any] = {}
+
+    @acm
+    async def open_multi_stream(
+        self,
+        brokers: Sequence[str] | None = None,
+
+    ) -> trio.abc.ReceiveChannel:
+
+        if brokers is None:
+            mods = self.mods
+        else:
+            mods = {name: self.mods[name] for name in brokers}
+
+        if len(mods) == 1:
+            # just pass the brokerd stream directly if only one provider
+            # was detected.
+            stream = self.streams[list(mods)[0]]
+            async with stream.subscribe() as bstream:
+                yield bstream
+            return
+
+        # start multiplexing task tree
+        tx, rx = trio.open_memory_channel(616)
+
+        async def relay_to_common_memchan(stream: tractor.MsgStream):
+            async with tx:
+                async for msg in stream:
+                    await tx.send(msg)
+
+        async with trio.open_nursery() as nurse:
+            # spawn a relay task for each stream so that they all
+            # multiplex to a common channel.
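+            # (each relay task pushes into the shared `tx` side so
+            # a consumer sees one merged msg stream on `rx`.)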
+            for brokername in mods:
+                stream = self.streams[brokername]
+                nurse.start_soon(relay_to_common_memchan, stream)
+
+            try:
+                yield rx
+            finally:
+                nurse.cancel_scope.cancel()
 
     _max_sample_rate: int = 1
 
-    @property
-    def portal(self) -> tractor.Portal:
-        return self._portal
+    # @property
+    # def portal(self) -> tractor.Portal:
+    #     return self._portal
 
-    @property
-    def name(self) -> str:
-        return self.mod.name
+    # @property
+    # def name(self) -> str:
+    #     return self.mod.name
 
 
 @acm
@@ -1457,6 +1511,7 @@ async def open_feed(
 
     '''
     providers: dict[ModuleType, list[str]] = {}
+    feed = Feed()
 
     for fqsn in fqsns:
         brokername, key, suffix = unpack_fqsn(fqsn)
@@ -1469,6 +1524,7 @@ async def open_feed(
 
         # built a per-provider map to instrument names
         providers.setdefault(mod, []).append(bfqsn)
+        feed.mods[mod.name] = mod
 
     # one actor per brokerd for now
     brokerd_ctxs = []
@@ -1495,18 +1551,15 @@ async def open_feed(
             (brokermod, bfqsns),
         ) in zip(portals, providers.items()):
 
-            feed = Feed(
-                mod=brokermod,
-                _portal=portal,
-                status={},
-            )
+            feed.portals[brokermod] = portal
+
             # fill out "status info" that the UI can show
-            host, port = feed.portal.channel.raddr
+            host, port = portal.channel.raddr
             if host == '127.0.0.1':
                 host = 'localhost'
 
             feed.status.update({
-                'actor_name': feed.portal.channel.uid[0],
+                'actor_name': portal.channel.uid[0],
                 'host': host,
                 'port': port,
                 'hist_shm': 'NA',
@@ -1519,8 +1572,7 @@ async def open_feed(
             bus_ctxs.append(
                 portal.open_context(
                     open_feed_bus,
-                    # brokername=brokermod.name,
-                    brokername=brokername,
+                    brokername=brokermod.name,
                     symbols=bfqsns,
                     loglevel=loglevel,
                     start_stream=start_stream,
@@ -1528,61 +1580,66 @@ async def open_feed(
                 )
             )
 
-        async with (
-            gather_contexts(bus_ctxs) as ctxs,
-        ):
-            remote_scopes = []
-            for (
-                (ctx, flumes_msg_dict),
-                (brokermod, bfqsns),
-            ) in zip(ctxs, providers.items()):
+        assert len(feed.mods) == len(feed.portals)
 
-                stream_ctxs = []
-                for fqsn, flume_msg in flumes_msg_dict.items():
-                    flume = Flume.from_msg(flume_msg)
-                    assert flume.symbol.fqsn == fqsn
-                    feed.flumes[fqsn] = flume
-                    flume.feed = feed
+        async with (
+            gather_contexts(bus_ctxs) as ctxs,
+        ):
+            stream_ctxs = []
+            for (
+                (ctx, flumes_msg_dict),
+                (brokermod, bfqsns),
+            ) in zip(ctxs, providers.items()):
 
-                    # attach and cache shm handles
-                    rt_shm = flume.rt_shm
-                    assert rt_shm
-                    hist_shm = flume.hist_shm
-                    assert hist_shm
+                for fqsn, flume_msg in flumes_msg_dict.items():
+                    flume = Flume.from_msg(flume_msg)
+                    assert flume.symbol.fqsn == fqsn
+                    feed.flumes[fqsn] = flume
 
-                    feed.status['hist_shm'] = (
-                        f'{humanize(hist_shm._shm.size)}'
-                    )
-                    feed.status['rt_shm'] = f'{humanize(rt_shm._shm.size)}'
+                    # TODO: do we need this?
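+                    # (`Flume.index_stream()` still checks this is
+                    # set, even though it now pulls the portal from
+                    # the msg stream's ctx.)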
+                    flume.feed = feed
 
-                remote_scopes.append(ctx)
-                stream_ctxs.append(
-                    ctx.open_stream(
-                        # XXX: be explicit about stream backpressure
-                        # since we should **never** overrun on feeds
-                        # being too fast, which will pretty much
-                        # always happen with HFT XD
-                        backpressure=backpressure,
-                    )
-                )
+                    # attach and cache shm handles
+                    rt_shm = flume.rt_shm
+                    assert rt_shm
+                    hist_shm = flume.hist_shm
+                    assert hist_shm
+
+                    feed.status['hist_shm'] = (
+                        f'{humanize(hist_shm._shm.size)}'
+                    )
+                    feed.status['rt_shm'] = f'{humanize(rt_shm._shm.size)}'
 
-            async with (
-                gather_contexts(stream_ctxs) as streams,
-            ):
-                for (
-                    stream,
-                    (brokermod, bfqsns),
-                ) in zip(streams, providers.items()):
+                stream_ctxs.append(
+                    ctx.open_stream(
+                        # XXX: be explicit about stream backpressure
+                        # since we should **never** overrun on feeds
+                        # being too fast, which will pretty much
+                        # always happen with HFT XD
+                        backpressure=backpressure,
+                    )
+                )
 
-                    # for bfqsn in bfqsns:
-                    for fqsn in flumes_msg_dict:
+            async with (
+                gather_contexts(stream_ctxs) as streams,
+            ):
+                for (
+                    stream,
+                    (brokermod, bfqsns),
+                ) in zip(streams, providers.items()):
 
-                        # apply common rt steam to each flume
-                        # (normally one per broker)
-                        feed.flumes[fqsn].stream = stream
-                        feed.streams[brokermod.name] = stream
+                    feed.streams[brokermod.name] = stream
 
-                yield feed
+                    # for bfqsn in bfqsns:
+                    for fqsn in flumes_msg_dict:
+
+                        # apply common rt stream to each flume
+                        # (normally one per broker)
+                        feed.flumes[fqsn].stream = stream
+
+                assert len(feed.mods) == len(feed.portals) == len(feed.streams)
+
+                yield feed
 
 
 @acm
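A minimal consumer sketch of the new `Feed.open_multi_stream()` API for review context. The fqsns, the two-provider setup, and the surrounding runtime (a reachable `pikerd` tree from which both brokerds can be spawned) are illustrative assumptions; only `open_feed()`, `Feed.streams` and `open_multi_stream()` come from this patch:

```python
from piker.data.feed import open_feed


async def print_merged_quotes():
    # hypothetical fqsns; any two locally configured providers work
    async with open_feed(
        ['btcusdt.binance', 'xbtusd.kraken'],
    ) as feed:
        # one brokerd msg stream was cached per provider
        assert len(feed.streams) == 2

        # >1 provider: relay tasks multiplex both brokerd streams
        # into one memchan; exactly 1: a broadcast-clone of that
        # provider's stream is yielded directly.
        async with feed.open_multi_stream() as stream:
            async for msg in stream:
                print(msg)
```

Since each provider gets exactly one relay task pushing into the shared channel, per-provider msg ordering is preserved while providers interleave arbitrarily.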