Multi-`broker` quotes with `Feed.open_multi_stream()`

Adds provider-list-filtered (quote) stream multiplexing support allowing
for merged real-time `tractor.MsgStream`s using an `@acm` interface.
Behind the scenes we are just doing a classic multi-task push to common
mem chan approach.

Details to make it work on `Feed`:
- add `Feed.mods: dict[str, Moduletype]` and
  `Feed.portals[ModuleType, tractor.Portal]` which are both populated
  during init in `open_feed()`
- drop `Feed.portal` and `Feed.name`

Also fix a final lingering tsdb history loading loop termination bug.
agg_feedz
Tyler Goodlet 2022-11-11 17:17:17 -05:00
parent 20a396270e
commit 7b9db86753
1 changed files with 126 additions and 69 deletions

View File

@ -32,6 +32,7 @@ from typing import (
Callable,
Optional,
Awaitable,
Sequence,
TYPE_CHECKING,
Union,
)
@ -627,26 +628,31 @@ async def tsdb_backfill(
field_map=marketstore.ohlc_key_map,
)
tsdb_last_frame_start = tsdb_history['Epoch'][0]
# load as much from storage into shm possible (depends on
# user's shm size settings).
while (
shm._first.value > 0
):
# load as much from storage into shm as space will
# allow according to user's shm size settings.
tsdb_last_frame_start = tsdb_history['Epoch'][0]
tsdb_history = await storage.read_ohlcv(
fqsn,
end=tsdb_last_frame_start,
timeframe=timeframe,
)
next_start = tsdb_history['Epoch'][0]
if (
not len(tsdb_history) # empty query
# no earlier data detected
or tsdb_history['Epoch'][0] >= tsdb_last_frame_start
or next_start >= tsdb_last_frame_start
):
break
else:
tsdb_last_frame_start = next_start
prepend_start = shm._first.value
to_push = tsdb_history[-prepend_start:]
@ -868,6 +874,9 @@ class Flume(Struct):
izero_hist: int = 0
izero_rt: int = 0
throttle_rate: int | None = None
# TODO: do we need this really if we can pull the `Portal` from
# ``tractor``'s internals?
feed: Feed | None = None
@property
@ -905,12 +914,15 @@ class Flume(Struct):
if not self.feed:
raise RuntimeError('This flume is not part of any ``Feed``?')
# TODO: maybe a public (property) API for this in ``tractor``?
portal = self.stream._ctx._portal
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_context(
acm_func=partial(
self.feed.portal.open_context,
portal.open_context,
iter_ohlc_periods,
),
kwargs={'delay_s': delay_s},
@ -1384,24 +1396,66 @@ class Feed(Struct):
similarly allocated shm arrays.
'''
mod: ModuleType
_portal: tractor.Portal
mods: dict[str, ModuleType] = {}
portals: dict[ModuleType, tractor.Portal] = {}
flumes: dict[str, Flume] = {}
streams: dict[
str,
trio.abc.ReceiveChannel[dict[str, Any]],
] = {}
status: dict[str, Any]
# used for UI to show remote state
status: dict[str, Any] = {}
@acm
async def open_multi_stream(
self,
brokers: Sequence[str] | None = None,
) -> trio.abc.ReceiveChannel:
if brokers is None:
mods = self.mods
else:
mods = {name: self.mods[name] for name in brokers}
if len(mods) == 1:
# just pass the brokerd stream directly if only one provider
# was detected.
stream = self.streams[list(brokers)[0]]
async with stream.subscribe() as bstream:
yield bstream
return
# start multiplexing task tree
tx, rx = trio.open_memory_channel(616)
async def relay_to_common_memchan(stream: tractor.MsgStream):
async with tx:
async for msg in stream:
await tx.send(msg)
async with trio.open_nursery() as nurse:
# spawn a relay task for each stream so that they all
# multiplex to a common channel.
for brokername in mods:
stream = self.streams[brokername]
nurse.start_soon(relay_to_common_memchan, stream)
try:
yield rx
finally:
nurse.cancel_scope.cancel()
_max_sample_rate: int = 1
@property
def portal(self) -> tractor.Portal:
return self._portal
# @property
# def portal(self) -> tractor.Portal:
# return self._portal
@property
def name(self) -> str:
return self.mod.name
# @property
# def name(self) -> str:
# return self.mod.name
@acm
@ -1457,6 +1511,7 @@ async def open_feed(
'''
providers: dict[ModuleType, list[str]] = {}
feed = Feed()
for fqsn in fqsns:
brokername, key, suffix = unpack_fqsn(fqsn)
@ -1469,6 +1524,7 @@ async def open_feed(
# built a per-provider map to instrument names
providers.setdefault(mod, []).append(bfqsn)
feed.mods[mod.name] = mod
# one actor per brokerd for now
brokerd_ctxs = []
@ -1495,18 +1551,15 @@ async def open_feed(
(brokermod, bfqsns),
) in zip(portals, providers.items()):
feed = Feed(
mod=brokermod,
_portal=portal,
status={},
)
feed.portals[brokermod] = portal
# fill out "status info" that the UI can show
host, port = feed.portal.channel.raddr
host, port = portal.channel.raddr
if host == '127.0.0.1':
host = 'localhost'
feed.status.update({
'actor_name': feed.portal.channel.uid[0],
'actor_name': portal.channel.uid[0],
'host': host,
'port': port,
'hist_shm': 'NA',
@ -1519,8 +1572,7 @@ async def open_feed(
bus_ctxs.append(
portal.open_context(
open_feed_bus,
# brokername=brokermod.name,
brokername=brokername,
brokername=brokermod.name,
symbols=bfqsns,
loglevel=loglevel,
start_stream=start_stream,
@ -1528,61 +1580,66 @@ async def open_feed(
)
)
async with (
gather_contexts(bus_ctxs) as ctxs,
):
remote_scopes = []
for (
(ctx, flumes_msg_dict),
(brokermod, bfqsns),
) in zip(ctxs, providers.items()):
assert len(feed.mods) == len(feed.portals)
stream_ctxs = []
for fqsn, flume_msg in flumes_msg_dict.items():
flume = Flume.from_msg(flume_msg)
assert flume.symbol.fqsn == fqsn
feed.flumes[fqsn] = flume
flume.feed = feed
async with (
gather_contexts(bus_ctxs) as ctxs,
):
stream_ctxs = []
for (
(ctx, flumes_msg_dict),
(brokermod, bfqsns),
) in zip(ctxs, providers.items()):
# attach and cache shm handles
rt_shm = flume.rt_shm
assert rt_shm
hist_shm = flume.hist_shm
assert hist_shm
for fqsn, flume_msg in flumes_msg_dict.items():
flume = Flume.from_msg(flume_msg)
assert flume.symbol.fqsn == fqsn
feed.flumes[fqsn] = flume
feed.status['hist_shm'] = (
f'{humanize(hist_shm._shm.size)}'
)
feed.status['rt_shm'] = f'{humanize(rt_shm._shm.size)}'
# TODO: do we need this?
flume.feed = feed
remote_scopes.append(ctx)
stream_ctxs.append(
ctx.open_stream(
# XXX: be explicit about stream backpressure
# since we should **never** overrun on feeds
# being too fast, which will pretty much
# always happen with HFT XD
backpressure=backpressure,
)
# attach and cache shm handles
rt_shm = flume.rt_shm
assert rt_shm
hist_shm = flume.hist_shm
assert hist_shm
feed.status['hist_shm'] = (
f'{humanize(hist_shm._shm.size)}'
)
feed.status['rt_shm'] = f'{humanize(rt_shm._shm.size)}'
async with (
gather_contexts(stream_ctxs) as streams,
):
for (
stream,
(brokermod, bfqsns),
) in zip(streams, providers.items()):
stream_ctxs.append(
ctx.open_stream(
# XXX: be explicit about stream backpressure
# since we should **never** overrun on feeds
# being too fast, which will pretty much
# always happen with HFT XD
backpressure=backpressure,
)
)
# for bfqsn in bfqsns:
for fqsn in flumes_msg_dict:
async with (
gather_contexts(stream_ctxs) as streams,
):
for (
stream,
(brokermod, bfqsns),
) in zip(streams, providers.items()):
# apply common rt steam to each flume
# (normally one per broker)
feed.flumes[fqsn].stream = stream
feed.streams[brokermod.name] = stream
feed.streams[brokermod.name] = stream
yield feed
# for bfqsn in bfqsns:
for fqsn in flumes_msg_dict:
# apply common rt steam to each flume
# (normally one per broker)
feed.flumes[fqsn].stream = stream
assert len(feed.mods) == len(feed.portals) == len(feed.streams)
yield feed
@acm