diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 0f61da20..649acca9 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -34,6 +34,7 @@ import logging import time import trio +from trio_typing import TaskStatus import tractor from async_generator import aclosing from ib_insync.wrapper import RequestError @@ -46,14 +47,9 @@ from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client from ..log import get_logger, get_console_log -from ..data import ( - maybe_spawn_brokerd, - iterticks, - attach_shm_array, - subscribe_ohlc_for_increment, - _buffer, -) +from ..data import maybe_spawn_brokerd from ..data._source import from_df +from ..data._sharedmem import ShmArray from ._util import SymbolNotFound @@ -781,36 +777,17 @@ def normalize( return data -_local_buffer_writers = {} +# _local_buffer_writers = {} -@asynccontextmanager -async def activate_writer(key: str) -> (bool, trio.Nursery): - """Mark the current actor with module var determining - whether an existing shm writer task is already active. - - This avoids more then one writer resulting in data - clobbering. - """ - global _local_buffer_writers - - try: - assert not _local_buffer_writers.get(key, False) - - _local_buffer_writers[key] = True - - async with trio.open_nursery() as n: - yield n - finally: - _local_buffer_writers.pop(key, None) - - -async def fill_bars( +async def backfill_bars( sym: str, - first_bars: list, - shm: 'ShmArray', # type: ignore # noqa + # first_bars: list, + shm: ShmArray, # type: ignore # noqa # count: int = 20, # NOTE: any more and we'll overrun underlying buffer count: int = 10, # NOTE: any more and we'll overrun the underlying buffer + + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: """Fill historical bars into shared mem / storage afap. @@ -818,41 +795,59 @@ async def fill_bars( https://github.com/pikers/piker/issues/128 """ - next_dt = first_bars[0].date + first_bars, bars_array = await _trio_run_client_method( + method='bars', + symbol=sym, + ) - i = 0 - while i < count: + # write historical data to buffer + shm.push(bars_array) + # shm_token = shm.token - try: - bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, - end_dt=next_dt, - ) + with trio.CancelScope() as cs: - shm.push(bars_array, prepend=True) - i += 1 - next_dt = bars[0].date + task_status.started(cs) - except RequestError as err: - # TODO: retreive underlying ``ib_insync`` error? + next_dt = first_bars[0].date - if err.code == 162: + i = 0 + while i < count: - if 'HMDS query returned no data' in err.message: - # means we hit some kind of historical "dead zone" - # and further requests seem to always cause - # throttling despite the rps being low - break + try: + bars, bars_array = await _trio_run_client_method( + method='bars', + symbol=sym, + end_dt=next_dt, + ) - else: - log.exception( - "Data query rate reached: Press `ctrl-alt-f` in TWS") + if bars_array is None: + raise SymbolNotFound(sym) - # TODO: should probably create some alert on screen - # and then somehow get that to trigger an event here - # that restarts/resumes this task? - await tractor.breakpoint() + shm.push(bars_array, prepend=True) + i += 1 + next_dt = bars[0].date + + except RequestError as err: + # TODO: retreive underlying ``ib_insync`` error? + + if err.code == 162: + + if 'HMDS query returned no data' in err.message: + # means we hit some kind of historical "dead zone" + # and further requests seem to always cause + # throttling despite the rps being low + break + + else: + log.exception( + "Data query rate reached: Press `ctrl-alt-f`" + "in TWS" + ) + + # TODO: should probably create some alert on screen + # and then somehow get that to trigger an event here + # that restarts/resumes this task? + await tractor.breakpoint() asset_type_map = { @@ -902,6 +897,7 @@ async def _setup_quote_stream( # log.debug(t) try: to_trio.send_nowait(t) + except trio.BrokenResourceError: # XXX: eventkit's ``Event.emit()`` for whatever redic # reason will catch and ignore regular exceptions @@ -946,24 +942,22 @@ async def start_aio_quote_stream( return from_aio -@tractor.stream async def stream_quotes( - ctx: tractor.Context, + send_chan: trio.abc.SendChannel, symbols: List[str], - shm_token: Tuple[str, str, List[tuple]], + shm: ShmArray, + feed_is_live: trio.Event, loglevel: str = None, - # compat for @tractor.msg.pub - topics: Any = None, - get_topics: Callable = None, -) -> AsyncIterator[Dict[str, Any]]: + # startup sync + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + +) -> None: """Stream symbol quotes. This is a ``trio`` callable routine meant to be invoked once the brokerd is up. """ - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) # TODO: support multiple subscriptions sym = symbols[0] @@ -975,119 +969,63 @@ async def stream_quotes( stream = await start_aio_quote_stream(symbol=sym, contract=contract) - shm = None - async with trio.open_nursery() as ln: - # check if a writer already is alive in a streaming task, - # otherwise start one and mark it as now existing + # pass back some symbol info like min_tick, trading_hours, etc. + syminfo = asdict(details) + syminfo.update(syminfo['contract']) - key = shm_token['shm_name'] + # TODO: more consistent field translation + atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] - writer_already_exists = _local_buffer_writers.get(key, False) + # for stocks it seems TWS reports too small a tick size + # such that you can't submit orders with that granularity? + min_tick = 0.01 if atype == 'stock' else 0 - # maybe load historical ohlcv in to shared mem - # check if shm has already been created by previous - # feed initialization - if not writer_already_exists: - _local_buffer_writers[key] = True + syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) - shm = attach_shm_array( - token=shm_token, + # for "traditional" assets, volume is normally discreet, not a float + syminfo['lot_tick_size'] = 0.0 - # we are the buffer writer - readonly=False, - ) - - # async def retrieve_and_push(): - start = time.time() - - bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, - - ) - - log.info(f"bars_array request: {time.time() - start}") - - if bars_array is None: - raise SymbolNotFound(sym) - - # write historical data to buffer - shm.push(bars_array) - shm_token = shm.token - - # TODO: generalize this for other brokers - # start bar filler task in bg - ln.start_soon(fill_bars, sym, bars, shm) - - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - subscribe_ohlc_for_increment(shm, delay_s) - - # pass back some symbol info like min_tick, trading_hours, etc. - syminfo = asdict(details) - syminfo.update(syminfo['contract']) - - # TODO: more consistent field translation - atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] - - # for stocks it seems TWS reports too small a tick size - # such that you can't submit orders with that granularity? - min_tick = 0.01 if atype == 'stock' else 0 - - syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) - - # for "traditional" assets, volume is normally discreet, not a float - syminfo['lot_tick_size'] = 0.0 - - # TODO: for loop through all symbols passed in - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'is_shm_writer': not writer_already_exists, - 'shm_token': shm_token, - 'symbol_info': syminfo, - } + # TODO: for loop through all symbols passed in + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + 'symbol_info': syminfo, } - await ctx.send_yield(init_msgs) + } - # check for special contract types - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' - # should be real volume for this contract - calc_price = False - else: - # commodities and forex don't have an exchange name and - # no real volume so we have to calculate the price - suffix = 'secType' - calc_price = True - # ticker = first_ticker + # check for special contract types + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + suffix = 'exchange' + # should be real volume for this contract + calc_price = False + else: + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = 'secType' + calc_price = True - # pass first quote asap - quote = normalize(first_ticker, calc_price=calc_price) - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic + # pass first quote asap + quote = normalize(first_ticker, calc_price=calc_price) + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic - first_quote = {topic: quote} + first_quote = {topic: quote} - # yield first quote asap - await ctx.send_yield(first_quote) + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] + log.debug(f"First ticker received {quote}") - log.debug(f"First ticker received {quote}") + task_status.started((init_msgs, first_quote)) - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + suffix = 'exchange' + calc_price = False # should be real volume for contract - calc_price = False # should be real volume for contract - - # with trio.move_on_after(10) as cs: # wait for real volume on feed (trading might be closed) - async with aclosing(stream): async for ticker in stream: @@ -1104,104 +1042,29 @@ async def stream_quotes( # (ahem, ib_insync is truly stateful trash) ticker.ticks = [] - # tell incrementer task it can start - _buffer.shm_incrementing(key).set() - # XXX: this works because we don't use # ``aclosing()`` above? break - # enter stream loop - try: - async with stream: - await stream_and_write( - stream=stream, - calc_price=calc_price, - topic=topic, - write_shm=not writer_already_exists, - shm=shm, - suffix=suffix, - ctx=ctx, - ) - finally: - if not writer_already_exists: - _local_buffer_writers[key] = False + # tell caller quotes are now coming in live + feed_is_live.set() - stream.close() + async for ticker in stream: + # print(ticker.vwap) + quote = normalize( + ticker, + calc_price=calc_price + ) -async def stream_and_write( - stream, - calc_price: bool, - topic: str, - write_shm: bool, - suffix: str, - ctx: tractor.Context, - shm: Optional['SharedArray'], # noqa -) -> None: - """Core quote streaming and shm writing loop; optimize for speed! + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic - """ - # real-time stream - async with stream: - async for ticker in stream: + await send_chan.send({topic: quote}) - # print(ticker.vwap) - quote = normalize( - ticker, - calc_price=calc_price - ) - quote['symbol'] = topic - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. - - # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if write_shm: - for tick in iterticks(quote, types=('trade', 'utrade',)): - last = tick['price'] - - # print(f"{quote['symbol']}: {tick}") - - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - - new_v = tick.get('size', 0) - - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last - - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - v + new_v, - ) - - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic - - await ctx.send_yield({topic: quote}) - - # ugh, clear ticks since we've consumed them - ticker.ticks = [] + # ugh, clear ticks since we've consumed them + ticker.ticks = [] def pack_position(pos: Position) -> Dict[str, Any]: diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 8b3cc416..6da478d3 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -23,23 +23,23 @@ sharing your feeds with other fellow pikers. """ from dataclasses import dataclass, field from contextlib import asynccontextmanager +from functools import partial from importlib import import_module from types import ModuleType from typing import ( Dict, Any, Sequence, AsyncIterator, Optional, - Callable, Awaitable + List ) import trio +from trio_typing import TaskStatus import tractor from pydantic import BaseModel from ..brokers import get_brokermod from ..log import get_logger, get_console_log from .._daemon import ( - spawn_brokerd, - maybe_open_pikerd, maybe_spawn_brokerd, ) from ._normalize import iterticks @@ -53,7 +53,8 @@ from ._sharedmem import ( from ._source import base_iohlc_dtype, Symbol from ._buffer import ( increment_ohlc_buffer, - subscribe_ohlc_for_increment + subscribe_ohlc_for_increment, + shm_incrementing, ) __all__ = [ @@ -82,9 +83,8 @@ def get_ingestormod(name: str) -> ModuleType: return module -# @dataclass -class _FeedsCache(BaseModel): - """Data feeds manager. +class _FeedsBus(BaseModel): + """Data feeds broadcaster and persistence management. This is a brokerd side api used to manager persistent real-time streams that can be allocated and left alive indefinitely. @@ -92,82 +92,248 @@ class _FeedsCache(BaseModel): """ brokername: str nursery: trio.Nursery - tasks: Dict[str, trio.CancelScope] = {} + feeds: Dict[str, trio.CancelScope] = {} + subscribers: Dict[str, List[tractor.Context]] = {} class Config: arbitrary_types_allowed = True - # tasks: Dict[str, trio.CancelScope] = field(default_factory=dict) - - async def start_feed( - symbol: str, - func: Callable[[int], Awaitable[None]], - ) -> None: - """Start a bg feed task and register a surrouding cancel scope - for it. - - """ - with trio.CancelCscope() as cs: - pass - async def cancel_all(self) -> None: - for name, cs in self.tasks.item(): - log.debug(f'Cancelling cached feed for {name}') + for sym, (cs, msg, quote) in self.feeds.items(): + log.debug(f'Cancelling cached feed for {self.brokername}:{sym}') cs.cancel() -_feeds: _FeedsCache = None +_bus: _FeedsBus = None -def get_feeds_manager( +def get_feed_bus( brokername: str, nursery: Optional[trio.Nursery] = None, -) -> _FeedsCache: +) -> _FeedsBus: """ - Retreive data feeds manager from process global scope. + Retreive broker-daemon-local data feeds bus from process global + scope. """ - global _feeds + global _bus if nursery is not None: - assert _feeds is None, "Feeds manager is already setup?" + assert _bus is None, "Feeds manager is already setup?" # this is initial setup by parent actor - _feeds = _FeedsCache( + _bus = _FeedsBus( brokername=brokername, nursery=nursery, ) - assert not _feeds.tasks + assert not _bus.feeds - assert _feeds.brokername == brokername, "Uhhh wtf" - return _feeds + assert _bus.brokername == brokername, "Uhhh wtf" + return _bus -async def _setup_persistent_feeds(brokername: str) -> None: +async def _setup_persistent_brokerd(brokername: str) -> None: """Allocate a actor-wide service nursery in ``brokerd`` such that feeds can be run in the background persistently by the broker backend as needed. """ - async with trio.open_nursery() as service_nursery: - _feeds = get_feeds_manager(brokername, service_nursery) + try: + async with trio.open_nursery() as service_nursery: - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() + # assign a nursery to the feeds bus for spawning + # background tasks from clients + bus = get_feed_bus(brokername, service_nursery) + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + finally: + await bus.cancel_all() + + +async def allocate_persistent_feed( + ctx: tractor.Context, + bus: _FeedsBus, + brokername: str, + symbol: str, + loglevel: str, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +) -> None: + + try: + mod = get_brokermod(brokername) + except ImportError: + mod = get_ingestormod(brokername) + + # allocate shm array for this broker/symbol + # XXX: we should get an error here if one already exists + + shm, opened = maybe_open_shm_array( + key=sym_to_shm_key(brokername, symbol), + + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), + + # we expect the sub-actor to write + readonly=False, + ) + + # assert opened + if not opened: + # do history validation? + pass + + send, quote_stream = trio.open_memory_channel(2**8) + feed_is_live = trio.Event() + + # establish broker backend quote stream + # ``stream_quotes()`` is a required backend func + init_msg, first_quote = await bus.nursery.start( + partial( + mod.stream_quotes, + send_chan=send, + feed_is_live=feed_is_live, + symbols=[symbol], + shm=shm, + loglevel=loglevel, + ) + ) + + init_msg[symbol]['shm_token'] = shm.token + cs = trio.CancelScope() + bus.feeds[symbol] = (cs, init_msg, first_quote) + + with cs: + + if opened: + # start history backfill task + # ``backfill_bars()`` is a required backend func + await bus.nursery.start(mod.backfill_bars, symbol, shm) + + # yield back control to starting nursery + task_status.started((init_msg, first_quote)) + + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + + await feed_is_live.wait() + + # tell incrementer task it can start + shm_incrementing(shm.token['shm_name']).set() + + # start shm incrementingn for OHLC sampling + subscribe_ohlc_for_increment(shm, delay_s) + + # begin shm write loop and broadcast to subscribers + + sum_tick_vlm: bool = True + + async with quote_stream: + + log.info("Started shared mem bar writer") + + # iterate stream delivered by broker + async for quotes in quote_stream: + for sym, quote in quotes.items(): + + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. + + # start writing the shm buffer with appropriate trade data + for tick in iterticks(quote, types=('trade', 'utrade',)): + last = tick['price'] + + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] + + new_v = tick.get('size', 0) + + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last + + if sum_tick_vlm: + volume = v + new_v + else: + volume = v + + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + volume, + ) + + for ctx in bus.subscribers[sym]: + await ctx.send_yield({sym: quote}) @tractor.stream -async def allocate_cached_feed( +async def attach_feed_bus( ctx: tractor.Context, - symbol: str + brokername: str, + symbol: str, + loglevel: str, ): - _feeds = get_feeds_manager(brokername, service_nursery) - # setup shared mem buffer - pass + if loglevel is None: + loglevel = tractor.current_actor().loglevel + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + # ensure we are who we think we are + assert 'brokerd' in tractor.current_actor().name + + bus = get_feed_bus(brokername) + task_cs = bus.feeds.get(symbol) + bus.subscribers.setdefault(symbol, []).append(ctx) + + # if no cached feed for this symbol has been created for this + # brokerd yet, start persistent stream and shm writer task in + # service nursery + if task_cs is None: + init_msg, first_quote = await bus.nursery.start( + partial( + allocate_persistent_feed, + ctx=ctx, + bus=bus, + brokername=brokername, + symbol=symbol, + loglevel=loglevel, + ) + ) + + # XXX: ``first_quote`` may be outdated here if this is secondary subscriber + cs, init_msg, first_quote = bus.feeds[symbol] + + # send this even to subscribers to existing feed? + await ctx.send_yield(init_msg) + await ctx.send_yield(first_quote) + + try: + # just block while the stream pumps + await trio.sleep_forever() + + finally: + bus.subscribers[symbol].remove(ctx) @dataclass @@ -183,7 +349,7 @@ class Feed: stream: AsyncIterator[Dict[str, Any]] shm: ShmArray mod: ModuleType - # ticks: ShmArray + _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[int]] = None _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None @@ -262,40 +428,30 @@ async def open_feed( # TODO: do all! sym = symbols[0] - # Attempt to allocate (or attach to) shm array for this broker/symbol - shm, opened = maybe_open_shm_array( - key=sym_to_shm_key(brokername, sym), - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), - - # we expect the sub-actor to write - readonly=True, - ) - async with maybe_spawn_brokerd( - brokername, loglevel=loglevel, - - # TODO: add a cli flag for this - # debug_mode=False, - ) as portal: stream = await portal.run( - mod.stream_quotes, - - # TODO: actually handy multiple symbols... - symbols=symbols, - - shm_token=shm.token, - - # compat with eventual ``tractor.msg.pub`` - topics=symbols, + attach_feed_bus, + brokername=brokername, + symbol=sym, loglevel=loglevel, ) + # TODO: we can't do this **and** be compate with + # ``tractor.msg.pub``, should we maybe just drop this after + # tests are in? + init_msg = await stream.receive() + + shm = attach_shm_array( + token=init_msg[sym]['shm_token'], + + # we are the buffer writer + readonly=False, + ) + feed = Feed( name=brokername, stream=stream, @@ -304,11 +460,6 @@ async def open_feed( _brokerd_portal=portal, ) - # TODO: we can't do this **and** be compate with - # ``tractor.msg.pub``, should we maybe just drop this after - # tests are in? - init_msg = await stream.receive() - for sym, data in init_msg.items(): si = data['symbol_info'] @@ -324,11 +475,6 @@ async def open_feed( feed.symbols[sym] = symbol shm_token = data['shm_token'] - if opened: - assert data['is_shm_writer'] - log.info("Started shared mem bar writer") - else: - s = attach_shm_array(shm_token) shm_token['dtype_descr'] = list(shm_token['dtype_descr']) assert shm_token == shm.token # sanity