Rework data feed API to allow for caching streams

Move all feed/stream-agnostic logic and shared-mem writing into a new
set of routines inside the ``data`` sub-package. This lets us move
toward a more standard API for broker and data backends to provide
cache-able persistent streams to client apps.

The data layer now takes care of:
- starting a single background brokerd task to start a stream for a
  symbol if none yet exists and registering that stream for later lookups
- re-using the existing broker backend actor whenever it can be found
  in the service tree
- synchronizing with the brokerd stream's startup sequence, which is now
  oriented around fast startup concurrency such that client code gets a
  handle to historical data and the quote schema as fast as possible
- delegating historical data loading to the backend more formally by
  starting a ``backfill_bars()`` task
- writing shared mem in the brokerd task and only destructing it once
  requested, either by the parent actor or further clients
- fully de-duplicating stream data with a dynamic pub-sub strategy in
  which new clients register for copies of the same quote set per symbol

This new API is fully working with the IB backend; the other backends
will need to be ported, which is to come shortly.
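
Rough sketch of the intended client-side usage (assuming ``open_feed()``
stays an async context manager yielding a ``Feed``; the symbol and the
import path are illustrative, only the call shape is taken from this
patch):

    import trio
    from piker.data import open_feed

    async def main():
        # any number of clients can do this concurrently; the first one
        # triggers the brokerd stream + shm writer task, later ones just
        # attach to the cached feed and get copies of the same quotes
        async with open_feed('ib', ['mnq.globex']) as feed:
            # historical bars are already loaded into the shared mem
            # array by the time the feed is handed over
            print(feed.shm.array[-1])

            async for quotes in feed.stream:
                for sym, quote in quotes.items():
                    print(sym, quote)

    trio.run(main)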
cached_feeds
Tyler Goodlet 2021-03-29 08:22:27 -04:00
parent f17a26c948
commit a82f43e3a5
2 changed files with 344 additions and 335 deletions

View File

@@ -34,6 +34,7 @@ import logging
 import time
 import trio
+from trio_typing import TaskStatus
 import tractor
 from async_generator import aclosing
 from ib_insync.wrapper import RequestError
@@ -46,14 +47,9 @@ from ib_insync.wrapper import Wrapper
 from ib_insync.client import Client as ib_Client
 from ..log import get_logger, get_console_log
-from ..data import (
-    maybe_spawn_brokerd,
-    iterticks,
-    attach_shm_array,
-    subscribe_ohlc_for_increment,
-    _buffer,
-)
+from ..data import maybe_spawn_brokerd
 from ..data._source import from_df
+from ..data._sharedmem import ShmArray
 from ._util import SymbolNotFound
@@ -781,36 +777,17 @@ def normalize(
     return data
 
-_local_buffer_writers = {}
+# _local_buffer_writers = {}
 
-@asynccontextmanager
-async def activate_writer(key: str) -> (bool, trio.Nursery):
-    """Mark the current actor with module var determining
-    whether an existing shm writer task is already active.
-
-    This avoids more then one writer resulting in data
-    clobbering.
-    """
-    global _local_buffer_writers
-    try:
-        assert not _local_buffer_writers.get(key, False)
-        _local_buffer_writers[key] = True
-        async with trio.open_nursery() as n:
-            yield n
-    finally:
-        _local_buffer_writers.pop(key, None)
 
-async def fill_bars(
+async def backfill_bars(
     sym: str,
-    first_bars: list,
-    shm: 'ShmArray',  # type: ignore # noqa
+    # first_bars: list,
+    shm: ShmArray,  # type: ignore # noqa
     # count: int = 20,  # NOTE: any more and we'll overrun underlying buffer
     count: int = 10,  # NOTE: any more and we'll overrun the underlying buffer
+    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 ) -> None:
     """Fill historical bars into shared mem / storage afap.
@@ -818,6 +795,19 @@ async def fill_bars(
     https://github.com/pikers/piker/issues/128
 
     """
+    first_bars, bars_array = await _trio_run_client_method(
+        method='bars',
+        symbol=sym,
+    )
+
+    # write historical data to buffer
+    shm.push(bars_array)
+    # shm_token = shm.token
+
+    with trio.CancelScope() as cs:
+        task_status.started(cs)
+
     next_dt = first_bars[0].date
     i = 0
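
The ``task_status`` plumbing above is the standard ``trio`` startup
handshake; in isolation it works like this (runnable sketch, names
illustrative):

    import trio
    from trio_typing import TaskStatus

    async def worker(
        task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
    ) -> None:
        with trio.CancelScope() as cs:
            # hand our cancel scope back to the ``nursery.start()`` caller
            # so it can cancel just this task, not the whole nursery
            task_status.started(cs)
            await trio.sleep_forever()

    async def main():
        async with trio.open_nursery() as nursery:
            # blocks until ``task_status.started()`` fires in ``worker()``
            cs = await nursery.start(worker)
            cs.cancel()  # cancels only the worker

    trio.run(main)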
@@ -830,6 +820,9 @@
                 end_dt=next_dt,
             )
+            if bars_array is None:
+                raise SymbolNotFound(sym)
+
             shm.push(bars_array, prepend=True)
             i += 1
             next_dt = bars[0].date
@@ -847,7 +840,9 @@
             else:
                 log.exception(
-                    "Data query rate reached: Press `ctrl-alt-f` in TWS")
+                    "Data query rate reached: Press `ctrl-alt-f`"
+                    "in TWS"
+                )
 
                 # TODO: should probably create some alert on screen
                 # and then somehow get that to trigger an event here
@@ -902,6 +897,7 @@ async def _setup_quote_stream(
         # log.debug(t)
         try:
             to_trio.send_nowait(t)
         except trio.BrokenResourceError:
             # XXX: eventkit's ``Event.emit()`` for whatever redic
             # reason will catch and ignore regular exceptions
@@ -946,24 +942,22 @@ async def start_aio_quote_stream(
     return from_aio
 
-@tractor.stream
 async def stream_quotes(
-    ctx: tractor.Context,
+    send_chan: trio.abc.SendChannel,
     symbols: List[str],
-    shm_token: Tuple[str, str, List[tuple]],
+    shm: ShmArray,
+    feed_is_live: trio.Event,
     loglevel: str = None,
 
-    # compat for @tractor.msg.pub
-    topics: Any = None,
-    get_topics: Callable = None,
+    # startup sync
+    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 
-) -> AsyncIterator[Dict[str, Any]]:
+) -> None:
     """Stream symbol quotes.
 
     This is a ``trio`` callable routine meant to be invoked
     once the brokerd is up.
     """
-    # XXX: required to propagate ``tractor`` loglevel to piker logging
-    get_console_log(loglevel or tractor.current_actor().loglevel)
-
     # TODO: support multiple subscriptions
     sym = symbols[0]
@@ -975,54 +969,6 @@ async def stream_quotes(
 
     stream = await start_aio_quote_stream(symbol=sym, contract=contract)
 
-    shm = None
-    async with trio.open_nursery() as ln:
-        # check if a writer already is alive in a streaming task,
-        # otherwise start one and mark it as now existing
-        key = shm_token['shm_name']
-        writer_already_exists = _local_buffer_writers.get(key, False)
-
-        # maybe load historical ohlcv in to shared mem
-        # check if shm has already been created by previous
-        # feed initialization
-        if not writer_already_exists:
-            _local_buffer_writers[key] = True
-
-            shm = attach_shm_array(
-                token=shm_token,
-                # we are the buffer writer
-                readonly=False,
-            )
-
-            # async def retrieve_and_push():
-            start = time.time()
-
-            bars, bars_array = await _trio_run_client_method(
-                method='bars',
-                symbol=sym,
-            )
-
-            log.info(f"bars_array request: {time.time() - start}")
-
-            if bars_array is None:
-                raise SymbolNotFound(sym)
-
-            # write historical data to buffer
-            shm.push(bars_array)
-            shm_token = shm.token
-
-            # TODO: generalize this for other brokers
-            # start bar filler task in bg
-            ln.start_soon(fill_bars, sym, bars, shm)
-
-            times = shm.array['time']
-            delay_s = times[-1] - times[times != times[-1]][-1]
-            subscribe_ohlc_for_increment(shm, delay_s)
-
     # pass back some symbol info like min_tick, trading_hours, etc.
     syminfo = asdict(details)
     syminfo.update(syminfo['contract'])
@@ -1044,12 +990,9 @@ async def stream_quotes(
         # pass back token, and bool, signalling if we're the writer
         # and that history has been written
         sym: {
-            'is_shm_writer': not writer_already_exists,
-            'shm_token': shm_token,
             'symbol_info': syminfo,
         }
     }
-    await ctx.send_yield(init_msgs)
 
     # check for special contract types
     if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
@@ -1061,7 +1004,6 @@
         # no real volume so we have to calculate the price
         suffix = 'secType'
         calc_price = True
-        # ticker = first_ticker
 
     # pass first quote asap
     quote = normalize(first_ticker, calc_price=calc_price)
@@ -1071,23 +1013,19 @@
     first_quote = {topic: quote}
 
-    # yield first quote asap
-    await ctx.send_yield(first_quote)
-
     # ugh, clear ticks since we've consumed them
     # (ahem, ib_insync is stateful trash)
     first_ticker.ticks = []
 
     log.debug(f"First ticker received {quote}")
 
+    task_status.started((init_msgs, first_quote))
+
     if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
         suffix = 'exchange'
         calc_price = False  # should be real volume for contract
 
-        # with trio.move_on_after(10) as cs:
         # wait for real volume on feed (trading might be closed)
         async with aclosing(stream):
             async for ticker in stream:
@@ -1104,46 +1042,13 @@
                 # (ahem, ib_insync is truly stateful trash)
                 ticker.ticks = []
 
-                # tell incrementer task it can start
-                _buffer.shm_incrementing(key).set()
-
                 # XXX: this works because we don't use
                 # ``aclosing()`` above?
                 break
 
-    # enter stream loop
-    try:
-        async with stream:
-            await stream_and_write(
-                stream=stream,
-                calc_price=calc_price,
-                topic=topic,
-                write_shm=not writer_already_exists,
-                shm=shm,
-                suffix=suffix,
-                ctx=ctx,
-            )
-    finally:
-        if not writer_already_exists:
-            _local_buffer_writers[key] = False
-
-        stream.close()
-
-
-async def stream_and_write(
-    stream,
-    calc_price: bool,
-    topic: str,
-    write_shm: bool,
-    suffix: str,
-    ctx: tractor.Context,
-    shm: Optional['SharedArray'],  # noqa
-) -> None:
-    """Core quote streaming and shm writing loop; optimize for speed!
-    """
-    # real-time stream
-    async with stream:
+    # tell caller quotes are now coming in live
+    feed_is_live.set()
+
     async for ticker in stream:
 
         # print(ticker.vwap)
@@ -1151,54 +1056,12 @@ async def stream_and_write(
                 ticker,
                 calc_price=calc_price
            )
-            quote['symbol'] = topic
-            # TODO: in theory you can send the IPC msg *before*
-            # writing to the sharedmem array to decrease latency,
-            # however, that will require `tractor.msg.pub` support
-            # here or at least some way to prevent task switching
-            # at the yield such that the array write isn't delayed
-            # while another consumer is serviced..
-
-            # if we are the lone tick writer start writing
-            # the buffer with appropriate trade data
-            if write_shm:
-                for tick in iterticks(quote, types=('trade', 'utrade',)):
-                    last = tick['price']
-
-                    # print(f"{quote['symbol']}: {tick}")
-
-                    # update last entry
-                    # benchmarked in the 4-5 us range
-                    o, high, low, v = shm.array[-1][
-                        ['open', 'high', 'low', 'volume']
-                    ]
-
-                    new_v = tick.get('size', 0)
-
-                    if v == 0 and new_v:
-                        # no trades for this bar yet so the open
-                        # is also the close/last trade price
-                        o = last
-
-                    shm.array[[
-                        'open',
-                        'high',
-                        'low',
-                        'close',
-                        'volume',
-                    ]][-1] = (
-                        o,
-                        max(high, last),
-                        min(low, last),
-                        last,
-                        v + new_v,
-                    )
-
             con = quote['contract']
             topic = '.'.join((con['symbol'], con[suffix])).lower()
             quote['symbol'] = topic
 
-            await ctx.send_yield({topic: quote})
+            await send_chan.send({topic: quote})
 
             # ugh, clear ticks since we've consumed them
             ticker.ticks = []
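
Taken together, this file now implies a rough per-backend contract: each
broker module exposes ``stream_quotes()`` and ``backfill_bars()`` which
the data layer starts via ``nursery.start()``. Below is a runnable
skeleton with a fake quote source, not the IB implementation (payload
contents are illustrative):

    from typing import List
    import trio
    from trio_typing import TaskStatus

    async def stream_quotes(
        send_chan: trio.abc.SendChannel,
        symbols: List[str],
        shm,  # ``ShmArray`` allocated by the data layer
        feed_is_live: trio.Event,
        loglevel: str = None,
        task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
    ) -> None:
        sym = symbols[0]

        # hand back history/schema info as fast as possible so waiting
        # clients can unblock before live ticks ever arrive
        init_msgs = {sym: {'symbol_info': {}}}
        first_quote = {sym: {'last': 100.0}}
        task_status.started((init_msgs, first_quote))

        # unblock OHLC sampling in the data layer
        feed_is_live.set()

        while True:
            await trio.sleep(1)
            await send_chan.send({sym: {'last': 100.0}})

    async def backfill_bars(
        sym: str,
        shm,  # same ``ShmArray``; prepend history with ``shm.push()``
        task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
    ) -> None:
        with trio.CancelScope() as cs:
            task_status.started(cs)
            # fetch and prepend historical bars here, e.g.
            # shm.push(bars_array, prepend=True)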

View File

@@ -23,23 +23,23 @@ sharing your feeds with other fellow pikers.
 """
 from dataclasses import dataclass, field
 from contextlib import asynccontextmanager
+from functools import partial
 from importlib import import_module
 from types import ModuleType
 from typing import (
     Dict, Any, Sequence,
     AsyncIterator, Optional,
-    Callable, Awaitable
+    List
 )
 
 import trio
+from trio_typing import TaskStatus
 import tractor
 from pydantic import BaseModel
 
 from ..brokers import get_brokermod
 from ..log import get_logger, get_console_log
 from .._daemon import (
-    spawn_brokerd,
-    maybe_open_pikerd,
     maybe_spawn_brokerd,
 )
 from ._normalize import iterticks
@@ -53,7 +53,8 @@ from ._sharedmem import (
 from ._source import base_iohlc_dtype, Symbol
 from ._buffer import (
     increment_ohlc_buffer,
-    subscribe_ohlc_for_increment
+    subscribe_ohlc_for_increment,
+    shm_incrementing,
 )
 
 __all__ = [
@@ -82,9 +83,8 @@ def get_ingestormod(name: str) -> ModuleType:
     return module
 
-# @dataclass
-class _FeedsCache(BaseModel):
-    """Data feeds manager.
+class _FeedsBus(BaseModel):
+    """Data feeds broadcaster and persistence management.
 
     This is a brokerd side api used to manager persistent real-time
     streams that can be allocated and left alive indefinitely.
@@ -92,82 +92,248 @@ class _FeedsCache(BaseModel):
 
     """
     brokername: str
     nursery: trio.Nursery
-    tasks: Dict[str, trio.CancelScope] = {}
+    feeds: Dict[str, trio.CancelScope] = {}
+    subscribers: Dict[str, List[tractor.Context]] = {}
 
     class Config:
         arbitrary_types_allowed = True
 
-    # tasks: Dict[str, trio.CancelScope] = field(default_factory=dict)
-
-    async def start_feed(
-        symbol: str,
-        func: Callable[[int], Awaitable[None]],
-    ) -> None:
-        """Start a bg feed task and register a surrouding cancel scope
-        for it.
-        """
-        with trio.CancelCscope() as cs:
-            pass
-
     async def cancel_all(self) -> None:
-        for name, cs in self.tasks.item():
-            log.debug(f'Cancelling cached feed for {name}')
+        for sym, (cs, msg, quote) in self.feeds.items():
+            log.debug(f'Cancelling cached feed for {self.brokername}:{sym}')
             cs.cancel()
 
-_feeds: _FeedsCache = None
+_bus: _FeedsBus = None
 
-def get_feeds_manager(
+def get_feed_bus(
     brokername: str,
     nursery: Optional[trio.Nursery] = None,
-) -> _FeedsCache:
+) -> _FeedsBus:
     """
-    Retreive data feeds manager from process global scope.
+    Retreive broker-daemon-local data feeds bus from process global
+    scope.
     """
-    global _feeds
+    global _bus
 
     if nursery is not None:
-        assert _feeds is None, "Feeds manager is already setup?"
+        assert _bus is None, "Feeds manager is already setup?"
 
         # this is initial setup by parent actor
-        _feeds = _FeedsCache(
+        _bus = _FeedsBus(
             brokername=brokername,
             nursery=nursery,
         )
-        assert not _feeds.tasks
+        assert not _bus.feeds
 
-    assert _feeds.brokername == brokername, "Uhhh wtf"
-    return _feeds
+    assert _bus.brokername == brokername, "Uhhh wtf"
+    return _bus
 
-async def _setup_persistent_feeds(brokername: str) -> None:
+async def _setup_persistent_brokerd(brokername: str) -> None:
     """Allocate a actor-wide service nursery in ``brokerd``
     such that feeds can be run in the background persistently by
     the broker backend as needed.
     """
+    try:
         async with trio.open_nursery() as service_nursery:
-            _feeds = get_feeds_manager(brokername, service_nursery)
+
+            # assign a nursery to the feeds bus for spawning
+            # background tasks from clients
+            bus = get_feed_bus(brokername, service_nursery)
 
             # we pin this task to keep the feeds manager active until the
             # parent actor decides to tear it down
             await trio.sleep_forever()
+    finally:
+        await bus.cancel_all()
+
+
+async def allocate_persistent_feed(
+    ctx: tractor.Context,
+    bus: _FeedsBus,
+    brokername: str,
+    symbol: str,
+    loglevel: str,
+    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+) -> None:
+
+    try:
+        mod = get_brokermod(brokername)
+    except ImportError:
+        mod = get_ingestormod(brokername)
+
+    # allocate shm array for this broker/symbol
+    # XXX: we should get an error here if one already exists
+    shm, opened = maybe_open_shm_array(
+        key=sym_to_shm_key(brokername, symbol),
+
+        # use any broker defined ohlc dtype:
+        dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
+
+        # we expect the sub-actor to write
+        readonly=False,
+    )
+
+    # assert opened
+    if not opened:
+        # do history validation?
+        pass
+
+    send, quote_stream = trio.open_memory_channel(2**8)
+    feed_is_live = trio.Event()
+
+    # establish broker backend quote stream
+    # ``stream_quotes()`` is a required backend func
+    init_msg, first_quote = await bus.nursery.start(
+        partial(
+            mod.stream_quotes,
+            send_chan=send,
+            feed_is_live=feed_is_live,
+            symbols=[symbol],
+            shm=shm,
+            loglevel=loglevel,
+        )
+    )
+
+    init_msg[symbol]['shm_token'] = shm.token
+    cs = trio.CancelScope()
+    bus.feeds[symbol] = (cs, init_msg, first_quote)
+
+    with cs:
+        if opened:
+            # start history backfill task
+            # ``backfill_bars()`` is a required backend func
+            await bus.nursery.start(mod.backfill_bars, symbol, shm)
+
+        # yield back control to starting nursery
+        task_status.started((init_msg, first_quote))
+
+        times = shm.array['time']
+        delay_s = times[-1] - times[times != times[-1]][-1]
+
+        await feed_is_live.wait()
+
+        # tell incrementer task it can start
+        shm_incrementing(shm.token['shm_name']).set()
+
+        # start shm incrementing for OHLC sampling
+        subscribe_ohlc_for_increment(shm, delay_s)
+
+        # begin shm write loop and broadcast to subscribers
+        sum_tick_vlm: bool = True
+
+        async with quote_stream:
+
+            log.info("Started shared mem bar writer")
+
+            # iterate stream delivered by broker
+            async for quotes in quote_stream:
+                for sym, quote in quotes.items():
+
+                    # TODO: in theory you can send the IPC msg *before*
+                    # writing to the sharedmem array to decrease latency,
+                    # however, that will require `tractor.msg.pub` support
+                    # here or at least some way to prevent task switching
+                    # at the yield such that the array write isn't delayed
+                    # while another consumer is serviced..
+
+                    # start writing the shm buffer with appropriate trade data
+                    for tick in iterticks(quote, types=('trade', 'utrade',)):
+                        last = tick['price']
+
+                        # update last entry
+                        # benchmarked in the 4-5 us range
+                        o, high, low, v = shm.array[-1][
+                            ['open', 'high', 'low', 'volume']
+                        ]
+
+                        new_v = tick.get('size', 0)
+
+                        if v == 0 and new_v:
+                            # no trades for this bar yet so the open
+                            # is also the close/last trade price
+                            o = last
+
+                        if sum_tick_vlm:
+                            volume = v + new_v
+                        else:
+                            volume = v
+
+                        shm.array[[
+                            'open',
+                            'high',
+                            'low',
+                            'close',
+                            'volume',
+                        ]][-1] = (
+                            o,
+                            max(high, last),
+                            min(low, last),
+                            last,
+                            volume,
+                        )
+
+                    for ctx in bus.subscribers[sym]:
+                        await ctx.send_yield({sym: quote})
+
 
 @tractor.stream
-async def allocate_cached_feed(
+async def attach_feed_bus(
     ctx: tractor.Context,
-    symbol: str
+    brokername: str,
+    symbol: str,
+    loglevel: str,
 ):
-    _feeds = get_feeds_manager(brokername, service_nursery)
 
-    # setup shared mem buffer
-    pass
+    if loglevel is None:
+        loglevel = tractor.current_actor().loglevel
+
+    # XXX: required to propagate ``tractor`` loglevel to piker logging
+    get_console_log(loglevel or tractor.current_actor().loglevel)
+
+    # ensure we are who we think we are
+    assert 'brokerd' in tractor.current_actor().name
+
+    bus = get_feed_bus(brokername)
+    task_cs = bus.feeds.get(symbol)
+    bus.subscribers.setdefault(symbol, []).append(ctx)
+
+    # if no cached feed for this symbol has been created for this
+    # brokerd yet, start persistent stream and shm writer task in
+    # service nursery
+    if task_cs is None:
+        init_msg, first_quote = await bus.nursery.start(
+            partial(
+                allocate_persistent_feed,
+                ctx=ctx,
+                bus=bus,
+                brokername=brokername,
+                symbol=symbol,
+                loglevel=loglevel,
+            )
+        )
+
+    # XXX: ``first_quote`` may be outdated here if this is secondary subscriber
+    cs, init_msg, first_quote = bus.feeds[symbol]
+
+    # send this even to subscribers to existing feed?
+    await ctx.send_yield(init_msg)
+    await ctx.send_yield(first_quote)
+
+    try:
+        # just block while the stream pumps
+        await trio.sleep_forever()
+
+    finally:
+        bus.subscribers[symbol].remove(ctx)
+
 
 @dataclass
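
The shm write loop in ``allocate_persistent_feed()`` above folds each
trade tick into the last bar row. Isolated as a pure function the update
rule looks like this (runnable sketch with made-up numbers):

    def fold_tick(o, high, low, v, last, new_v, sum_tick_vlm=True):
        if v == 0 and new_v:
            # first trade of this bar: the open becomes the trade price
            o = last
        volume = v + new_v if sum_tick_vlm else v
        return o, max(high, last), min(low, last), last, volume

    # bar row seeded from the prior bar's values (close 100.0, no
    # volume), then two trades at 101.5 (size 3) and 99.2 (size 2):
    bar = fold_tick(100.0, 100.0, 100.0, 0, last=101.5, new_v=3)
    assert bar == (101.5, 101.5, 100.0, 101.5, 3)
    o, high, low, close, volume = bar
    bar = fold_tick(o, high, low, volume, last=99.2, new_v=2)
    assert bar == (101.5, 101.5, 99.2, 99.2, 5)

Note the high/low seeded from the prior row bound the new bar until a
more extreme trade prints.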
@@ -183,7 +349,7 @@ class Feed:
     stream: AsyncIterator[Dict[str, Any]]
     shm: ShmArray
     mod: ModuleType
+    # ticks: ShmArray
     _brokerd_portal: tractor._portal.Portal
     _index_stream: Optional[AsyncIterator[int]] = None
     _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
@@ -262,40 +428,30 @@ async def open_feed(
     # TODO: do all!
     sym = symbols[0]
 
-    # Attempt to allocate (or attach to) shm array for this broker/symbol
-    shm, opened = maybe_open_shm_array(
-        key=sym_to_shm_key(brokername, sym),
-
-        # use any broker defined ohlc dtype:
-        dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
-
-        # we expect the sub-actor to write
-        readonly=True,
-    )
-
     async with maybe_spawn_brokerd(
         brokername,
         loglevel=loglevel,
+
+        # TODO: add a cli flag for this
+        # debug_mode=False,
+
     ) as portal:
 
         stream = await portal.run(
-            mod.stream_quotes,
-
-            # TODO: actually handy multiple symbols...
-            symbols=symbols,
-            shm_token=shm.token,
-
-            # compat with eventual ``tractor.msg.pub``
-            topics=symbols,
+            attach_feed_bus,
+            brokername=brokername,
+            symbol=sym,
             loglevel=loglevel,
         )
 
+        # TODO: we can't do this **and** be compate with
+        # ``tractor.msg.pub``, should we maybe just drop this after
+        # tests are in?
+        init_msg = await stream.receive()
+
+        shm = attach_shm_array(
+            token=init_msg[sym]['shm_token'],
+
+            # we are the buffer writer
+            readonly=False,
+        )
+
         feed = Feed(
             name=brokername,
             stream=stream,
@@ -304,11 +460,6 @@
             _brokerd_portal=portal,
         )
 
-        # TODO: we can't do this **and** be compate with
-        # ``tractor.msg.pub``, should we maybe just drop this after
-        # tests are in?
-        init_msg = await stream.receive()
-
         for sym, data in init_msg.items():
             si = data['symbol_info']
@@ -324,11 +475,6 @@
             feed.symbols[sym] = symbol
 
             shm_token = data['shm_token']
-            if opened:
-                assert data['is_shm_writer']
-                log.info("Started shared mem bar writer")
-            else:
-                s = attach_shm_array(shm_token)
 
             shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
             assert shm_token == shm.token  # sanity
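
Finally, the de-duplication strategy reduces to one upstream quote
stream fanned out to every registered subscriber. Stripped of the
``tractor`` machinery it is just the following (runnable sketch; channel
sizes, client names and the symbol are illustrative):

    import trio

    async def fanout(upstream, subscribers):
        # every subscriber gets a copy of each quote; the broker socket
        # is only read once no matter how many clients attach
        async with upstream:
            async for quote in upstream:
                for chan in subscribers:
                    await chan.send(quote)
        for chan in subscribers:
            await chan.aclose()

    async def main():
        send, recv = trio.open_memory_channel(8)
        subscribers = []

        async def client(name, chan):
            async for quote in chan:
                print(name, quote)

        async with trio.open_nursery() as nursery:
            for name in ('chart', 'algo'):
                tx, rx = trio.open_memory_channel(8)
                subscribers.append(tx)
                nursery.start_soon(client, name, rx)

            nursery.start_soon(fanout, recv, subscribers)

            for px in (100.0, 100.5):
                await send.send({'mnq.globex': {'last': px}})
            await send.aclose()

    trio.run(main)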