diff --git a/piker/data/feed.py b/piker/data/feed.py index 4dc63da6..b243be8d 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -147,6 +147,38 @@ class _FeedsBus(Struct): # task: trio.lowlevel.Task, # ) -> bool: # ... + def get_subs( + self, + key: str, + ) -> list[ + tuple[ + Union[tractor.MsgStream, trio.MemorySendChannel], + tractor.Context, + float | None, # tick throttle in Hz + ] + ]: + return self._subscribers[key] + + def remove_sub( + self, + key: str, + sub: tuple, + ) -> bool: + ''' + Remove a consumer's subscription entry for the given key. + + ''' + stream, ctx, tick_throttle = sub + subs = self.get_subs(key) + try: + subs.remove(sub) + except ValueError: + chan = ctx.chan + log.error( + f'Stream was already removed from subs!?\n' + f'{key}:' + f'{ctx.cid}@{chan.uid}' + ) _bus: _FeedsBus = None @@ -916,6 +948,7 @@ class Flume(Struct): # TODO: maybe a public (property) API for this in ``tractor``? portal = self.stream._ctx._portal + assert portal # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be @@ -1299,6 +1332,7 @@ async def open_feed_bus( await stream.send({fqsn: flume.first_quote}) # set a common msg stream for all requested symbols + assert stream flume.stream = stream # Add a real-time quote subscription to feed bus: @@ -1539,6 +1573,9 @@ async def maybe_open_feed( mngrs=[stream.subscribe() for stream in feed.streams.values()] ) as bstreams: for bstream, flume in zip(bstreams, feed.flumes.values()): + # XXX: TODO: horrible hackery that needs fixing.. + # i guess we have to create context proxies? + bstream._ctx = flume.stream._ctx flume.stream = bstream yield feed @@ -1679,14 +1716,14 @@ async def open_feed( (brokermod, bfqsns), ) in zip(streams, providers.items()): + assert stream feed.streams[brokermod.name] = stream - # for bfqsn in bfqsns: - for fqsn in flumes_msg_dict: - - # apply common rt steam to each flume - # (normally one per broker) - feed.flumes[fqsn].stream = stream + # apply `brokerd`-common steam to each flume + # tracking a symbol from that provider. + for fqsn, flume in feed.flumes.items(): + if brokermod.name in flume.symbol.brokers: + flume.stream = stream assert len(feed.mods) == len(feed.portals) == len(feed.streams)