piker/piker/data/feed.py

1480 lines
44 KiB
Python

# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Data feed apis and infra.
This module is enabled for ``brokerd`` daemons.
"""
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from dataclasses import (
dataclass,
field,
)
from datetime import datetime
from functools import partial
from types import ModuleType
from typing import (
Any,
AsyncIterator,
Callable,
Optional,
Awaitable,
TYPE_CHECKING,
Union,
)
import trio
from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import tractor
from tractor.trionics import maybe_open_context
import pendulum
import numpy as np
from ..brokers import get_brokermod
from ..calc import humanize
from ..log import get_logger, get_console_log
from .._daemon import (
maybe_spawn_brokerd,
check_for_service,
)
from ._sharedmem import (
maybe_open_shm_array,
attach_shm_array,
ShmArray,
_secs_in_day,
)
from .ingest import get_ingestormod
from .types import Struct
from ._source import (
base_iohlc_dtype,
Symbol,
unpack_fqsn,
)
from ..ui import _search
from ._sampling import (
sampler,
broadcast,
increment_ohlc_buffer,
iter_ohlc_periods,
sample_and_broadcast,
uniform_rate_send,
_default_delay_s,
)
from ..brokers._util import (
DataUnavailable,
)
if TYPE_CHECKING:
from .marketstore import Storage
log = get_logger(__name__)
class _FeedsBus(Struct):
'''
Data feeds broadcaster and persistence management.
This is a brokerd side api used to manager persistent real-time
streams that can be allocated and left alive indefinitely. A bus is
associated one-to-one with a particular broker backend where the
"bus" refers so a multi-symbol bus where quotes are interleaved in
time.
Each "entry" in the bus includes:
- a stream used to push real time quotes (up to tick rates)
which is executed as a lone task that is cancellable via
a dedicated cancel scope.
'''
brokername: str
nursery: trio.Nursery
feeds: dict[str, tuple[dict, dict]] = {}
task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
# XXX: so weird but, apparently without this being `._` private
# pydantic will complain about private `tractor.Context` instance
# vars (namely `._portal` and `._cancel_scope`) at import time.
# Reported this bug:
# https://github.com/samuelcolvin/pydantic/issues/2816
_subscribers: dict[
str,
list[
tuple[
Union[tractor.MsgStream, trio.MemorySendChannel],
tractor.Context,
Optional[float], # tick throttle in Hz
]
]
] = {}
async def start_task(
self,
target: Awaitable,
*args,
) -> None:
async def start_with_cs(
task_status: TaskStatus[
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
with trio.CancelScope() as cs:
await self.nursery.start(
target,
*args,
)
task_status.started(cs)
return await self.nursery.start(start_with_cs)
# def cancel_task(
# self,
# task: trio.lowlevel.Task,
# ) -> bool:
# ...
_bus: _FeedsBus = None
def get_feed_bus(
brokername: str,
nursery: Optional[trio.Nursery] = None,
) -> _FeedsBus:
'''
Retreive broker-daemon-local data feeds bus from process global
scope. Serialize task access to lock.
'''
global _bus
if nursery is not None:
assert _bus is None, "Feeds manager is already setup?"
# this is initial setup by parent actor
_bus = _FeedsBus(
brokername=brokername,
nursery=nursery,
)
assert not _bus.feeds
assert _bus.brokername == brokername, "Uhhh wtf"
return _bus
@tractor.context
async def _setup_persistent_brokerd(
ctx: tractor.Context,
brokername: str,
) -> None:
'''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
'''
get_console_log(tractor.current_actor().loglevel)
global _bus
assert not _bus
async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
get_feed_bus(brokername, service_nursery)
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
def diff_history(
array,
start_dt,
end_dt,
last_tsdb_dt: Optional[datetime] = None
) -> np.ndarray:
to_push = array
if last_tsdb_dt:
s_diff = (start_dt - last_tsdb_dt).seconds
# if we detect a partial frame's worth of data
# that is new, slice out only that history and
# write to shm.
if (
s_diff < 0
):
if abs(s_diff) < len(array):
# the + 1 is because ``last_tsdb_dt`` is pulled from
# the last row entry for the ``'time'`` field retreived
# from the tsdb.
to_push = array[abs(s_diff) + 1:]
else:
# pass back only the portion of the array that is
# greater then the last time stamp in the tsdb.
time = array['time']
to_push = array[time >= last_tsdb_dt.timestamp()]
log.info(
f'Pushing partial frame {to_push.size} to shm'
)
return to_push
async def start_backfill(
mod: ModuleType,
bfqsn: str,
shm: ShmArray,
timeframe: float,
last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None,
write_tsdb: bool = True,
tsdb_is_up: bool = False,
task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
) -> int:
hist: Callable[
[int, datetime, datetime],
tuple[np.ndarray, str]
]
config: dict[str, int]
async with mod.open_history_client(bfqsn) as (hist, config):
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(
timeframe,
end_dt=None,
)
times = array['time']
# sample period step size in seconds
step_size_s = (
pendulum.from_timestamp(times[-1])
- pendulum.from_timestamp(times[-2])
).seconds
# frame's worth of sample-period-steps, in seconds
frame_size_s = len(array) * step_size_s
to_push = diff_history(
array,
start_dt,
end_dt,
last_tsdb_dt=last_tsdb_dt,
)
log.info(f'Pushing {to_push.size} to shm!')
shm.push(to_push, prepend=True)
# TODO: *** THIS IS A BUG ***
# we need to only broadcast to subscribers for this fqsn..
# otherwise all fsps get reset on every chart..
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# signal that backfilling to tsdb's end datum is complete
bf_done = trio.Event()
# let caller unblock and deliver latest history frame
task_status.started((
start_dt,
end_dt,
bf_done,
))
# based on the sample step size, maybe load a certain amount history
if last_tsdb_dt is None:
if step_size_s not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {step_size_s} seconds..\n'
'So yuh.. dun do dat brudder.'
)
# when no tsdb "last datum" is provided, we just load
# some near-term history.
periods = {
1: {'days': 1},
60: {'days': 14},
}
if tsdb_is_up:
# do a decently sized backfill and load it into storage.
periods = {
1: {'days': 6},
60: {'years': 6},
}
kwargs = periods[step_size_s]
# NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history
# settings above when the tsdb is detected as being empty.
last_tsdb_dt = start_dt.subtract(**kwargs)
# configure async query throttling
# rate = config.get('rate', 1)
# XXX: legacy from ``trimeter`` code but unsupported now.
# erlangs = config.get('erlangs', 1)
# avoid duplicate history frames with a set of datetime frame
# starts.
starts: set[datetime] = set()
# inline sequential loop where we simply pass the
# last retrieved start dt to the next request as
# it's end dt.
while start_dt > last_tsdb_dt:
log.info(
f'Requesting {step_size_s}s frame ending in {start_dt}'
)
try:
array, next_start_dt, end_dt = await hist(
timeframe,
end_dt=start_dt,
)
# broker says there never was or is no more history to pull
except DataUnavailable:
log.warning(
f'NO-MORE-DATA: backend {mod.name} halted history!?'
)
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
return
if next_start_dt in starts:
start_dt = min(starts)
print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
continue
# only update new start point if not-yet-seen
start_dt = next_start_dt
starts.add(start_dt)
assert array['time'][0] == start_dt.timestamp()
diff = end_dt - start_dt
frame_time_diff_s = diff.seconds
expected_frame_size_s = frame_size_s + step_size_s
if frame_time_diff_s > expected_frame_size_s:
# XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.)
# so just report it to console for now.
log.warning(
f'History frame ending @ {end_dt} appears to have a gap:\n'
f'{diff} ~= {frame_time_diff_s} seconds'
)
to_push = diff_history(
array,
start_dt,
end_dt,
last_tsdb_dt=last_tsdb_dt,
)
ln = len(to_push)
if ln:
log.info(f'{ln} bars for {start_dt} -> {end_dt}')
else:
log.warning(
f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
)
# bail gracefully on shm allocation overrun/full condition
try:
shm.push(to_push, prepend=True)
except ValueError:
log.info(
f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
)
break
log.info(
f'Shm pushed {ln} frame:\n'
f'{start_dt} -> {end_dt}'
)
if (
storage is not None
and write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{start_dt} -> {end_dt}'
)
await storage.write_ohlcv(
f'{bfqsn}.{mod.name}', # lul..
to_push,
timeframe,
)
# TODO: can we only trigger this if the respective
# history in "in view"?!?
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# short-circuit (for now)
bf_done.set()
async def basic_backfill(
bus: _FeedsBus,
mod: ModuleType,
bfqsn: str,
shms: dict[int, ShmArray],
) -> None:
# do a legacy incremental backfill from the provider.
log.info('No TSDB (marketstored) found, doing basic backfill..')
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
for timeframe, shm in shms.items():
try:
await bus.nursery.start(
partial(
start_backfill,
mod,
bfqsn,
shm,
timeframe=timeframe,
)
)
except DataUnavailable:
# XXX: timeframe not supported for backend
continue
async def tsdb_backfill(
mod: ModuleType,
marketstore: ModuleType,
bus: _FeedsBus,
storage: Storage,
fqsn: str,
bfqsn: str,
shms: dict[int, ShmArray],
task_status: TaskStatus[
tuple[ShmArray, ShmArray]
] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: this should be used verbatim for the pure
# shm backfiller approach below.
dts_per_tf: dict[int, datetime] = {}
# start history anal and load missing new data via backend.
for timeframe, shm in shms.items():
tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
fqsn,
timeframe=timeframe,
)
broker, symbol, expiry = unpack_fqsn(fqsn)
try:
(
latest_start_dt,
latest_end_dt,
bf_done,
) = await bus.nursery.start(
partial(
start_backfill,
mod,
bfqsn,
shm,
timeframe=timeframe,
last_tsdb_dt=last_tsdb_dt,
tsdb_is_up=True,
storage=storage,
)
)
except DataUnavailable:
# XXX: timeframe not supported for backend
dts_per_tf[timeframe] = (
tsdb_history,
last_tsdb_dt,
None,
None,
None,
)
continue
# tsdb_history = series.get(timeframe)
dts_per_tf[timeframe] = (
tsdb_history,
last_tsdb_dt,
latest_start_dt,
latest_end_dt,
bf_done,
)
# if len(hist_shm.array) < 2:
# TODO: there's an edge case here to solve where if the last
# frame before market close (at least on ib) was pushed and
# there was only "1 new" row pushed from the first backfill
# query-iteration, then the sample step sizing calcs will
# break upstream from here since you can't diff on at least
# 2 steps... probably should also add logic to compute from
# the tsdb series and stash that somewhere as meta data on
# the shm buffer?.. no se.
# unblock the feed bus management task
# assert len(shms[1].array)
task_status.started((
shms[60],
shms[1],
))
async def back_load_from_tsdb(
timeframe: int,
shm: ShmArray,
):
(
tsdb_history,
last_tsdb_dt,
latest_start_dt,
latest_end_dt,
bf_done,
) = dts_per_tf[timeframe]
# sync to backend history task's query/load completion
if bf_done:
await bf_done.wait()
# Load tsdb history into shm buffer (for display).
# TODO: eventually it'd be nice to not require a shm array/buffer
# to accomplish this.. maybe we can do some kind of tsdb direct to
# graphics format eventually in a child-actor?
# do diff against last start frame of history and only fill
# in from the tsdb an allotment that allows for most recent
# to be loaded into mem *before* tsdb data.
if last_tsdb_dt and latest_start_dt:
dt_diff_s = (
latest_start_dt - last_tsdb_dt
).seconds
else:
dt_diff_s = 0
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
prepend_start = shm._first.value
# sanity check on most-recent-data loading
assert prepend_start > dt_diff_s
if (
len(tsdb_history)
):
to_push = tsdb_history[:prepend_start]
shm.push(
to_push,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
# start=prepend_start,
field_map=marketstore.ohlc_key_map,
)
prepend_start = shm._first.value
# load as much from storage into shm as space will
# allow according to user's shm size settings.
last_frame_start = tsdb_history['Epoch'][0]
while (
shm._first.value > 0
):
tsdb_history = await storage.read_ohlcv(
fqsn,
end=last_frame_start,
timeframe=timeframe,
)
if (
not len(tsdb_history)
):
# on empty db history
break
time = tsdb_history['Epoch']
frame_start = time[0]
frame_end = time[0]
print(f"LOADING MKTS HISTORY: {frame_start} - {frame_end}")
if frame_start >= last_frame_start:
# no new data loaded was from tsdb, so we can exit.
break
prepend_start = shm._first.value
to_push = tsdb_history[:prepend_start]
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
shm.push(
to_push,
prepend=True,
field_map=marketstore.ohlc_key_map,
)
last_frame_start = frame_start
log.info(f'Loaded {to_push.shape} datums from storage')
# manually trigger step update to update charts/fsps
# which need an incremental update.
# NOTE: the way this works is super duper
# un-intuitive right now:
# - the broadcaster fires a msg to the fsp subsystem.
# - fsp subsys then checks for a sample step diff and
# possibly recomputes prepended history.
# - the fsp then sends back to the parent actor
# (usually a chart showing graphics for said fsp)
# which tells the chart to conduct a manual full
# graphics loop cycle.
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# TODO: write new data to tsdb to be ready to for next read.
# backload from db (concurrently per timeframe) once backfilling of
# recent dat a loaded from the backend provider (see
# ``bf_done.wait()`` call).
async with trio.open_nursery() as nurse:
for timeframe, shm in shms.items():
nurse.start_soon(
back_load_from_tsdb,
timeframe,
shm,
)
async def manage_history(
mod: ModuleType,
bus: _FeedsBus,
fqsn: str,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
timeframe: float = 60, # in seconds
task_status: TaskStatus[
tuple[ShmArray, ShmArray]
] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Load and manage historical data including the loading of any
available series from `marketstore` as well as conducting real-time
update of both that existing db and the allocated shared memory
buffer.
'''
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
hist_shm, opened = maybe_open_shm_array(
key=f'{fqsn}_hist',
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=False,
)
hist_zero_index = hist_shm.index - 1
# TODO: history validation
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
rt_shm, opened = maybe_open_shm_array(
key=f'{fqsn}_rt',
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=False,
size=3*_secs_in_day,
)
# (for now) set the rt (hft) shm array with space to prepend
# only a few days worth of 1s history.
days = 2
start_index = days*_secs_in_day
rt_shm._first.value = start_index
rt_shm._last.value = start_index
rt_zero_index = rt_shm.index - 1
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
log.info('Scanning for existing `marketstored`')
tsdb_is_up = await check_for_service('marketstored')
bfqsn = fqsn.replace('.' + mod.name, '')
open_history_client = getattr(mod, 'open_history_client', None)
assert open_history_client
if (
tsdb_is_up
and opened
and open_history_client
):
log.info('Found existing `marketstored`')
from . import marketstore
async with (
marketstore.open_storage_client(fqsn)as storage,
):
hist_shm, rt_shm = await bus.nursery.start(
tsdb_backfill,
mod,
marketstore,
bus,
storage,
fqsn,
bfqsn,
{
1: rt_shm,
60: hist_shm,
},
)
# yield back after client connect with filled shm
task_status.started((
hist_zero_index,
hist_shm,
rt_zero_index,
rt_shm,
))
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# history retreival loop depending on user interaction and thus
# a small RPC-prot for remotely controllinlg what data is loaded
# for viewing.
await trio.sleep_forever()
# load less history if no tsdb can be found
elif (
not tsdb_is_up
and opened
):
await basic_backfill(
bus,
mod,
bfqsn,
shms={
1: rt_shm,
60: hist_shm,
},
)
task_status.started((
hist_zero_index,
hist_shm,
rt_zero_index,
rt_shm,
))
some_data_ready.set()
await trio.sleep_forever()
async def allocate_persistent_feed(
bus: _FeedsBus,
brokername: str,
symbol: str,
loglevel: str,
start_stream: bool = True,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Create and maintain a "feed bus" which allocates tasks for real-time
streaming and optional historical data storage per broker/data provider
backend; this normally task runs *in* a `brokerd` actor.
If none exists, this allocates a ``_FeedsBus`` which manages the
lifetimes of streaming tasks created for each requested symbol.
2 tasks are created:
- a real-time streaming task which connec
'''
# load backend module
try:
mod = get_brokermod(brokername)
except ImportError:
mod = get_ingestormod(brokername)
# mem chan handed to broker backend so it can push real-time
# quotes to this task for sampling and history storage (see below).
send, quote_stream = trio.open_memory_channel(616)
# data sync signals for both history loading and market quotes
some_data_ready = trio.Event()
feed_is_live = trio.Event()
# establish broker backend quote stream by calling
# ``stream_quotes()``, which is a required broker backend endpoint.
init_msg, first_quote = await bus.nursery.start(
partial(
mod.stream_quotes,
send_chan=send,
feed_is_live=feed_is_live,
symbols=[symbol],
loglevel=loglevel,
)
)
# the broker-specific fully qualified symbol name,
# but ensure it is lower-cased for external use.
bfqsn = init_msg[symbol]['fqsn'].lower()
init_msg[symbol]['fqsn'] = bfqsn
# HISTORY, run 2 tasks:
# - a history loader / maintainer
# - a real-time streamer which consumers and sends new data to any
# consumers as well as writes to storage backends (as configured).
# XXX: neither of these will raise but will cause an inf hang due to:
# https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon(
# await bus.start_task(
(
izero_hist,
hist_shm,
izero_rt,
rt_shm,
) = await bus.nursery.start(
manage_history,
mod,
bus,
'.'.join((bfqsn, brokername)),
some_data_ready,
feed_is_live,
)
# we hand an IPC-msg compatible shm token to the caller so it
# can read directly from the memory which will be written by
# this task.
msg = init_msg[symbol]
msg['hist_shm_token'] = hist_shm.token
msg['izero_hist'] = izero_hist
msg['izero_rt'] = izero_rt
msg['rt_shm_token'] = rt_shm.token
# true fqsn
fqsn = '.'.join((bfqsn, brokername))
# add a fqsn entry that includes the ``.<broker>`` suffix
# and an entry that includes the broker-specific fqsn (including
# any new suffixes or elements as injected by the backend).
init_msg[fqsn] = msg
init_msg[bfqsn] = msg
# TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that
# for most purposes.
# pass OHLC sample rate in seconds (be sure to use python int type)
# init_msg[symbol]['sample_rate'] = 1 #int(delay_s)
# yield back control to starting nursery once we receive either
# some history or a real-time quote.
log.info(f'waiting on history to load: {fqsn}')
await some_data_ready.wait()
# append ``.<broker>`` suffix to each quote symbol
acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
generic_first_quotes = {
acceptable_not_fqsn_with_broker_suffix: first_quote,
fqsn: first_quote,
}
# for ambiguous names we simply apply the retreived
# feed to that name (for now).
bus.feeds[symbol] = bus.feeds[bfqsn] = (
init_msg,
generic_first_quotes,
)
# insert 1s ohlc into the increment buffer set
# to update and shift every second
sampler.ohlcv_shms.setdefault(
1,
[]
).append(rt_shm)
task_status.started()
if not start_stream:
await trio.sleep_forever()
# begin real-time updates of shm and tsb once the feed goes live and
# the backend will indicate when real-time quotes have begun.
await feed_is_live.wait()
# insert 1m ohlc into the increment buffer set
# to shift every 60s.
sampler.ohlcv_shms.setdefault(60, []).append(hist_shm)
# create buffer a single incrementer task broker backend
# (aka `brokerd`) using the lowest sampler period.
if sampler.incrementers.get(_default_delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
_default_delay_s,
)
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
# NOTE: if no high-freq sampled data has (yet) been loaded,
# seed the buffer with a history datum - this is most handy
# for many backends which don't sample @ 1s OHLC but do have
# slower data such as 1m OHLC.
if not len(rt_shm.array):
rt_shm.push(hist_shm.array[-3:-1])
ohlckeys = ['open', 'high', 'low', 'close']
rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1]
rt_shm.array['volume'][-2] = 0
# start sample loop and shm incrementer task for OHLC style sampling
# at the above registered step periods.
try:
await sample_and_broadcast(
bus,
rt_shm,
hist_shm,
quote_stream,
brokername,
sum_tick_vlm
)
finally:
log.warning(f'{fqsn} feed task terminated')
@tractor.context
async def open_feed_bus(
ctx: tractor.Context,
brokername: str,
symbol: str, # normally expected to the broker-specific fqsn
loglevel: str,
tick_throttle: Optional[float] = None,
start_stream: bool = True,
) -> None:
'''
Open a data feed "bus": an actor-persistent per-broker task-oriented
data feed registry which allows managing real-time quote streams per
symbol.
'''
if loglevel is None:
loglevel = tractor.current_actor().loglevel
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# local state sanity checks
# TODO: check for any stale shm entries for this symbol
# (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are
servicename = tractor.current_actor().name
assert 'brokerd' in servicename
assert brokername in servicename
bus = get_feed_bus(brokername)
# if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in
# service nursery
entry = bus.feeds.get(symbol)
if entry is None:
# allocate a new actor-local stream bus which
# will persist for this `brokerd`'s service lifetime.
async with bus.task_lock:
await bus.nursery.start(
partial(
allocate_persistent_feed,
bus=bus,
brokername=brokername,
# here we pass through the selected symbol in native
# "format" (i.e. upper vs. lowercase depending on
# provider).
symbol=symbol,
loglevel=loglevel,
start_stream=start_stream,
)
)
# TODO: we can remove this?
assert isinstance(bus.feeds[symbol], tuple)
# XXX: ``first_quotes`` may be outdated here if this is secondary
# subscriber
init_msg, first_quotes = bus.feeds[symbol]
msg = init_msg[symbol]
bfqsn = msg['fqsn'].lower()
# true fqsn
fqsn = '.'.join([bfqsn, brokername])
assert fqsn in first_quotes
assert bus.feeds[bfqsn]
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
bsym = symbol + f'.{brokername}'
assert bsym in first_quotes
# we use the broker-specific fqsn (bfqsn) for
# the sampler subscription since the backend isn't (yet)
# expected to append it's own name to the fqsn, so we filter
# on keys which *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(bfqsn, [])
# send this even to subscribers to existing feed?
# deliver initial info message a first quote asap
await ctx.started((
init_msg,
first_quotes,
))
if not start_stream:
log.warning(f'Not opening real-time stream for {fqsn}')
await trio.sleep_forever()
# real-time stream loop
async with (
ctx.open_stream() as stream,
):
# re-send to trigger display loop cycle (necessary especially
# when the mkt is closed and no real-time messages are
# expected).
await stream.send({fqsn: first_quotes})
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
if tick_throttle:
send, recv = trio.open_memory_channel(2**10)
cs = await bus.start_task(
uniform_rate_send,
tick_throttle,
recv,
stream,
)
sub = (send, ctx, tick_throttle)
else:
sub = (stream, ctx, tick_throttle)
subs = bus._subscribers[bfqsn]
subs.append(sub)
try:
uid = ctx.chan.uid
# ctrl protocol for start/stop of quote streams based on UI
# state (eg. don't need a stream when a symbol isn't being
# displayed).
async for msg in stream:
if msg == 'pause':
if sub in subs:
log.info(
f'Pausing {fqsn} feed for {uid}')
subs.remove(sub)
elif msg == 'resume':
if sub not in subs:
log.info(
f'Resuming {fqsn} feed for {uid}')
subs.append(sub)
else:
raise ValueError(msg)
finally:
log.info(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle:
# TODO: a one-cancels-one nursery
# n.cancel_scope.cancel()
cs.cancel()
try:
bus._subscribers[bfqsn].remove(sub)
except ValueError:
log.warning(f'{sub} for {symbol} was already removed?')
@dataclass
class Feed:
'''
A data feed for client-side interaction with far-process real-time
data sources.
This is an thin abstraction on top of ``tractor``'s portals for
interacting with IPC streams and storage APIs (shm and time-series
db).
'''
name: str
hist_shm: ShmArray
rt_shm: ShmArray
mod: ModuleType
first_quotes: dict # symbol names to first quote dicts
_portal: tractor.Portal
stream: trio.abc.ReceiveChannel[dict[str, Any]]
status: dict[str, Any]
izero_hist: int = 0
izero_rt: int = 0
throttle_rate: Optional[int] = None
_trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None
_max_sample_rate: int = 1
# cache of symbol info messages received as first message when
# a stream startsc.
symbols: dict[str, Symbol] = field(default_factory=dict)
@property
def portal(self) -> tractor.Portal:
return self._portal
async def receive(self) -> dict:
return await self.stream.receive()
@acm
async def index_stream(
self,
delay_s: int = 1,
) -> AsyncIterator[int]:
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_context(
acm_func=partial(
self.portal.open_context,
iter_ohlc_periods,
),
kwargs={'delay_s': delay_s},
) as (cache_hit, (ctx, first)):
async with ctx.open_stream() as istream:
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with istream.subscribe() as bistream:
yield bistream
else:
yield istream
async def pause(self) -> None:
await self.stream.send('pause')
async def resume(self) -> None:
await self.stream.send('resume')
def get_ds_info(
self,
) -> tuple[float, float, float]:
'''
Compute the "downsampling" ratio info between the historical shm
buffer and the real-time (HFT) one.
Return a tuple of the fast sample period, historical sample
period and ratio between them.
'''
times = self.hist_shm.array['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
hist_step_size_s = (end - start).seconds
times = self.rt_shm.array['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
rt_step_size_s = (end - start).seconds
ratio = hist_step_size_s / rt_step_size_s
return (
rt_step_size_s,
hist_step_size_s,
ratio,
)
@acm
async def install_brokerd_search(
portal: tractor.Portal,
brokermod: ModuleType,
) -> None:
async with portal.open_context(
brokermod.open_symbol_search
) as (ctx, cache):
# shield here since we expect the search rpc to be
# cancellable by the user as they see fit.
async with ctx.open_stream() as stream:
async def search(text: str) -> dict[str, Any]:
await stream.send(text)
try:
return await stream.receive()
except trio.EndOfChannel:
return {}
async with _search.register_symbol_search(
provider_name=brokermod.name,
search_routine=search,
# TODO: should be make this a required attr of
# a backend module?
pause_period=getattr(
brokermod, '_search_conf', {}
).get('pause_period', 0.0616),
):
yield
@acm
async def open_feed(
fqsns: list[str],
loglevel: Optional[str] = None,
backpressure: bool = True,
start_stream: bool = True,
tick_throttle: Optional[float] = None, # Hz
) -> Feed:
'''
Open a "data feed" which provides streamed real-time quotes.
'''
fqsn = fqsns[0].lower()
brokername, key, suffix = unpack_fqsn(fqsn)
bfqsn = fqsn.replace('.' + brokername, '')
try:
mod = get_brokermod(brokername)
except ImportError:
mod = get_ingestormod(brokername)
# no feed for broker exists so maybe spawn a data brokerd
async with (
# if no `brokerd` for this backend exists yet we spawn
# and actor for one.
maybe_spawn_brokerd(
brokername,
loglevel=loglevel
) as portal,
# (allocate and) connect to any feed bus for this broker
portal.open_context(
open_feed_bus,
brokername=brokername,
symbol=bfqsn,
loglevel=loglevel,
start_stream=start_stream,
tick_throttle=tick_throttle,
) as (ctx, (init_msg, first_quotes)),
ctx.open_stream(
# XXX: be explicit about stream backpressure since we should
# **never** overrun on feeds being too fast, which will
# pretty much always happen with HFT XD
backpressure=backpressure,
) as stream,
):
init = init_msg[bfqsn]
# we can only read from shm
hist_shm = attach_shm_array(
token=init['hist_shm_token'],
readonly=True,
)
rt_shm = attach_shm_array(
token=init['rt_shm_token'],
readonly=True,
)
assert fqsn in first_quotes
feed = Feed(
name=brokername,
hist_shm=hist_shm,
rt_shm=rt_shm,
mod=mod,
first_quotes=first_quotes,
stream=stream,
_portal=portal,
status={},
izero_hist=init['izero_hist'],
izero_rt=init['izero_rt'],
throttle_rate=tick_throttle,
)
# fill out "status info" that the UI can show
host, port = feed.portal.channel.raddr
if host == '127.0.0.1':
host = 'localhost'
feed.status.update({
'actor_name': feed.portal.channel.uid[0],
'host': host,
'port': port,
'shm': f'{humanize(feed.hist_shm._shm.size)}',
'throttle_rate': feed.throttle_rate,
})
feed.status.update(init_msg.pop('status', {}))
for sym, data in init_msg.items():
si = data['symbol_info']
fqsn = data['fqsn'] + f'.{brokername}'
symbol = Symbol.from_fqsn(
fqsn,
info=si,
)
# symbol.broker_info[brokername] = si
feed.symbols[fqsn] = symbol
feed.symbols[sym] = symbol
# cast shm dtype to list... can't member why we need this
for shm_key, shm in [
('rt_shm_token', rt_shm),
('hist_shm_token', hist_shm),
]:
shm_token = data[shm_key]
# XXX: msgspec won't relay through the tuples XD
shm_token['dtype_descr'] = tuple(
map(tuple, shm_token['dtype_descr']))
assert shm_token == shm.token # sanity
feed._max_sample_rate = 1
try:
yield feed
finally:
# drop the infinite stream connection
await ctx.cancel()
@acm
async def maybe_open_feed(
fqsns: list[str],
loglevel: Optional[str] = None,
**kwargs,
) -> (
Feed,
ReceiveChannel[dict[str, Any]],
):
'''
Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
'''
fqsn = fqsns[0]
async with maybe_open_context(
acm_func=open_feed,
kwargs={
'fqsns': fqsns,
'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'),
# XXX: super critical to have bool defaults here XD
'backpressure': kwargs.get('backpressure', True),
'start_stream': kwargs.get('start_stream', True),
},
key=fqsn,
) as (cache_hit, feed):
if cache_hit:
log.info(f'Using cached feed for {fqsn}')
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with feed.stream.subscribe() as bstream:
yield feed, bstream
else:
yield feed, feed.stream