Ensure correct stream is set on each `Flume`

Set each quote-stream by matching the provider for each `Flume` and thus
results in some flumes mapping to the same (multiplexed) stream.
Monkey-patch the equivalent `tractor.MsgStream._ctx: tractor.Context` on
each broadcast-receiver subscription to allow use by feed bus methods as
well as other internals which need to reference IPC channel/portal info.

Start a `_FeedsBus` subscription management API:
- add `.get_subs()` which returns the list of tuples registered for the
  given key (normally the fqsn).
- add `.remove_sub()` which allows removing by key and tuple value and
  provides encapsulation for sampler task(s) which deal with dropped
  connections/subscribers.
agg_feedz
Tyler Goodlet 2022-11-14 14:33:04 -05:00
parent 1e96ca32df
commit f5cd63ad35
1 changed files with 43 additions and 6 deletions

View File

@ -147,6 +147,38 @@ class _FeedsBus(Struct):
# task: trio.lowlevel.Task,
# ) -> bool:
# ...
def get_subs(
self,
key: str,
) -> list[
tuple[
Union[tractor.MsgStream, trio.MemorySendChannel],
tractor.Context,
float | None, # tick throttle in Hz
]
]:
return self._subscribers[key]
def remove_sub(
self,
key: str,
sub: tuple,
) -> bool:
'''
Remove a consumer's subscription entry for the given key.
'''
stream, ctx, tick_throttle = sub
subs = self.get_subs(key)
try:
subs.remove(sub)
except ValueError:
chan = ctx.chan
log.error(
f'Stream was already removed from subs!?\n'
f'{key}:'
f'{ctx.cid}@{chan.uid}'
)
_bus: _FeedsBus = None
@ -916,6 +948,7 @@ class Flume(Struct):
# TODO: maybe a public (property) API for this in ``tractor``?
portal = self.stream._ctx._portal
assert portal
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
@ -1299,6 +1332,7 @@ async def open_feed_bus(
await stream.send({fqsn: flume.first_quote})
# set a common msg stream for all requested symbols
assert stream
flume.stream = stream
# Add a real-time quote subscription to feed bus:
@ -1539,6 +1573,9 @@ async def maybe_open_feed(
mngrs=[stream.subscribe() for stream in feed.streams.values()]
) as bstreams:
for bstream, flume in zip(bstreams, feed.flumes.values()):
# XXX: TODO: horrible hackery that needs fixing..
# i guess we have to create context proxies?
bstream._ctx = flume.stream._ctx
flume.stream = bstream
yield feed
@ -1679,14 +1716,14 @@ async def open_feed(
(brokermod, bfqsns),
) in zip(streams, providers.items()):
assert stream
feed.streams[brokermod.name] = stream
# for bfqsn in bfqsns:
for fqsn in flumes_msg_dict:
# apply common rt steam to each flume
# (normally one per broker)
feed.flumes[fqsn].stream = stream
# apply `brokerd`-common steam to each flume
# tracking a symbol from that provider.
for fqsn, flume in feed.flumes.items():
if brokermod.name in flume.symbol.brokers:
flume.stream = stream
assert len(feed.mods) == len(feed.portals) == len(feed.streams)