From 17d3e7a9e24ab3e669fa040a150a0c546502313e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 23 Mar 2021 08:34:27 -0400 Subject: [PATCH 01/37] Don't shield ems feed; enact remote cancels --- piker/clearing/_ems.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 5ebfee4a..9e7b9045 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -268,6 +268,7 @@ async def exec_loop( *trio.open_memory_channel(100), _buys={}, _sells={}, + _reqids={}, ) @@ -284,22 +285,20 @@ async def exec_loop( # return control to parent task task_status.started((first_quote, feed, client)) - # shield this field so the remote brokerd does not get cancelled stream = feed.stream - with stream.shield(): - async with trio.open_nursery() as n: - n.start_soon( - execute_triggers, - broker, - symbol, - stream, - ctx, - client, - book - ) + async with trio.open_nursery() as n: + n.start_soon( + execute_triggers, + broker, + symbol, + stream, + ctx, + client, + book + ) - if _exec_mode == 'paper': - n.start_soon(simulate_fills, stream.clone(), client) + if _exec_mode == 'paper': + n.start_soon(simulate_fills, stream.clone(), client) # TODO: lots of cases still to handle From aa61bf5a65299aed80fc8dda980ded39940dfb92 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 23 Mar 2021 08:35:11 -0400 Subject: [PATCH 02/37] Use mem-chans for quote streams; clone for multiple consumers --- piker/brokers/ib.py | 211 ++++++++++++++++++++++++++++++-------------- 1 file changed, 143 insertions(+), 68 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 2679e988..99475afc 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -168,6 +168,7 @@ class Client: # contract cache self._contracts: Dict[str, Contract] = {} + self._feeds: Dict[str, trio.abc.SendChannel] = {} # NOTE: the ib.client here is "throttled" to 45 rps by default @@ -387,7 +388,6 @@ class Client: async def stream_ticker( self, symbol: str, - to_trio, opts: Tuple[int] = ('375', '233', '236'), contract: Optional[Contract] = None, ) -> None: @@ -396,8 +396,16 @@ class Client: contract = contract or (await self.find_contract(symbol)) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) + feed = self._feeds.get(symbol) + if feed: + # do something else + # await tractor.breakpoint() + to_trio, from_aio = feed + return from_aio.clone() + # define a simple queue push routine that streams quote packets # to trio over the ``to_trio`` memory channel. + to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore def push(t): """Push quotes to trio task. 
@@ -415,10 +423,15 @@ class Client: log.error(f"Disconnected stream for `{symbol}`") self.ib.cancelMktData(contract) + # decouple broadcast mem chan + self._feeds.pop(symbol, None) + ticker.updateEvent.connect(push) - # let the engine run and stream - await self.ib.disconnectedEvent + # cache feed for later consumers + self._feeds[symbol] = to_trio, from_aio + + return from_aio async def get_quote( self, @@ -691,13 +704,14 @@ async def _trio_run_client_method( # if the method is an *async gen* stream for it meth = getattr(Client, method) - if inspect.isasyncgenfunction(meth): - kwargs['_treat_as_stream'] = True - # if the method is an *async func* but manually - # streams back results, make sure to also stream it args = tuple(inspect.getfullargspec(meth).args) - if 'to_trio' in args: + + if inspect.isasyncgenfunction(meth) or ( + # if the method is an *async func* but manually + # streams back results, make sure to also stream it + 'to_trio' in args + ): kwargs['_treat_as_stream'] = True result = await tractor.to_asyncio.run_task( @@ -780,7 +794,7 @@ def normalize( # convert named tuples to dicts so we send usable keys new_ticks = [] for tick in ticker.ticks: - if tick: + if tick and not isinstance(tick, dict): td = tick._asdict() td['type'] = tick_types.get(td['tickType'], 'n/a') @@ -840,7 +854,7 @@ async def fill_bars( first_bars: list, shm: 'ShmArray', # type: ignore # noqa # count: int = 20, # NOTE: any more and we'll overrun underlying buffer - count: int = 6, # NOTE: any more and we'll overrun the underlying buffer + count: int = 10, # NOTE: any more and we'll overrun the underlying buffer ) -> None: """Fill historical bars into shared mem / storage afap. @@ -904,6 +918,62 @@ asset_type_map = { } + +_quote_streams: Dict[str, trio.abc.ReceiveStream] = {} + + +async def stream_quotes( + client, + symbol: str, + opts: Tuple[int] = ('375', '233', '236'), + contract: Optional[Contract] = None, +) -> None: + """Stream a ticker using the std L1 api. + """ + contract = contract or (await client.find_contract(symbol)) + ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) + + chans = _quote_streams.get(symbol) + + if feed: + + # if we already have a cached feed deliver a rx side clone to + # consumer + to_trio, from_aio = chans + return from_aio.clone() + + + # define a simple queue push routine that streams quote packets + # to trio over the ``to_trio`` memory channel. + to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + + def push(t): + """Push quotes to trio task. + + """ + # log.debug(t) + try: + to_trio.send_nowait(t) + except trio.BrokenResourceError: + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + ticker.updateEvent.disconnect(push) + log.error(f"Disconnected stream for `{symbol}`") + self.ib.cancelMktData(contract) + + # decouple broadcast mem chan + self._feeds.pop(symbol, None) + + ticker.updateEvent.connect(push) + + # cache feed for later consumers + _quote_streams[symbol] = to_trio, from_aio + + return from_aio + + # TODO: figure out how to share quote feeds sanely despite # the wacky ``ib_insync`` api. 
# @tractor.msg.pub @@ -1058,6 +1128,7 @@ async def stream_quotes( # wait for real volume on feed (trading might be closed) async with aclosing(stream): + async for ticker in stream: # for a real volume contract we rait for the first @@ -1081,25 +1152,28 @@ async def stream_quotes( # enter stream loop try: - await stream_and_write( - stream=stream, - calc_price=calc_price, - topic=topic, - writer_already_exists=writer_already_exists, - shm=shm, - suffix=suffix, - ctx=ctx, - ) + async with stream: + await stream_and_write( + stream=stream, + calc_price=calc_price, + topic=topic, + write_shm=not writer_already_exists, + shm=shm, + suffix=suffix, + ctx=ctx, + ) finally: if not writer_already_exists: _local_buffer_writers[key] = False + stream.close() + async def stream_and_write( stream, calc_price: bool, topic: str, - writer_already_exists: bool, + write_shm: bool, suffix: str, ctx: tractor.Context, shm: Optional['SharedArray'], # noqa @@ -1108,64 +1182,65 @@ async def stream_and_write( """ # real-time stream - async for ticker in stream: + async with stream: + async for ticker in stream: - # print(ticker.vwap) - quote = normalize( - ticker, - calc_price=calc_price - ) - quote['symbol'] = topic - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. + # print(ticker.vwap) + quote = normalize( + ticker, + calc_price=calc_price + ) + quote['symbol'] = topic + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. 
- # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if not writer_already_exists: - for tick in iterticks(quote, types=('trade', 'utrade',)): - last = tick['price'] + # if we are the lone tick writer start writing + # the buffer with appropriate trade data + if write_shm: + for tick in iterticks(quote, types=('trade', 'utrade',)): + last = tick['price'] - # print(f"{quote['symbol']}: {tick}") + # print(f"{quote['symbol']}: {tick}") - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] - new_v = tick.get('size', 0) + new_v = tick.get('size', 0) - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - v + new_v, - ) + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + v + new_v, + ) - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic - await ctx.send_yield({topic: quote}) + await ctx.send_yield({topic: quote}) - # ugh, clear ticks since we've consumed them - ticker.ticks = [] + # ugh, clear ticks since we've consumed them + ticker.ticks = [] def pack_position(pos: Position) -> Dict[str, Any]: From 0d4073dbd2b7d2865e403ce3a2718d3ce5ae0e65 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 23 Mar 2021 11:37:27 -0400 Subject: [PATCH 03/37] Move quote stream setup into a cacheing func --- piker/brokers/ib.py | 155 +++++++++++++++++--------------------------- 1 file changed, 58 insertions(+), 97 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 99475afc..16d74b9d 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -385,54 +385,6 @@ class Client: formatDate=2, # timezone aware UTC datetime ) - async def stream_ticker( - self, - symbol: str, - opts: Tuple[int] = ('375', '233', '236'), - contract: Optional[Contract] = None, - ) -> None: - """Stream a ticker using the std L1 api. - """ - contract = contract or (await self.find_contract(symbol)) - ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) - - feed = self._feeds.get(symbol) - if feed: - # do something else - # await tractor.breakpoint() - to_trio, from_aio = feed - return from_aio.clone() - - # define a simple queue push routine that streams quote packets - # to trio over the ``to_trio`` memory channel. - to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore - - def push(t): - """Push quotes to trio task. - - """ - # log.debug(t) - try: - to_trio.send_nowait(t) - except trio.BrokenResourceError: - # XXX: eventkit's ``Event.emit()`` for whatever redic - # reason will catch and ignore regular exceptions - # resulting in tracebacks spammed to console.. - # Manually do the dereg ourselves. 
- ticker.updateEvent.disconnect(push) - log.error(f"Disconnected stream for `{symbol}`") - self.ib.cancelMktData(contract) - - # decouple broadcast mem chan - self._feeds.pop(symbol, None) - - ticker.updateEvent.connect(push) - - # cache feed for later consumers - self._feeds[symbol] = to_trio, from_aio - - return from_aio - async def get_quote( self, symbol: str, @@ -626,6 +578,8 @@ async def _aio_get_client( client_id: Optional[int] = None, ) -> Client: """Return an ``ib_insync.IB`` instance wrapped in our client API. + + Client instances are cached for later use. """ # first check cache for existing client @@ -665,8 +619,10 @@ async def _aio_get_client( # create and cache try: client = Client(ib) + _client_cache[(host, port)] = client log.debug(f"Caching client for {(host, port)}") + yield client except BaseException: @@ -918,65 +874,78 @@ asset_type_map = { } - _quote_streams: Dict[str, trio.abc.ReceiveStream] = {} -async def stream_quotes( - client, +async def _setup_quote_stream( symbol: str, opts: Tuple[int] = ('375', '233', '236'), contract: Optional[Contract] = None, ) -> None: """Stream a ticker using the std L1 api. """ - contract = contract or (await client.find_contract(symbol)) - ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) + global _quote_streams - chans = _quote_streams.get(symbol) + async with _aio_get_client() as client: - if feed: + contract = contract or (await client.find_contract(symbol)) + ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) - # if we already have a cached feed deliver a rx side clone to - # consumer - to_trio, from_aio = chans + # define a simple queue push routine that streams quote packets + # to trio over the ``to_trio`` memory channel. + to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + + def push(t): + """Push quotes to trio task. + + """ + # log.debug(t) + try: + to_trio.send_nowait(t) + except trio.BrokenResourceError: + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + ticker.updateEvent.disconnect(push) + log.error(f"Disconnected stream for `{symbol}`") + client.ib.cancelMktData(contract) + + # decouple broadcast mem chan + _quote_streams.pop(symbol, None) + + ticker.updateEvent.connect(push) + + return from_aio + + +async def start_aio_quote_stream( + symbol: str, + contract: Optional[Contract] = None, +) -> trio.abc.ReceiveStream: + + global _quote_streams + + from_aio = _quote_streams.get(symbol) + if from_aio: + + # if we already have a cached feed deliver a rx side clone to consumer return from_aio.clone() + else: - # define a simple queue push routine that streams quote packets - # to trio over the ``to_trio`` memory channel. - to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + from_aio = await tractor.to_asyncio.run_task( + _setup_quote_stream, + symbol=symbol, + contract=contract, + ) - def push(t): - """Push quotes to trio task. + # cache feed for later consumers + _quote_streams[symbol] = from_aio - """ - # log.debug(t) - try: - to_trio.send_nowait(t) - except trio.BrokenResourceError: - # XXX: eventkit's ``Event.emit()`` for whatever redic - # reason will catch and ignore regular exceptions - # resulting in tracebacks spammed to console.. - # Manually do the dereg ourselves. 
- ticker.updateEvent.disconnect(push) - log.error(f"Disconnected stream for `{symbol}`") - self.ib.cancelMktData(contract) - - # decouple broadcast mem chan - self._feeds.pop(symbol, None) - - ticker.updateEvent.connect(push) - - # cache feed for later consumers - _quote_streams[symbol] = to_trio, from_aio - - return from_aio + return from_aio -# TODO: figure out how to share quote feeds sanely despite -# the wacky ``ib_insync`` api. -# @tractor.msg.pub @tractor.stream async def stream_quotes( ctx: tractor.Context, @@ -1004,11 +973,7 @@ async def stream_quotes( symbol=sym, ) - stream = await _trio_run_client_method( - method='stream_ticker', - contract=contract, # small speedup - symbol=sym, - ) + stream = await start_aio_quote_stream(symbol=sym, contract=contract) shm = None async with trio.open_nursery() as ln: @@ -1059,8 +1024,6 @@ async def stream_quotes( subscribe_ohlc_for_increment(shm, delay_s) # pass back some symbol info like min_tick, trading_hours, etc. - # con = asdict(contract) - # syminfo = contract syminfo = asdict(details) syminfo.update(syminfo['contract']) @@ -1111,8 +1074,6 @@ async def stream_quotes( # yield first quote asap await ctx.send_yield(first_quote) - # ticker.ticks = [] - # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) first_ticker.ticks = [] From 65e7680cdd3b804d3bd052742ec2ad61c3346123 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 25 Mar 2021 10:26:02 -0400 Subject: [PATCH 04/37] Draft a feed cacheing sub-system --- piker/data/__init__.py | 130 ++++++++++++++++++++++++++++------------- 1 file changed, 90 insertions(+), 40 deletions(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 0559cbfb..8b3cc416 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -26,14 +26,22 @@ from contextlib import asynccontextmanager from importlib import import_module from types import ModuleType from typing import ( - Dict, Any, Sequence, AsyncIterator, Optional + Dict, Any, Sequence, + AsyncIterator, Optional, + Callable, Awaitable ) +import trio import tractor +from pydantic import BaseModel from ..brokers import get_brokermod from ..log import get_logger, get_console_log -from .._daemon import spawn_brokerd, maybe_open_pikerd +from .._daemon import ( + spawn_brokerd, + maybe_open_pikerd, + maybe_spawn_brokerd, +) from ._normalize import iterticks from ._sharedmem import ( maybe_open_shm_array, @@ -74,57 +82,97 @@ def get_ingestormod(name: str) -> ModuleType: return module -@asynccontextmanager -async def maybe_spawn_brokerd( - brokername: str, - loglevel: Optional[str] = None, +# @dataclass +class _FeedsCache(BaseModel): + """Data feeds manager. - # XXX: you should pretty much never want debug mode - # for data daemons when running in production. - debug_mode: bool = True, -) -> tractor._portal.Portal: - """If no ``brokerd.{brokername}`` daemon-actor can be found, - spawn one in a local subactor and return a portal to it. + This is a brokerd side api used to manager persistent real-time + streams that can be allocated and left alive indefinitely. """ - if loglevel: - get_console_log(loglevel) + brokername: str + nursery: trio.Nursery + tasks: Dict[str, trio.CancelScope] = {} - dname = f'brokerd.{brokername}' - async with tractor.find_actor(dname) as portal: + class Config: + arbitrary_types_allowed = True - # WTF: why doesn't this work? 
- if portal is not None: - yield portal + # tasks: Dict[str, trio.CancelScope] = field(default_factory=dict) - else: - # ask root ``pikerd`` daemon to spawn the daemon we need if - # pikerd is not live we now become the root of the - # process tree - async with maybe_open_pikerd( - loglevel=loglevel - ) as pikerd_portal: + async def start_feed( + symbol: str, + func: Callable[[int], Awaitable[None]], + ) -> None: + """Start a bg feed task and register a surrouding cancel scope + for it. - if pikerd_portal is None: - # we are root so spawn brokerd directly in our tree - # the root nursery is accessed through process global state - await spawn_brokerd(brokername, loglevel=loglevel) + """ + with trio.CancelCscope() as cs: + pass - else: - await pikerd_portal.run( - spawn_brokerd, - brokername=brokername, - loglevel=loglevel, - debug_mode=debug_mode, - ) + async def cancel_all(self) -> None: + for name, cs in self.tasks.item(): + log.debug(f'Cancelling cached feed for {name}') + cs.cancel() + + +_feeds: _FeedsCache = None + + +def get_feeds_manager( + brokername: str, + nursery: Optional[trio.Nursery] = None, +) -> _FeedsCache: + """ + Retreive data feeds manager from process global scope. + + """ + + global _feeds + + if nursery is not None: + assert _feeds is None, "Feeds manager is already setup?" + + # this is initial setup by parent actor + _feeds = _FeedsCache( + brokername=brokername, + nursery=nursery, + ) + assert not _feeds.tasks + + assert _feeds.brokername == brokername, "Uhhh wtf" + return _feeds + + +async def _setup_persistent_feeds(brokername: str) -> None: + """Allocate a actor-wide service nursery in ``brokerd`` + such that feeds can be run in the background persistently by + the broker backend as needed. + + """ + async with trio.open_nursery() as service_nursery: + _feeds = get_feeds_manager(brokername, service_nursery) + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + + +@tractor.stream +async def allocate_cached_feed( + ctx: tractor.Context, + symbol: str +): + _feeds = get_feeds_manager(brokername, service_nursery) + + # setup shared mem buffer + pass - async with tractor.wait_for_actor(dname) as portal: - yield portal @dataclass class Feed: - """A data feed for client-side interaction with far-process + """A data feed for client-side interaction with far-process# }}} real-time data sources. 
     This is a thin abstraction on top of ``tractor``'s portals for
@@ -279,6 +327,8 @@ async def open_feed(
         if opened:
             assert data['is_shm_writer']
             log.info("Started shared mem bar writer")
+        else:
+            s = attach_shm_array(shm_token)
 
         shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
         assert shm_token == shm.token  # sanity
 

From f17a26c948d75af8a534fc30828d70f51cbf575a Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 26 Mar 2021 11:51:00 -0400
Subject: [PATCH 05/37] Don't mistakenly alias options to underlying for
 positions

---
 piker/brokers/ib.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index 16d74b9d..0f61da20 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -37,7 +37,7 @@ import trio
 import tractor
 from async_generator import aclosing
 from ib_insync.wrapper import RequestError
-from ib_insync.contract import Contract, ContractDetails
+from ib_insync.contract import Contract, ContractDetails, Option
 from ib_insync.order import Order
 from ib_insync.ticker import Ticker
 from ib_insync.objects import Position
@@ -1206,10 +1206,18 @@ async def stream_and_write(
 
 def pack_position(pos: Position) -> Dict[str, Any]:
     con = pos.contract
+
+    if isinstance(con, Option):
+        # TODO: option symbol parsing and sane display:
+        symbol = con.localSymbol.replace(' ', '')
+
+    else:
+        symbol = con.symbol
+
     return {
         'broker': 'ib',
         'account': pos.account,
-        'symbol': con.symbol,
+        'symbol': symbol,
         'currency': con.currency,
         'size': float(pos.position),
         'avg_price': float(pos.avgCost) / float(con.multiplier or 1.0),

From a82f43e3a5bfd5a650cc847d28a56faa8785dcfe Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 29 Mar 2021 08:22:27 -0400
Subject: [PATCH 06/37] Rework data feed API to allow for caching streams

Move all feed/stream agnostic logic and shared mem writing into a new
set of routines inside the ``data`` sub-package. This lets us move
toward a more standard API for broker and data backends to provide
cache-able persistent streams to client apps.

The data layer now takes care of
- starting a single background brokerd task to start a stream for a
  symbol if none yet exists and register that stream for later lookups
- the existing broker backend actor is now always re-used if it can be
  found in a service tree
- synchronization with the brokerd stream's startup sequence is now
  oriented around fast startup concurrency such that client code gets
  a handle to historical data and quote schema as fast as possible
- historical data loading is delegated to the backend more formally by
  starting a ``backfill_bars()`` task
- write shared mem in the brokerd task and only destruct it once
  requested either from the parent actor or further clients
- fully de-duplicate stream data by using a dynamic pub-sub strategy
  where new clients register for copies of the same quote set per symbol

This new API is entirely working with the IB backend; others will need
to be ported. That's to come shortly.
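
As a rough consumer-side sketch (the symbol name below is hypothetical
and error handling is elided), client code now boils down to:

    async with data.open_feed(
        'ib',
        ['mnq.globex'],  # hypothetical symbol
    ) as feed:

        # historical ohlc is already loaded into shared mem
        bars = feed.shm.array

        # live quotes, de-duplicated across however many consumers
        async for quotes in feed.stream:
            for sym, quote in quotes.items():
                ...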
--- piker/brokers/ib.py | 373 +++++++++++++---------------------------- piker/data/__init__.py | 306 ++++++++++++++++++++++++--------- 2 files changed, 344 insertions(+), 335 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 0f61da20..649acca9 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -34,6 +34,7 @@ import logging import time import trio +from trio_typing import TaskStatus import tractor from async_generator import aclosing from ib_insync.wrapper import RequestError @@ -46,14 +47,9 @@ from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client from ..log import get_logger, get_console_log -from ..data import ( - maybe_spawn_brokerd, - iterticks, - attach_shm_array, - subscribe_ohlc_for_increment, - _buffer, -) +from ..data import maybe_spawn_brokerd from ..data._source import from_df +from ..data._sharedmem import ShmArray from ._util import SymbolNotFound @@ -781,36 +777,17 @@ def normalize( return data -_local_buffer_writers = {} +# _local_buffer_writers = {} -@asynccontextmanager -async def activate_writer(key: str) -> (bool, trio.Nursery): - """Mark the current actor with module var determining - whether an existing shm writer task is already active. - - This avoids more then one writer resulting in data - clobbering. - """ - global _local_buffer_writers - - try: - assert not _local_buffer_writers.get(key, False) - - _local_buffer_writers[key] = True - - async with trio.open_nursery() as n: - yield n - finally: - _local_buffer_writers.pop(key, None) - - -async def fill_bars( +async def backfill_bars( sym: str, - first_bars: list, - shm: 'ShmArray', # type: ignore # noqa + # first_bars: list, + shm: ShmArray, # type: ignore # noqa # count: int = 20, # NOTE: any more and we'll overrun underlying buffer count: int = 10, # NOTE: any more and we'll overrun the underlying buffer + + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: """Fill historical bars into shared mem / storage afap. @@ -818,41 +795,59 @@ async def fill_bars( https://github.com/pikers/piker/issues/128 """ - next_dt = first_bars[0].date + first_bars, bars_array = await _trio_run_client_method( + method='bars', + symbol=sym, + ) - i = 0 - while i < count: + # write historical data to buffer + shm.push(bars_array) + # shm_token = shm.token - try: - bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, - end_dt=next_dt, - ) + with trio.CancelScope() as cs: - shm.push(bars_array, prepend=True) - i += 1 - next_dt = bars[0].date + task_status.started(cs) - except RequestError as err: - # TODO: retreive underlying ``ib_insync`` error? + next_dt = first_bars[0].date - if err.code == 162: + i = 0 + while i < count: - if 'HMDS query returned no data' in err.message: - # means we hit some kind of historical "dead zone" - # and further requests seem to always cause - # throttling despite the rps being low - break + try: + bars, bars_array = await _trio_run_client_method( + method='bars', + symbol=sym, + end_dt=next_dt, + ) - else: - log.exception( - "Data query rate reached: Press `ctrl-alt-f` in TWS") + if bars_array is None: + raise SymbolNotFound(sym) - # TODO: should probably create some alert on screen - # and then somehow get that to trigger an event here - # that restarts/resumes this task? - await tractor.breakpoint() + shm.push(bars_array, prepend=True) + i += 1 + next_dt = bars[0].date + + except RequestError as err: + # TODO: retreive underlying ``ib_insync`` error? 
+ + if err.code == 162: + + if 'HMDS query returned no data' in err.message: + # means we hit some kind of historical "dead zone" + # and further requests seem to always cause + # throttling despite the rps being low + break + + else: + log.exception( + "Data query rate reached: Press `ctrl-alt-f`" + "in TWS" + ) + + # TODO: should probably create some alert on screen + # and then somehow get that to trigger an event here + # that restarts/resumes this task? + await tractor.breakpoint() asset_type_map = { @@ -902,6 +897,7 @@ async def _setup_quote_stream( # log.debug(t) try: to_trio.send_nowait(t) + except trio.BrokenResourceError: # XXX: eventkit's ``Event.emit()`` for whatever redic # reason will catch and ignore regular exceptions @@ -946,24 +942,22 @@ async def start_aio_quote_stream( return from_aio -@tractor.stream async def stream_quotes( - ctx: tractor.Context, + send_chan: trio.abc.SendChannel, symbols: List[str], - shm_token: Tuple[str, str, List[tuple]], + shm: ShmArray, + feed_is_live: trio.Event, loglevel: str = None, - # compat for @tractor.msg.pub - topics: Any = None, - get_topics: Callable = None, -) -> AsyncIterator[Dict[str, Any]]: + # startup sync + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + +) -> None: """Stream symbol quotes. This is a ``trio`` callable routine meant to be invoked once the brokerd is up. """ - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) # TODO: support multiple subscriptions sym = symbols[0] @@ -975,119 +969,63 @@ async def stream_quotes( stream = await start_aio_quote_stream(symbol=sym, contract=contract) - shm = None - async with trio.open_nursery() as ln: - # check if a writer already is alive in a streaming task, - # otherwise start one and mark it as now existing + # pass back some symbol info like min_tick, trading_hours, etc. + syminfo = asdict(details) + syminfo.update(syminfo['contract']) - key = shm_token['shm_name'] + # TODO: more consistent field translation + atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] - writer_already_exists = _local_buffer_writers.get(key, False) + # for stocks it seems TWS reports too small a tick size + # such that you can't submit orders with that granularity? + min_tick = 0.01 if atype == 'stock' else 0 - # maybe load historical ohlcv in to shared mem - # check if shm has already been created by previous - # feed initialization - if not writer_already_exists: - _local_buffer_writers[key] = True + syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) - shm = attach_shm_array( - token=shm_token, + # for "traditional" assets, volume is normally discreet, not a float + syminfo['lot_tick_size'] = 0.0 - # we are the buffer writer - readonly=False, - ) - - # async def retrieve_and_push(): - start = time.time() - - bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, - - ) - - log.info(f"bars_array request: {time.time() - start}") - - if bars_array is None: - raise SymbolNotFound(sym) - - # write historical data to buffer - shm.push(bars_array) - shm_token = shm.token - - # TODO: generalize this for other brokers - # start bar filler task in bg - ln.start_soon(fill_bars, sym, bars, shm) - - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - subscribe_ohlc_for_increment(shm, delay_s) - - # pass back some symbol info like min_tick, trading_hours, etc. 
- syminfo = asdict(details) - syminfo.update(syminfo['contract']) - - # TODO: more consistent field translation - atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] - - # for stocks it seems TWS reports too small a tick size - # such that you can't submit orders with that granularity? - min_tick = 0.01 if atype == 'stock' else 0 - - syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) - - # for "traditional" assets, volume is normally discreet, not a float - syminfo['lot_tick_size'] = 0.0 - - # TODO: for loop through all symbols passed in - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'is_shm_writer': not writer_already_exists, - 'shm_token': shm_token, - 'symbol_info': syminfo, - } + # TODO: for loop through all symbols passed in + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + 'symbol_info': syminfo, } - await ctx.send_yield(init_msgs) + } - # check for special contract types - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' - # should be real volume for this contract - calc_price = False - else: - # commodities and forex don't have an exchange name and - # no real volume so we have to calculate the price - suffix = 'secType' - calc_price = True - # ticker = first_ticker + # check for special contract types + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + suffix = 'exchange' + # should be real volume for this contract + calc_price = False + else: + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = 'secType' + calc_price = True - # pass first quote asap - quote = normalize(first_ticker, calc_price=calc_price) - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic + # pass first quote asap + quote = normalize(first_ticker, calc_price=calc_price) + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic - first_quote = {topic: quote} + first_quote = {topic: quote} - # yield first quote asap - await ctx.send_yield(first_quote) + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] + log.debug(f"First ticker received {quote}") - log.debug(f"First ticker received {quote}") + task_status.started((init_msgs, first_quote)) - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + suffix = 'exchange' + calc_price = False # should be real volume for contract - calc_price = False # should be real volume for contract - - # with trio.move_on_after(10) as cs: # wait for real volume on feed (trading might be closed) - async with aclosing(stream): async for ticker in stream: @@ -1104,104 +1042,29 @@ async def stream_quotes( # (ahem, ib_insync is truly stateful trash) ticker.ticks = [] - # tell incrementer task it can start - _buffer.shm_incrementing(key).set() - # XXX: this works because we don't use # ``aclosing()`` above? 
break - # enter stream loop - try: - async with stream: - await stream_and_write( - stream=stream, - calc_price=calc_price, - topic=topic, - write_shm=not writer_already_exists, - shm=shm, - suffix=suffix, - ctx=ctx, - ) - finally: - if not writer_already_exists: - _local_buffer_writers[key] = False + # tell caller quotes are now coming in live + feed_is_live.set() - stream.close() + async for ticker in stream: + # print(ticker.vwap) + quote = normalize( + ticker, + calc_price=calc_price + ) -async def stream_and_write( - stream, - calc_price: bool, - topic: str, - write_shm: bool, - suffix: str, - ctx: tractor.Context, - shm: Optional['SharedArray'], # noqa -) -> None: - """Core quote streaming and shm writing loop; optimize for speed! + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic - """ - # real-time stream - async with stream: - async for ticker in stream: + await send_chan.send({topic: quote}) - # print(ticker.vwap) - quote = normalize( - ticker, - calc_price=calc_price - ) - quote['symbol'] = topic - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. - - # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if write_shm: - for tick in iterticks(quote, types=('trade', 'utrade',)): - last = tick['price'] - - # print(f"{quote['symbol']}: {tick}") - - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - - new_v = tick.get('size', 0) - - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last - - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - v + new_v, - ) - - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic - - await ctx.send_yield({topic: quote}) - - # ugh, clear ticks since we've consumed them - ticker.ticks = [] + # ugh, clear ticks since we've consumed them + ticker.ticks = [] def pack_position(pos: Position) -> Dict[str, Any]: diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 8b3cc416..6da478d3 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -23,23 +23,23 @@ sharing your feeds with other fellow pikers. 
""" from dataclasses import dataclass, field from contextlib import asynccontextmanager +from functools import partial from importlib import import_module from types import ModuleType from typing import ( Dict, Any, Sequence, AsyncIterator, Optional, - Callable, Awaitable + List ) import trio +from trio_typing import TaskStatus import tractor from pydantic import BaseModel from ..brokers import get_brokermod from ..log import get_logger, get_console_log from .._daemon import ( - spawn_brokerd, - maybe_open_pikerd, maybe_spawn_brokerd, ) from ._normalize import iterticks @@ -53,7 +53,8 @@ from ._sharedmem import ( from ._source import base_iohlc_dtype, Symbol from ._buffer import ( increment_ohlc_buffer, - subscribe_ohlc_for_increment + subscribe_ohlc_for_increment, + shm_incrementing, ) __all__ = [ @@ -82,9 +83,8 @@ def get_ingestormod(name: str) -> ModuleType: return module -# @dataclass -class _FeedsCache(BaseModel): - """Data feeds manager. +class _FeedsBus(BaseModel): + """Data feeds broadcaster and persistence management. This is a brokerd side api used to manager persistent real-time streams that can be allocated and left alive indefinitely. @@ -92,82 +92,248 @@ class _FeedsCache(BaseModel): """ brokername: str nursery: trio.Nursery - tasks: Dict[str, trio.CancelScope] = {} + feeds: Dict[str, trio.CancelScope] = {} + subscribers: Dict[str, List[tractor.Context]] = {} class Config: arbitrary_types_allowed = True - # tasks: Dict[str, trio.CancelScope] = field(default_factory=dict) - - async def start_feed( - symbol: str, - func: Callable[[int], Awaitable[None]], - ) -> None: - """Start a bg feed task and register a surrouding cancel scope - for it. - - """ - with trio.CancelCscope() as cs: - pass - async def cancel_all(self) -> None: - for name, cs in self.tasks.item(): - log.debug(f'Cancelling cached feed for {name}') + for sym, (cs, msg, quote) in self.feeds.items(): + log.debug(f'Cancelling cached feed for {self.brokername}:{sym}') cs.cancel() -_feeds: _FeedsCache = None +_bus: _FeedsBus = None -def get_feeds_manager( +def get_feed_bus( brokername: str, nursery: Optional[trio.Nursery] = None, -) -> _FeedsCache: +) -> _FeedsBus: """ - Retreive data feeds manager from process global scope. + Retreive broker-daemon-local data feeds bus from process global + scope. """ - global _feeds + global _bus if nursery is not None: - assert _feeds is None, "Feeds manager is already setup?" + assert _bus is None, "Feeds manager is already setup?" # this is initial setup by parent actor - _feeds = _FeedsCache( + _bus = _FeedsBus( brokername=brokername, nursery=nursery, ) - assert not _feeds.tasks + assert not _bus.feeds - assert _feeds.brokername == brokername, "Uhhh wtf" - return _feeds + assert _bus.brokername == brokername, "Uhhh wtf" + return _bus -async def _setup_persistent_feeds(brokername: str) -> None: +async def _setup_persistent_brokerd(brokername: str) -> None: """Allocate a actor-wide service nursery in ``brokerd`` such that feeds can be run in the background persistently by the broker backend as needed. 
""" - async with trio.open_nursery() as service_nursery: - _feeds = get_feeds_manager(brokername, service_nursery) + try: + async with trio.open_nursery() as service_nursery: - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() + # assign a nursery to the feeds bus for spawning + # background tasks from clients + bus = get_feed_bus(brokername, service_nursery) + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + finally: + await bus.cancel_all() + + +async def allocate_persistent_feed( + ctx: tractor.Context, + bus: _FeedsBus, + brokername: str, + symbol: str, + loglevel: str, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +) -> None: + + try: + mod = get_brokermod(brokername) + except ImportError: + mod = get_ingestormod(brokername) + + # allocate shm array for this broker/symbol + # XXX: we should get an error here if one already exists + + shm, opened = maybe_open_shm_array( + key=sym_to_shm_key(brokername, symbol), + + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), + + # we expect the sub-actor to write + readonly=False, + ) + + # assert opened + if not opened: + # do history validation? + pass + + send, quote_stream = trio.open_memory_channel(2**8) + feed_is_live = trio.Event() + + # establish broker backend quote stream + # ``stream_quotes()`` is a required backend func + init_msg, first_quote = await bus.nursery.start( + partial( + mod.stream_quotes, + send_chan=send, + feed_is_live=feed_is_live, + symbols=[symbol], + shm=shm, + loglevel=loglevel, + ) + ) + + init_msg[symbol]['shm_token'] = shm.token + cs = trio.CancelScope() + bus.feeds[symbol] = (cs, init_msg, first_quote) + + with cs: + + if opened: + # start history backfill task + # ``backfill_bars()`` is a required backend func + await bus.nursery.start(mod.backfill_bars, symbol, shm) + + # yield back control to starting nursery + task_status.started((init_msg, first_quote)) + + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + + await feed_is_live.wait() + + # tell incrementer task it can start + shm_incrementing(shm.token['shm_name']).set() + + # start shm incrementingn for OHLC sampling + subscribe_ohlc_for_increment(shm, delay_s) + + # begin shm write loop and broadcast to subscribers + + sum_tick_vlm: bool = True + + async with quote_stream: + + log.info("Started shared mem bar writer") + + # iterate stream delivered by broker + async for quotes in quote_stream: + for sym, quote in quotes.items(): + + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. 
+ + # start writing the shm buffer with appropriate trade data + for tick in iterticks(quote, types=('trade', 'utrade',)): + last = tick['price'] + + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] + + new_v = tick.get('size', 0) + + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last + + if sum_tick_vlm: + volume = v + new_v + else: + volume = v + + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + volume, + ) + + for ctx in bus.subscribers[sym]: + await ctx.send_yield({sym: quote}) @tractor.stream -async def allocate_cached_feed( +async def attach_feed_bus( ctx: tractor.Context, - symbol: str + brokername: str, + symbol: str, + loglevel: str, ): - _feeds = get_feeds_manager(brokername, service_nursery) - # setup shared mem buffer - pass + if loglevel is None: + loglevel = tractor.current_actor().loglevel + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + # ensure we are who we think we are + assert 'brokerd' in tractor.current_actor().name + + bus = get_feed_bus(brokername) + task_cs = bus.feeds.get(symbol) + bus.subscribers.setdefault(symbol, []).append(ctx) + + # if no cached feed for this symbol has been created for this + # brokerd yet, start persistent stream and shm writer task in + # service nursery + if task_cs is None: + init_msg, first_quote = await bus.nursery.start( + partial( + allocate_persistent_feed, + ctx=ctx, + bus=bus, + brokername=brokername, + symbol=symbol, + loglevel=loglevel, + ) + ) + + # XXX: ``first_quote`` may be outdated here if this is secondary subscriber + cs, init_msg, first_quote = bus.feeds[symbol] + + # send this even to subscribers to existing feed? + await ctx.send_yield(init_msg) + await ctx.send_yield(first_quote) + + try: + # just block while the stream pumps + await trio.sleep_forever() + + finally: + bus.subscribers[symbol].remove(ctx) @dataclass @@ -183,7 +349,7 @@ class Feed: stream: AsyncIterator[Dict[str, Any]] shm: ShmArray mod: ModuleType - # ticks: ShmArray + _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[int]] = None _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None @@ -262,40 +428,30 @@ async def open_feed( # TODO: do all! sym = symbols[0] - # Attempt to allocate (or attach to) shm array for this broker/symbol - shm, opened = maybe_open_shm_array( - key=sym_to_shm_key(brokername, sym), - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), - - # we expect the sub-actor to write - readonly=True, - ) - async with maybe_spawn_brokerd( - brokername, loglevel=loglevel, - - # TODO: add a cli flag for this - # debug_mode=False, - ) as portal: stream = await portal.run( - mod.stream_quotes, - - # TODO: actually handy multiple symbols... - symbols=symbols, - - shm_token=shm.token, - - # compat with eventual ``tractor.msg.pub`` - topics=symbols, + attach_feed_bus, + brokername=brokername, + symbol=sym, loglevel=loglevel, ) + # TODO: we can't do this **and** be compate with + # ``tractor.msg.pub``, should we maybe just drop this after + # tests are in? 
+ init_msg = await stream.receive() + + shm = attach_shm_array( + token=init_msg[sym]['shm_token'], + + # we are the buffer writer + readonly=False, + ) + feed = Feed( name=brokername, stream=stream, @@ -304,11 +460,6 @@ async def open_feed( _brokerd_portal=portal, ) - # TODO: we can't do this **and** be compate with - # ``tractor.msg.pub``, should we maybe just drop this after - # tests are in? - init_msg = await stream.receive() - for sym, data in init_msg.items(): si = data['symbol_info'] @@ -324,11 +475,6 @@ async def open_feed( feed.symbols[sym] = symbol shm_token = data['shm_token'] - if opened: - assert data['is_shm_writer'] - log.info("Started shared mem bar writer") - else: - s = attach_shm_array(shm_token) shm_token['dtype_descr'] = list(shm_token['dtype_descr']) assert shm_token == shm.token # sanity From 14c5fc24ecaad737edb14b34aa6f744b7d345d69 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Mar 2021 08:34:56 -0400 Subject: [PATCH 07/37] Port to new setup routine name --- piker/_daemon.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/piker/_daemon.py b/piker/_daemon.py index 72e390f2..80175ae4 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -134,6 +134,8 @@ async def spawn_brokerd( **tractor_kwargs ) -> tractor._portal.Portal: + from .data import _setup_persistent_brokerd + log.info(f'Spawning {brokername} broker daemon') brokermod = get_brokermod(brokername) @@ -152,6 +154,21 @@ async def spawn_brokerd( **tractor_kwargs ) + # TODO: so i think this is the perfect use case for supporting + # a cross-actor async context manager api instead of this + # shoort-and-forget task spawned in the root nursery, we'd have an + # async exit stack that we'd register the `portal.open_context()` + # call with and then have the ability to unwind the call whenevs. + + # non-blocking setup of brokerd service nursery + _services.service_n.start_soon( + partial( + portal.run, + _setup_persistent_brokerd, + brokername=brokername, + ) + ) + return dname From 7cc395b5bf6c7b915f28c1d83f6781e829f3c7e5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Mar 2021 08:35:58 -0400 Subject: [PATCH 08/37] Open data feed in ems main entrypoint --- piker/clearing/_client.py | 1 + piker/clearing/_ems.py | 158 ++++++++++++++++++++------------------ 2 files changed, 83 insertions(+), 76 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 6accc0b8..121cd080 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -181,6 +181,7 @@ async def maybe_open_emsd( async with tractor.find_actor('pikerd') as portal: assert portal + name = await portal.run( spawn_emsd, brokername=brokername, diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 9e7b9045..ed9a2102 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -40,7 +40,11 @@ log = get_logger(__name__) # TODO: numba all of this -def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: +def mk_check( + trigger_price: float, + known_last: float, + action: str, +) -> Callable[[float, float], bool]: """Create a predicate for given ``exec_price`` based on last known price, ``known_last``. @@ -230,6 +234,7 @@ async def execute_triggers( async def exec_loop( ctx: tractor.Context, + feed: 'Feed', # noqa broker: str, symbol: str, _exec_mode: str, @@ -239,66 +244,61 @@ async def exec_loop( to brokers. 
""" - async with data.open_feed( - broker, - [symbol], - loglevel='info', - ) as feed: - # TODO: get initial price quote from target broker - first_quote = await feed.receive() + # TODO: get initial price quote from target broker + first_quote = await feed.receive() - book = get_dark_book(broker) - book.lasts[(broker, symbol)] = first_quote[symbol]['last'] + book = get_dark_book(broker) + book.lasts[(broker, symbol)] = first_quote[symbol]['last'] - # TODO: wrap this in a more re-usable general api - client_factory = getattr(feed.mod, 'get_client_proxy', None) + # TODO: wrap this in a more re-usable general api + client_factory = getattr(feed.mod, 'get_client_proxy', None) - if client_factory is not None and _exec_mode != 'paper': + if client_factory is not None and _exec_mode != 'paper': - # we have an order API for this broker - client = client_factory(feed._brokerd_portal) + # we have an order API for this broker + client = client_factory(feed._brokerd_portal) - else: - # force paper mode - log.warning(f'Entering paper trading mode for {broker}') + else: + # force paper mode + log.warning(f'Entering paper trading mode for {broker}') - client = PaperBoi( - broker, - *trio.open_memory_channel(100), - _buys={}, - _sells={}, + client = PaperBoi( + broker, + *trio.open_memory_channel(100), + _buys={}, + _sells={}, - _reqids={}, - ) + _reqids={}, + ) - # for paper mode we need to mock this trades response feed - # so we pass a duck-typed feed-looking mem chan which is fed - # fill and submission events from the exec loop - feed._trade_stream = client.trade_stream + # for paper mode we need to mock this trades response feed + # so we pass a duck-typed feed-looking mem chan which is fed + # fill and submission events from the exec loop + feed._trade_stream = client.trade_stream - # init the trades stream - client._to_trade_stream.send_nowait({'local_trades': 'start'}) + # init the trades stream + client._to_trade_stream.send_nowait({'local_trades': 'start'}) - _exec_mode = 'paper' + _exec_mode = 'paper' - # return control to parent task - task_status.started((first_quote, feed, client)) + # return control to parent task + task_status.started((first_quote, feed, client)) - stream = feed.stream - async with trio.open_nursery() as n: - n.start_soon( - execute_triggers, - broker, - symbol, - stream, - ctx, - client, - book - ) + stream = feed.stream + async with trio.open_nursery() as n: + n.start_soon( + execute_triggers, + broker, + symbol, + stream, + ctx, + client, + book + ) - if _exec_mode == 'paper': - n.start_soon(simulate_fills, stream.clone(), client) + if _exec_mode == 'paper': + n.start_soon(simulate_fills, stream.clone(), client) # TODO: lots of cases still to handle @@ -556,7 +556,7 @@ async def process_order_cmds( # price received from the feed, instead of being # like every other shitty tina platform that makes # the user choose the predicate operator. 
- pred = mk_check(trigger_price, last) + pred = mk_check(trigger_price, last, action) tick_slap: float = 5 min_tick = feed.symbols[sym].tick_size @@ -646,35 +646,41 @@ async def _emsd_main( async with trio.open_nursery() as n: # TODO: eventually support N-brokers - - # start the condition scan loop - quote, feed, client = await n.start( - exec_loop, - ctx, + async with data.open_feed( broker, - symbol, - _mode, - ) + [symbol], + loglevel='info', + ) as feed: - await n.start( - process_broker_trades, - ctx, - feed, - dark_book, - ) + # start the condition scan loop + quote, feed, client = await n.start( + exec_loop, + ctx, + feed, + broker, + symbol, + _mode, + ) - # connect back to the calling actor (the one that is - # acting as an EMS client and will submit orders) to - # receive requests pushed over a tractor stream - # using (for now) an async generator. - order_stream = await portal.run(send_order_cmds) + await n.start( + process_broker_trades, + ctx, + feed, + dark_book, + ) - # start inbound order request processing - await process_order_cmds( - ctx, - order_stream, - symbol, - feed, - client, - dark_book, - ) + # connect back to the calling actor (the one that is + # acting as an EMS client and will submit orders) to + # receive requests pushed over a tractor stream + # using (for now) an async generator. + order_stream = await portal.run(send_order_cmds) + + # start inbound order request processing + await process_order_cmds( + ctx, + order_stream, + symbol, + feed, + client, + dark_book, + ) From 8ccf987d521d01cd07a835f69f47b17d846c46ad Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Mar 2021 08:57:22 -0400 Subject: [PATCH 09/37] Fix typo --- piker/data/_sharedmem.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 3dc8de89..4a52cfb3 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) Tyler Goodlet (in stewardship for piker0) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -16,6 +16,7 @@ """ NumPy compatible shared memory buffers for real-time FSP. + """ from dataclasses import dataclass, asdict from sys import byteorder @@ -426,7 +427,7 @@ def maybe_open_shm_array( **kwargs, ) -> Tuple[ShmArray, bool]: """Attempt to attach to a shared memory block using a "key" lookup - to registered blocks in the users overall "system" registryt + to registered blocks in the users overall "system" registry (presumes you don't have the block's explicit token). 
This function is meant to solve the problem of discovering whether From 5fc2aba3ed280b02ad9bfec8ad22d2515543b5ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 30 Mar 2021 10:55:36 -0400 Subject: [PATCH 10/37] Drop some more old cruft --- piker/brokers/ib.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 649acca9..2e475114 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -777,12 +777,8 @@ def normalize( return data -# _local_buffer_writers = {} - - async def backfill_bars( sym: str, - # first_bars: list, shm: ShmArray, # type: ignore # noqa # count: int = 20, # NOTE: any more and we'll overrun underlying buffer count: int = 10, # NOTE: any more and we'll overrun the underlying buffer From 4f51ca74f4bb31e77b9c6cca164bccfbd2ce6ddc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 30 Mar 2021 10:56:19 -0400 Subject: [PATCH 11/37] Broadcast all tick types to subs, not just trades --- piker/data/__init__.py | 67 ++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 6da478d3..fd98452f 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -206,7 +206,6 @@ async def allocate_persistent_feed( bus.feeds[symbol] = (cs, init_msg, first_quote) with cs: - if opened: # start history backfill task # ``backfill_bars()`` is a required backend func @@ -246,43 +245,47 @@ async def allocate_persistent_feed( # while another consumer is serviced.. # start writing the shm buffer with appropriate trade data - for tick in iterticks(quote, types=('trade', 'utrade',)): - last = tick['price'] + for tick in quote['ticks']: - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] + # write trade events to shm last OHLC sample + if tick['type'] in ('trade', 'utrade'): - new_v = tick.get('size', 0) + last = tick['price'] - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] - if sum_tick_vlm: - volume = v + new_v - else: - volume = v + new_v = tick.get('size', 0) - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - volume, - ) + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last - for ctx in bus.subscribers[sym]: - await ctx.send_yield({sym: quote}) + if sum_tick_vlm: + volume = v + new_v + else: + volume = v + + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + volume, + ) + + for ctx in bus.subscribers[sym]: + await ctx.send_yield({sym: quote}) @tractor.stream From 3ebb7ab6b16e6a48e370a50587ebdcab999ce383 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 30 Mar 2021 11:02:20 -0400 Subject: [PATCH 12/37] Only activate Qt hidpi detection on windows for now --- piker/ui/_exec.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index 2146cdd5..d621e98c 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -22,6 +22,7 @@ All global Qt runtime settings are mostly defined here. 
""" from typing import Tuple, Callable, Dict, Any import os +import platform import signal import time import traceback @@ -87,16 +88,19 @@ def current_screen() -> QtGui.QScreen: assert screen, "Wow Qt is dumb as shit and has no screen..." return screen -# XXX: pretty sure none of this shit works + +# XXX: pretty sure none of this shit works on linux as per: # https://bugreports.qt.io/browse/QTBUG-53022 +# it seems to work on windows.. no idea wtf is up. +if platform.system() == "Windows": -# Proper high DPI scaling is available in Qt >= 5.6.0. This attibute -# must be set before creating the application -# if hasattr(Qt, 'AA_EnableHighDpiScaling'): -# QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True) + # Proper high DPI scaling is available in Qt >= 5.6.0. This attibute + # must be set before creating the application + if hasattr(Qt, 'AA_EnableHighDpiScaling'): + QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True) -# if hasattr(Qt, 'AA_UseHighDpiPixmaps'): -# QCoreApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True) + if hasattr(Qt, 'AA_UseHighDpiPixmaps'): + QCoreApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True) class MainWindow(QtGui.QMainWindow): From b2eacb85d4a0861b41104d41e135a2813e1834af Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 31 Mar 2021 14:04:59 -0400 Subject: [PATCH 13/37] Don't hold lock on initial client construction --- piker/brokers/api.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/piker/brokers/api.py b/piker/brokers/api.py index 75fc8a14..c1e11a01 100644 --- a/piker/brokers/api.py +++ b/piker/brokers/api.py @@ -22,7 +22,6 @@ from typing import Dict from contextlib import asynccontextmanager, AsyncExitStack import trio -import tractor from . import get_brokermod from ..log import get_logger @@ -30,10 +29,12 @@ from ..log import get_logger log = get_logger(__name__) -_cache: Dict[str, 'Client'] = {} + +_cache: Dict[str, 'Client'] = {} # noqa + @asynccontextmanager -async def get_cached_client( +async def open_cached_client( brokername: str, *args, **kwargs, @@ -74,10 +75,11 @@ async def get_cached_client( client._exit_stack = exit_stack clients[brokername] = client - yield client + yield client finally: - client._consumers -= 1 - if client._consumers <= 0: - # teardown the client - await client._exit_stack.aclose() + if client is not None: + # if no more consumers, teardown the client + client._consumers -= 1 + if client._consumers <= 0: + await client._exit_stack.aclose() From 1d013126b9224147714f2fe4a313c47d85a52778 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 31 Mar 2021 14:05:16 -0400 Subject: [PATCH 14/37] Fix type annot --- piker/brokers/ib.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 2e475114..c7d8917e 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -798,7 +798,6 @@ async def backfill_bars( # write historical data to buffer shm.push(bars_array) - # shm_token = shm.token with trio.CancelScope() as cs: @@ -939,6 +938,7 @@ async def start_aio_quote_stream( async def stream_quotes( + send_chan: trio.abc.SendChannel, symbols: List[str], shm: ShmArray, @@ -946,7 +946,7 @@ async def stream_quotes( loglevel: str = None, # startup sync - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, ) -> None: """Stream symbol quotes. 
From 30dabbab443e4a8940dc29e28bfb42116acffab9 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 31 Mar 2021 14:19:48 -0400
Subject: [PATCH 15/37] Support backend volume summing; handle disconnects

---
 piker/data/__init__.py | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/piker/data/__init__.py b/piker/data/__init__.py
index fd98452f..afc74d75 100644
--- a/piker/data/__init__.py
+++ b/piker/data/__init__.py
@@ -20,6 +20,7 @@ Data feed apis and infra.
 
 We provide tsdb integrations for retrieving
 and storing data from your brokers as well as
 sharing your feeds with other fellow pikers.
+
 """
 from dataclasses import dataclass, field
 from contextlib import asynccontextmanager
@@ -227,7 +228,9 @@ async def allocate_persistent_feed(
 
         # begin shm write loop and broadcast to subscribers
 
-        sum_tick_vlm: bool = True
+        sum_tick_vlm: bool = init_msg.get(
+            'shm_write_opts', {}
+        ).get('sum_tick_vlm', True)
 
         async with quote_stream:
 
@@ -247,6 +250,9 @@ async def allocate_persistent_feed(
                 # start writing the shm buffer with appropriate trade data
                 for tick in quote['ticks']:
 
+                    # if tick['type'] in ('utrade',):
+                    #     print(tick)
+
                     # write trade events to shm last OHLC sample
                     if tick['type'] in ('trade', 'utrade'):
 
@@ -268,24 +274,34 @@ async def allocate_persistent_feed(
                         if sum_tick_vlm:
                             volume = v + new_v
                         else:
-                            volume = v
+                            # presume backend takes care of summing
+                            # it's own vlm
+                            volume = quote['volume']
 
                         shm.array[[
                             'open',
                             'high',
                             'low',
                             'close',
+                            'bar_wap',  # can be optionally provided
                             'volume',
                         ]][-1] = (
                             o,
                             max(high, last),
                             min(low, last),
                             last,
+                            quote.get('bar_wap', 0),
                             volume,
                         )
 
                 for ctx in bus.subscribers[sym]:
-                    await ctx.send_yield({sym: quote})
+                    try:
+                        await ctx.send_yield({sym: quote})
+                    except (
+                        trio.BrokenResourceError,
+                        trio.ClosedResourceError
+                    ):
+                        log.error(f'{ctx.chan.uid} dropped connection')

From 24bda8636e428587c126842dcfc19f9ee429ec0c Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 31 Mar 2021 14:20:12 -0400
Subject: [PATCH 16/37] Port quest to new client api

---
 piker/brokers/questrade.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py
index e54c75a2..528df91e 100644
--- a/piker/brokers/questrade.py
+++ b/piker/brokers/questrade.py
@@ -1180,6 +1180,11 @@ def normalize(
     return new
 
 
+# TODO: currently this backend uses entirely different
+# data feed machinery that was written earlier than the
+# existing stuff used in other backends. This needs to
+# be ported eventually and should *just work* despite
+# being a multi-symbol, poll-style feed system.
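+#
+# a port would presumably adopt the same signature the kraken/ib
+# backends use elsewhere in this series; a sketch only, nothing
+# here is implemented yet:
+#
+#   async def stream_quotes(
+#       send_chan: trio.abc.SendChannel,
+#       symbols: List[str],
+#       shm: ShmArray,
+#       feed_is_live: trio.Event,
+#       loglevel: str = None,
+#       task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED,
+#   ) -> None:
+#       ...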
@tractor.stream async def stream_quotes( ctx: tractor.Context, # marks this as a streaming func @@ -1192,7 +1197,7 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel) - async with api.get_cached_client('questrade') as client: + async with api.open_cached_client('questrade') as client: if feed_type == 'stock': formatter = format_stock_quote get_quotes = await stock_quoter(client, symbols) From 29b73b41fb8637afecb5ab88b434a4f506a1285d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 31 Mar 2021 14:20:37 -0400 Subject: [PATCH 17/37] Slight rename; fix predicate is None bug --- piker/clearing/_ems.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index ed9a2102..5880dd50 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -72,7 +72,7 @@ def mk_check( return check_lt else: - return None, None + return None @dataclass @@ -511,7 +511,6 @@ async def process_order_cmds( exec_mode = cmd['exec_mode'] broker = brokers[0] - last = dark_book.lasts[(broker, sym)] if exec_mode == 'live' and action in ('buy', 'sell',): @@ -556,9 +555,10 @@ async def process_order_cmds( # price received from the feed, instead of being # like every other shitty tina platform that makes # the user choose the predicate operator. + last = dark_book.lasts[(broker, sym)] pred = mk_check(trigger_price, last, action) - tick_slap: float = 5 + spread_slap: float = 5 min_tick = feed.symbols[sym].tick_size if action == 'buy': @@ -568,12 +568,12 @@ async def process_order_cmds( # TODO: we probably need to scale this based # on some near term historical spread # measure? - abs_diff_away = tick_slap * min_tick + abs_diff_away = spread_slap * min_tick elif action == 'sell': tickfilter = ('bid', 'last', 'trade') percent_away = -0.005 - abs_diff_away = -tick_slap * min_tick + abs_diff_away = -spread_slap * min_tick else: # alert tickfilter = ('trade', 'utrade', 'last') From 5a970dad72b310070c7663ecec333e4865fd7af0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 31 Mar 2021 14:22:09 -0400 Subject: [PATCH 18/37] Port kraken backend to new data feed api --- piker/brokers/kraken.py | 133 +++++++++++++++------------------------- 1 file changed, 50 insertions(+), 83 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 5d8763c3..ac80988b 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -16,15 +16,17 @@ """ Kraken backend. + """ from contextlib import asynccontextmanager, AsyncExitStack from dataclasses import asdict, field from types import ModuleType -from typing import List, Dict, Any, Tuple, Optional +from typing import List, Dict, Any, Tuple import json import time import trio_websocket +from trio_typing import TaskStatus from trio_websocket._impl import ( ConnectionClosed, DisconnectionTimeout, @@ -41,15 +43,11 @@ import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel + +from .api import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log -from ..data import ( - _buffer, - # iterticks, - attach_shm_array, - get_shm_token, - subscribe_ohlc_for_increment, -) +from ..data import ShmArray log = get_logger(__name__) @@ -315,6 +313,7 @@ def normalize( quote['brokerd_ts'] = time.time() quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') quote['last'] = quote['close'] + quote['bar_wap'] = ohlc.vwap # seriously eh? 
what's with this non-symmetry everywhere # in subscription systems... @@ -426,17 +425,37 @@ async def open_autorecon_ws(url): await stack.aclose() -# @tractor.msg.pub +async def backfill_bars( + sym: str, + shm: ShmArray, # type: ignore # noqa + + count: int = 10, # NOTE: any more and we'll overrun the underlying buffer + + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +) -> None: + """Fill historical bars into shared mem / storage afap. + """ + with trio.CancelScope() as cs: + async with open_cached_client('kraken') as client: + bars = await client.bars(symbol=sym) + shm.push(bars) + task_status.started(cs) + + async def stream_quotes( - # get_topics: Callable, - shm_token: Tuple[str, str, List[tuple]], - symbols: List[str] = ['XBTUSD', 'XMRUSD'], - # These are the symbols not expected by the ws api - # they are looked up inside this routine. - sub_type: str = 'ohlc', + + send_chan: trio.abc.SendChannel, + symbols: List[str], + shm: ShmArray, + feed_is_live: trio.Event, loglevel: str = None, - # compat with eventual ``tractor.msg.pub`` - topics: Optional[List[str]] = None, + + # backend specific + sub_type: str = 'ohlc', + + # startup sync + task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + ) -> None: """Subscribe for ohlc stream of quotes for ``pairs``. @@ -447,7 +466,8 @@ async def stream_quotes( ws_pairs = {} sym_infos = {} - async with get_client() as client: + + async with open_cached_client('kraken') as client: # keep client cached for real-time section for sym in symbols: @@ -458,40 +478,16 @@ async def stream_quotes( sym_infos[sym] = syminfo ws_pairs[sym] = si.wsname - # maybe load historical ohlcv in to shared mem - # check if shm has already been created by previous - # feed initialization - writer_exists = get_shm_token(shm_token['shm_name']) - symbol = symbols[0] - if not writer_exists: - shm = attach_shm_array( - token=shm_token, - # we are writer - readonly=False, - ) - bars = await client.bars(symbol=symbol) - - shm.push(bars) - shm_token = shm.token - - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - subscribe_ohlc_for_increment(shm, delay_s) - - # yield shm_token, not writer_exists init_msgs = { # pass back token, and bool, signalling if we're the writer # and that history has been written symbol: { - 'is_shm_writer': not writer_exists, - 'shm_token': shm_token, 'symbol_info': sym_infos[sym], - } - # for sym in symbols + 'shm_write_opts': {'sum_tick_vml': False}, + }, } - yield init_msgs async with open_autorecon_ws('wss://ws.kraken.com/') as ws: @@ -521,15 +517,16 @@ async def stream_quotes( # pull a first quote and deliver msg_gen = stream_messages(ws) + # TODO: use ``anext()`` when it lands in 3.10! 
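+        # (i.e. ``typ, ohlc_last = await anext(msg_gen)`` once the
+        # builtin arrives; until then ``__anext__()`` below does it)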
typ, ohlc_last = await msg_gen.__anext__() topic, quote = normalize(ohlc_last) - # packetize as {topic: quote} - yield {topic: quote} + first_quote = {topic: quote} + task_status.started((init_msgs, first_quote)) - # tell incrementer task it can start - _buffer.shm_incrementing(shm_token['shm_name']).set() + # lol, only "closes" when they're margin squeezing clients ;P + feed_is_live.set() # keep start of last interval for volume tracking last_interval_start = ohlc_last.etime @@ -546,15 +543,18 @@ async def stream_quotes( # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m volume = ohlc.volume - # new interval + # new OHLC sample interval if ohlc.etime > last_interval_start: last_interval_start = ohlc.etime tick_volume = volume + else: # this is the tick volume *within the interval* tick_volume = volume - ohlc_last.volume + ohlc_last = ohlc last = ohlc.close + if tick_volume: ohlc.ticks.append({ 'type': 'trade', @@ -564,43 +564,10 @@ async def stream_quotes( topic, quote = normalize(ohlc) - # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if not writer_exists: - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - new_v = tick_volume - - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last - - # write shm - shm.array[ - ['open', - 'high', - 'low', - 'close', - 'bar_wap', # in this case vwap of bar - 'volume'] - ][-1] = ( - o, - max(high, last), - min(low, last), - last, - ohlc.vwap, - volume, - ) - ohlc_last = ohlc - elif typ == 'l1': quote = ohlc topic = quote['symbol'] # XXX: format required by ``tractor.msg.pub`` # requires a ``Dict[topic: str, quote: dict]`` - yield {topic: quote} + await send_chan.send({topic: quote}) From a8a3f098cf51dc4b6cd39c4b3d5c0f4a9ccc6b49 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 1 Apr 2021 09:28:10 -0400 Subject: [PATCH 19/37] Drop lingering prints --- piker/ui/_graphics/_cursor.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/piker/ui/_graphics/_cursor.py b/piker/ui/_graphics/_cursor.py index e13ab20b..159c773e 100644 --- a/piker/ui/_graphics/_cursor.py +++ b/piker/ui/_graphics/_cursor.py @@ -90,16 +90,11 @@ class LineDot(pg.CurvePoint): self, ev: QtCore.QEvent, ) -> None: - # print((ev, type(ev))) if not isinstance( ev, QtCore.QDynamicPropertyChangeEvent ) or self.curve() is None: return False - # if ev.propertyName() == 'index': - # print(ev) - # # self.setProperty - (x, y) = self.curve().getData() index = self.property('index') # first = self._plot._ohlc[0]['index'] @@ -172,8 +167,6 @@ class ContentsLabel(pg.LabelItem): if inspect.isfunction(margins[1]): margins = margins[0], ydim(anchor_font_size) - print(f'margins: {margins}') - self.anchor(itemPos=index, parentPos=index, offset=margins) def update_from_ohlc( @@ -403,7 +396,6 @@ class Cursor(pg.GraphicsObject): # update all trackers for item in self._trackers: - # print(f'setting {item} with {(ix, y)}') item.on_tracked_source(ix, iy) if ix != last_ix: From c05fc8991aca4348a49509d1d907fd73592142e4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 3 Apr 2021 01:18:51 -0400 Subject: [PATCH 20/37] Rework ohlc sampling to launch from .start() Avoid bothering with a trio event and expect the caller to do manual shm registering with the write loop. Provide OHLC sample period indexing through a re-branded pub-sub func ``iter_ohlc_periods()``. 
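
The subscriber side of the re-branded func (which ``Feed.index_stream()``
wires up in a following patch) reduces to roughly the below sketch; the
``brokerd_portal`` name is assumed for illustration::

    # given an open ``tractor`` portal to the brokerd actor:
    index_stream = await brokerd_portal.run(
        iter_ohlc_periods,
        delay_s=60,  # subscribe to 1m sample period steps
    )
    async for msg in index_stream:
        # one event per OHLC sample period increment
        new_index = msg['index']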
--- piker/data/_buffer.py | 122 +++++++++++++++++++++++++++--------------- 1 file changed, 79 insertions(+), 43 deletions(-) diff --git a/piker/data/_buffer.py b/piker/data/_buffer.py index 896b503b..eccf4ca6 100644 --- a/piker/data/_buffer.py +++ b/piker/data/_buffer.py @@ -17,17 +17,25 @@ """ Data buffers for fast shared humpy. """ -from typing import Tuple, Callable, Dict -# import time +from typing import Dict, List import tractor import trio +from trio_typing import TaskStatus from ._sharedmem import ShmArray +from ..log import get_logger -_shms: Dict[int, ShmArray] = {} +log = get_logger(__name__) + + +# TODO: we could stick these in a composed type to avoid +# angering the "i hate module scoped variables crowd" (yawn). +_shms: Dict[int, List[ShmArray]] = {} _start_increment: Dict[str, trio.Event] = {} +_incrementers: Dict[int, trio.CancelScope] = {} +_subscribers: Dict[str, tractor.Context] = {} def shm_incrementing(shm_token_name: str) -> trio.Event: @@ -35,11 +43,9 @@ def shm_incrementing(shm_token_name: str) -> trio.Event: return _start_increment.setdefault(shm_token_name, trio.Event()) -@tractor.msg.pub async def increment_ohlc_buffer( - shm_token: dict, - get_topics: Callable[..., Tuple[str]], - # delay_s: Optional[float] = None, + delay_s: int, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ): """Task which inserts new bars into the provide shared memory array every ``delay_s`` seconds. @@ -54,14 +60,16 @@ async def increment_ohlc_buffer( the underlying buffers will actually be incremented. """ - # wait for brokerd to signal we should start sampling - await shm_incrementing(shm_token['shm_name']).wait() + # # wait for brokerd to signal we should start sampling + # await shm_incrementing(shm_token['shm_name']).wait() # TODO: right now we'll spin printing bars if the last time stamp is # before a large period of no market activity. Likely the best way # to solve this is to make this task aware of the instrument's # tradable hours? + global _incrementers + # adjust delay to compensate for trio processing time ad = min(_shms.keys()) - 0.001 @@ -69,47 +77,75 @@ async def increment_ohlc_buffer( lowest = min(_shms.keys()) ad = lowest - 0.001 - while True: - # TODO: do we want to support dynamically - # adding a "lower" lowest increment period? - await trio.sleep(ad) - total_s += lowest + with trio.CancelScope() as cs: - # increment all subscribed shm arrays - # TODO: this in ``numba`` - for delay_s, shms in _shms.items(): - if total_s % delay_s != 0: - continue + # register this time period step as active + _incrementers[delay_s] = cs + task_status.started(cs) - # TODO: numa this! - for shm in shms: - # TODO: in theory we could make this faster by copying the - # "last" readable value into the underlying larger buffer's - # next value and then incrementing the counter instead of - # using ``.push()``? + while True: + # TODO: do we want to support dynamically + # adding a "lower" lowest increment period? + await trio.sleep(ad) + total_s += lowest - # append new entry to buffer thus "incrementing" the bar - array = shm.array - last = array[-1:][shm._write_fields].copy() - # (index, t, close) = last[0][['index', 'time', 'close']] - (t, close) = last[0][['time', 'close']] + # increment all subscribed shm arrays + # TODO: this in ``numba`` + for delay_s, shms in _shms.items(): + if total_s % delay_s != 0: + continue - # this copies non-std fields (eg. 
vwap) from the last datum - last[ - ['time', 'volume', 'open', 'high', 'low', 'close'] - ][0] = (t + delay_s, 0, close, close, close, close) + # TODO: numa this! + for shm in shms: + # TODO: in theory we could make this faster by copying the + # "last" readable value into the underlying larger buffer's + # next value and then incrementing the counter instead of + # using ``.push()``? - # write to the buffer - shm.push(last) + # append new entry to buffer thus "incrementing" the bar + array = shm.array + last = array[-1:][shm._write_fields].copy() + # (index, t, close) = last[0][['index', 'time', 'close']] + (t, close) = last[0][['time', 'close']] - # broadcast the buffer index step - yield {'index': shm._last.value} + # this copies non-std fields (eg. vwap) from the last datum + last[ + ['time', 'volume', 'open', 'high', 'low', 'close'] + ][0] = (t + delay_s, 0, close, close, close, close) + + # write to the buffer + shm.push(last) + + # broadcast the buffer index step + # yield {'index': shm._last.value} + for ctx in _subscribers.get(delay_s, ()): + try: + await ctx.send_yield({'index': shm._last.value}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error(f'{ctx.chan.uid} dropped connection') -def subscribe_ohlc_for_increment( - shm: ShmArray, - delay: int, +@tractor.stream +async def iter_ohlc_periods( + ctx: tractor.Context, + delay_s: int, ) -> None: - """Add an OHLC ``ShmArray`` to the increment set. """ - _shms.setdefault(delay, []).append(shm) + Subscribe to OHLC sampling "step" events: when the time + aggregation period increments, this event stream emits an index + event. + + """ + # add our subscription + global _subscribers + subs = _subscribers.setdefault(delay_s, []) + subs.append(ctx) + + try: + # stream and block until cancelled + await trio.sleep_forever() + finally: + subs.remove(ctx) From ce4144aace99ec057cd3eee33935e649cabee93c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 3 Apr 2021 01:23:18 -0400 Subject: [PATCH 21/37] Deliver and utilise broker backend OHLC sample rate in init msg --- piker/data/__init__.py | 74 ++++++++++++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 25 deletions(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index afc74d75..907dd6fe 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -53,9 +53,10 @@ from ._sharedmem import ( ) from ._source import base_iohlc_dtype, Symbol from ._buffer import ( + _shms, + _incrementers, increment_ohlc_buffer, - subscribe_ohlc_for_increment, - shm_incrementing, + iter_ohlc_periods, ) __all__ = [ @@ -64,7 +65,7 @@ __all__ = [ 'attach_shm_array', 'open_shm_array', 'get_shm_token', - 'subscribe_ohlc_for_increment', + # 'subscribe_ohlc_for_increment', ] @@ -181,10 +182,10 @@ async def allocate_persistent_feed( readonly=False, ) - # assert opened - if not opened: - # do history validation? - pass + # do history validation? + assert opened, "Persistent shm for sym was already open?!" + # if not opened: + # raise RuntimeError("Persistent shm for sym was already open?!") send, quote_stream = trio.open_memory_channel(2**8) feed_is_live = trio.Event() @@ -204,34 +205,48 @@ async def allocate_persistent_feed( init_msg[symbol]['shm_token'] = shm.token cs = trio.CancelScope() + + # TODO: make this into a composed type which also + # contains the backfiller cs for individual super-based + # resspawns when needed. 
bus.feeds[symbol] = (cs, init_msg, first_quote) with cs: if opened: - # start history backfill task - # ``backfill_bars()`` is a required backend func - await bus.nursery.start(mod.backfill_bars, symbol, shm) - # yield back control to starting nursery - task_status.started((init_msg, first_quote)) + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + await bus.nursery.start(mod.backfill_bars, symbol, shm) times = shm.array['time'] delay_s = times[-1] - times[times != times[-1]][-1] + # pass OHLC sample rate in seconds + init_msg[symbol]['sample_rate'] = delay_s + + # yield back control to starting nursery + task_status.started((init_msg, first_quote)) + await feed_is_live.wait() - # tell incrementer task it can start - shm_incrementing(shm.token['shm_name']).set() + # # tell incrementer task it can start + # shm_incrementing(shm.token['shm_name']).set() # start shm incrementingn for OHLC sampling - subscribe_ohlc_for_increment(shm, delay_s) + # subscribe_ohlc_for_increment(shm, delay_s) - # begin shm write loop and broadcast to subscribers + if opened: + _shms.setdefault(delay_s, []).append(shm) + + if _incrementers.get(delay_s) is None: + cs = await bus.nursery.start(increment_ohlc_buffer, delay_s) sum_tick_vlm: bool = init_msg.get( 'shm_write_opts', {} ).get('sum_tick_vlm', True) + # begin shm write loop and broadcast to subscribers async with quote_stream: log.info("Started shared mem bar writer") @@ -372,6 +387,7 @@ class Feed: _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[int]] = None _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None + _max_sample_rate: int = 0 # cache of symbol info messages received as first message when # a stream startsc. @@ -380,15 +396,19 @@ class Feed: async def receive(self) -> dict: return await self.stream.__anext__() - async def index_stream(self) -> AsyncIterator[int]: + async def index_stream( + self, + delay_s: Optional[int] = None + + ) -> AsyncIterator[int]: + if not self._index_stream: # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be # created for all practical purposes self._index_stream = await self._brokerd_portal.run( - increment_ohlc_buffer, - shm_token=self.shm.token, - topics=['index'], + iter_ohlc_periods, + delay_s=delay_s or self._max_sample_rate, ) return self._index_stream @@ -459,9 +479,9 @@ async def open_feed( loglevel=loglevel, ) - # TODO: we can't do this **and** be compate with - # ``tractor.msg.pub``, should we maybe just drop this after - # tests are in? + # TODO: can we make this work better with the proposed + # context based bidirectional streaming style api proposed in: + # https://github.com/goodboy/tractor/issues/53 init_msg = await stream.receive() shm = attach_shm_array( @@ -478,10 +498,12 @@ async def open_feed( mod=mod, _brokerd_portal=portal, ) + ohlc_sample_rates = [] for sym, data in init_msg.items(): si = data['symbol_info'] + ohlc_sample_rates.append(data['sample_rate']) symbol = Symbol( key=sym, @@ -493,9 +515,11 @@ async def open_feed( feed.symbols[sym] = symbol + # cast shm dtype to list... 
can't member why we need this shm_token = data['shm_token'] + shm_token['dtype_descr'] = list(shm_token['dtype_descr']) + assert shm_token == shm.token # sanity - shm_token['dtype_descr'] = list(shm_token['dtype_descr']) - assert shm_token == shm.token # sanity + feed._max_sample_rate = max(ohlc_sample_rates) yield feed From 8069bbe105663cac3ba6d635c23903868e044565 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 3 Apr 2021 12:21:50 -0400 Subject: [PATCH 22/37] Drop old incrementer func --- piker/fsp/__init__.py | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index c798bf28..0b432e5b 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -28,7 +28,7 @@ from ..log import get_logger from .. import data from ._momo import _rsi, _wma from ._volume import _tina_vwap -from ..data import attach_shm_array, Feed +from ..data import attach_shm_array log = get_logger(__name__) @@ -62,23 +62,6 @@ async def latency( yield value -async def increment_signals( - feed: Feed, - dst_shm: 'SharedArray', # noqa -) -> None: - """Increment the underlying shared memory buffer on every "increment" - msg received from the underlying data feed. - - """ - async for msg in await feed.index_stream(): - array = dst_shm.array - last = array[-1:].copy() - - # write new slot to the buffer - dst_shm.push(last) - len(dst_shm.array) - - @tractor.stream async def cascade( ctx: tractor.Context, @@ -150,7 +133,6 @@ async def cascade( ) history[fsp_func_name] = history_output - # check for data length mis-allignment and fill missing values diff = len(src.array) - len(history) if diff >= 0: @@ -182,8 +164,8 @@ async def cascade( cs = await n.start(fsp_compute) - # Increment the underlying shared memory buffer on every "increment" - # msg received from the underlying data feed. + # Increment the underlying shared memory buffer on every + # "increment" msg received from the underlying data feed. async for msg in await feed.index_stream(): @@ -198,10 +180,11 @@ async def cascade( # TODO: adopt an incremental update engine/approach # where possible here eventually! + # read out last shm row array = dst.array last = array[-1:].copy() - # write new slot to the buffer + # write new row to the shm buffer dst.push(last) last_len = new_len From 100e27ac1206c93598fd7071a9de048c3c333524 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 5 Apr 2021 07:58:28 -0400 Subject: [PATCH 23/37] Task lock bus loading, always close feed stream on disconnect --- piker/data/__init__.py | 238 ++++++++++++++++++++++------------------- 1 file changed, 127 insertions(+), 111 deletions(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 907dd6fe..4629bbc4 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -96,6 +96,7 @@ class _FeedsBus(BaseModel): nursery: trio.Nursery feeds: Dict[str, trio.CancelScope] = {} subscribers: Dict[str, List[tractor.Context]] = {} + task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() class Config: arbitrary_types_allowed = True @@ -115,7 +116,7 @@ def get_feed_bus( ) -> _FeedsBus: """ Retreive broker-daemon-local data feeds bus from process global - scope. + scope. Serialize task access to lock. """ @@ -152,6 +153,7 @@ async def _setup_persistent_brokerd(brokername: str) -> None: # parent actor decides to tear it down await trio.sleep_forever() finally: + # TODO: this needs to be shielded? 
await bus.cancel_all() @@ -187,7 +189,7 @@ async def allocate_persistent_feed( # if not opened: # raise RuntimeError("Persistent shm for sym was already open?!") - send, quote_stream = trio.open_memory_channel(2**8) + send, quote_stream = trio.open_memory_channel(10) feed_is_live = trio.Event() # establish broker backend quote stream @@ -204,119 +206,120 @@ async def allocate_persistent_feed( ) init_msg[symbol]['shm_token'] = shm.token - cs = trio.CancelScope() + cs = bus.nursery.cancel_scope # TODO: make this into a composed type which also # contains the backfiller cs for individual super-based # resspawns when needed. bus.feeds[symbol] = (cs, init_msg, first_quote) - with cs: - if opened: + if opened: - # start history backfill task ``backfill_bars()`` is - # a required backend func this must block until shm is - # filled with first set of ohlc bars - await bus.nursery.start(mod.backfill_bars, symbol, shm) + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + await bus.nursery.start(mod.backfill_bars, symbol, shm) - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] - # pass OHLC sample rate in seconds - init_msg[symbol]['sample_rate'] = delay_s + # pass OHLC sample rate in seconds + init_msg[symbol]['sample_rate'] = delay_s - # yield back control to starting nursery - task_status.started((init_msg, first_quote)) + # yield back control to starting nursery + task_status.started((init_msg, first_quote)) - await feed_is_live.wait() + await feed_is_live.wait() - # # tell incrementer task it can start - # shm_incrementing(shm.token['shm_name']).set() + if opened: + _shms.setdefault(delay_s, []).append(shm) - # start shm incrementingn for OHLC sampling - # subscribe_ohlc_for_increment(shm, delay_s) + # start shm incrementing for OHLC sampling + if _incrementers.get(delay_s) is None: + cs = await bus.nursery.start(increment_ohlc_buffer, delay_s) - if opened: - _shms.setdefault(delay_s, []).append(shm) + sum_tick_vlm: bool = init_msg.get( + 'shm_write_opts', {} + ).get('sum_tick_vlm', True) - if _incrementers.get(delay_s) is None: - cs = await bus.nursery.start(increment_ohlc_buffer, delay_s) + log.info("Started shared mem bar writer") - sum_tick_vlm: bool = init_msg.get( - 'shm_write_opts', {} - ).get('sum_tick_vlm', True) + # iterate stream delivered by broker + async for quotes in quote_stream: + for sym, quote in quotes.items(): - # begin shm write loop and broadcast to subscribers - async with quote_stream: + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. 
- log.info("Started shared mem bar writer") + # start writing the shm buffer with appropriate + # trade data + for tick in quote['ticks']: - # iterate stream delivered by broker - async for quotes in quote_stream: - for sym, quote in quotes.items(): + # if tick['type'] in ('utrade',): + # print(tick) - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. + # write trade events to shm last OHLC sample + if tick['type'] in ('trade', 'utrade'): - # start writing the shm buffer with appropriate trade data - for tick in quote['ticks']: + last = tick['price'] - # if tick['type'] in ('utrade',): - # print(tick) + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] - # write trade events to shm last OHLC sample - if tick['type'] in ('trade', 'utrade'): + new_v = tick.get('size', 0) - last = tick['price'] + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] + if sum_tick_vlm: + volume = v + new_v + else: + # presume backend takes care of summing + # it's own vlm + volume = quote['volume'] - new_v = tick.get('size', 0) + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'bar_wap', # can be optionally provided + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + quote.get('bar_wap', 0), + volume, + ) - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last - - if sum_tick_vlm: - volume = v + new_v - else: - # presume backend takes care of summing - # it's own vlm - volume = quote['volume'] - - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'bar_wap', # can be optionally provided - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - quote.get('bar_wap', 0), - volume, - ) - - for ctx in bus.subscribers[sym]: - try: - await ctx.send_yield({sym: quote}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - log.error(f'{ctx.chan.uid} dropped connection') + # XXX: we need to be very cautious here that no + # context-channel is left lingering which doesn't have + # a far end receiver actor-task. In such a case you can + # end up triggering backpressure which which will + # eventually block this producer end of the feed and + # thus other consumers still attached. 
+ subs = bus.subscribers[sym] + for ctx in subs: + # print(f'sub is {ctx.chan.uid}') + try: + await ctx.send_yield({sym: quote}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + subs.remove(ctx) + log.error(f'{ctx.chan.uid} dropped connection') @tractor.stream @@ -327,6 +330,7 @@ async def attach_feed_bus( loglevel: str, ): + # try: if loglevel is None: loglevel = tractor.current_actor().loglevel @@ -337,35 +341,42 @@ async def attach_feed_bus( assert 'brokerd' in tractor.current_actor().name bus = get_feed_bus(brokername) - task_cs = bus.feeds.get(symbol) - bus.subscribers.setdefault(symbol, []).append(ctx) - # if no cached feed for this symbol has been created for this - # brokerd yet, start persistent stream and shm writer task in - # service nursery - if task_cs is None: - init_msg, first_quote = await bus.nursery.start( - partial( - allocate_persistent_feed, - ctx=ctx, - bus=bus, - brokername=brokername, - symbol=symbol, - loglevel=loglevel, + async with bus.task_lock: + task_cs = bus.feeds.get(symbol) + sub_only: bool = False + + # if no cached feed for this symbol has been created for this + # brokerd yet, start persistent stream and shm writer task in + # service nursery + if task_cs is None: + init_msg, first_quote = await bus.nursery.start( + partial( + allocate_persistent_feed, + ctx=ctx, + bus=bus, + brokername=brokername, + symbol=symbol, + loglevel=loglevel, + ) ) - ) + bus.subscribers.setdefault(symbol, []).append(ctx) + else: + sub_only = True - # XXX: ``first_quote`` may be outdated here if this is secondary subscriber + # XXX: ``first_quote`` may be outdated here if this is secondary + # subscriber cs, init_msg, first_quote = bus.feeds[symbol] # send this even to subscribers to existing feed? await ctx.send_yield(init_msg) await ctx.send_yield(first_quote) - try: - # just block while the stream pumps - await trio.sleep_forever() + if sub_only: + bus.subscribers[symbol].append(ctx) + try: + await trio.sleep_forever() finally: bus.subscribers[symbol].remove(ctx) @@ -484,11 +495,10 @@ async def open_feed( # https://github.com/goodboy/tractor/issues/53 init_msg = await stream.receive() + # we can only read from shm shm = attach_shm_array( token=init_msg[sym]['shm_token'], - - # we are the buffer writer - readonly=False, + readonly=True, ) feed = Feed( @@ -522,4 +532,10 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) - yield feed + try: + yield feed + + finally: + # always cancel the far end producer task + with trio.CancelScope(shield=True): + await stream.aclose() From 28a961ebc092b3e60e8e1b72ea15ee05036fb333 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 5 Apr 2021 08:01:26 -0400 Subject: [PATCH 24/37] Add a maybe-startup-runtime manager --- piker/_daemon.py | 45 +++++++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 80175ae4..5812152c 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -18,6 +18,7 @@ Structured, daemon tree service management. 
""" +from functools import partial from typing import Optional, Union from contextlib import asynccontextmanager @@ -87,6 +88,18 @@ async def open_pikerd( yield _services +@asynccontextmanager +async def maybe_open_runtime( + loglevel: Optional[str] = None, + **kwargs, +) -> None: + if not tractor.current_actor(err_on_no_runtime=False): + async with tractor.open_root_actor(loglevel=loglevel, **kwargs): + yield + else: + yield + + @asynccontextmanager async def maybe_open_pikerd( loglevel: Optional[str] = None, @@ -100,23 +113,23 @@ async def maybe_open_pikerd( if loglevel: get_console_log(loglevel) - try: + # subtle, we must have the runtime up here or portal lookup will fail + async with maybe_open_runtime(loglevel, **kwargs): async with tractor.find_actor(_root_dname) as portal: - assert portal is not None - yield portal - return + # assert portal is not None + if portal is not None: + yield portal + return - except (RuntimeError, AssertionError): # tractor runtime not started yet - - # presume pikerd role - async with open_pikerd( - loglevel, - **kwargs, - ) as _: - # in the case where we're starting up the - # tractor-piker runtime stack in **this** process - # we return no portal to self. - yield None + # presume pikerd role + async with open_pikerd( + loglevel, + **kwargs, + ) as _: + # in the case where we're starting up the + # tractor-piker runtime stack in **this** process + # we return no portal to self. + yield None # brokerd enabled modules @@ -147,7 +160,7 @@ async def spawn_brokerd( global _services assert _services - await _services.actor_n.start_actor( + portal = await _services.actor_n.start_actor( dname, enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel, From ddae08493b758a7ef82f1f93ecc945c70159feb8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 5 Apr 2021 08:02:56 -0400 Subject: [PATCH 25/37] Manage the send mem chan lifetime --- piker/brokers/kraken.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index ac80988b..3620f19a 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -467,7 +467,7 @@ async def stream_quotes( ws_pairs = {} sym_infos = {} - async with open_cached_client('kraken') as client: + async with open_cached_client('kraken') as client, send_chan as send_chan: # keep client cached for real-time section for sym in symbols: From 1281755d65614c6299321376517406a144a76517 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 5 Apr 2021 08:06:01 -0400 Subject: [PATCH 26/37] Don't use mem chan cloning, it doesn't actually broadcast --- piker/clearing/_ems.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 5880dd50..0998aa44 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -298,7 +298,9 @@ async def exec_loop( ) if _exec_mode == 'paper': - n.start_soon(simulate_fills, stream.clone(), client) + # TODO: make this an actual broadcast channels as in: + # https://github.com/python-trio/trio/issues/987 + n.start_soon(simulate_fills, stream, client) # TODO: lots of cases still to handle From 47f823ab2285ac68f93fcc04ed1e9ead7c817b70 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 5 Apr 2021 08:06:28 -0400 Subject: [PATCH 27/37] Always close emsd data connection on close --- piker/clearing/_client.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 121cd080..598f4f54 
100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -191,7 +191,6 @@ async def maybe_open_emsd( yield portal - @asynccontextmanager async def open_ems( broker: str, @@ -248,4 +247,13 @@ async def open_ems( with trio.fail_after(10): await book._ready_to_receive.wait() - yield book, trades_stream + try: + yield book, trades_stream + + finally: + # TODO: we want to eventually keep this up (by having + # the exec loop keep running in the pikerd tree) but for + # now we have to kill the context to avoid backpressure + # build-up on the shm write loop. + with trio.CancelScope(shield=True): + await trades_stream.aclose() From 20c99733b6f18bdb1f16de0b45198b0b86d0e469 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 5 Apr 2021 08:06:59 -0400 Subject: [PATCH 28/37] Let cli pass through actor name --- piker/ui/_exec.py | 1 - 1 file changed, 1 deletion(-) diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index d621e98c..361dec33 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -200,7 +200,6 @@ def run_qtractor( async def main(): async with maybe_open_pikerd( - name='qtractor', start_method='trio', **tractor_kwargs, ): From a18a4b5a4cc017e366700182ee7c36910315e584 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 5 Apr 2021 08:07:14 -0400 Subject: [PATCH 29/37] Call the app what it is --- piker/ui/cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/ui/cli.py b/piker/ui/cli.py index a407afe5..93e1a9fd 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -148,6 +148,7 @@ def chart(config, symbol, profile): tractor_kwargs={ 'debug_mode': True, 'loglevel': tractorloglevel, + 'name': 'chart', 'enable_modules': [ 'piker.clearing._client' ], From 3506bbe05c307a03bb47ab640d8fe2cd4a996307 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 5 Apr 2021 08:09:26 -0400 Subject: [PATCH 30/37] Nest async withs --- piker/ui/order_mode.py | 148 ++++++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 75 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 27059894..8b1d1a14 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -321,97 +321,95 @@ async def start_order_mode( async with open_ems( brokername, symbol, - ) as (book, trades_stream): + ) as (book, trades_stream), open_order_mode( + symbol, + chart, + book, + ) as order_mode: - async with open_order_mode( - symbol, - chart, - book, - ) as order_mode: + def get_index(time: float): - def get_index(time: float): + # XXX: not sure why the time is so off here + # looks like we're gonna have to do some fixing.. - # XXX: not sure why the time is so off here - # looks like we're gonna have to do some fixing.. 
+ ohlc = chart._shm.array + indexes = ohlc['time'] >= time - ohlc = chart._shm.array - indexes = ohlc['time'] >= time + if any(indexes): + return ohlc['index'][indexes[-1]] + else: + return ohlc['index'][-1] - if any(indexes): - return ohlc['index'][indexes[-1]] - else: - return ohlc['index'][-1] + # Begin order-response streaming - # Begin order-response streaming + # this is where we receive **back** messages + # about executions **from** the EMS actor + async for msg in trades_stream: - # this is where we receive **back** messages - # about executions **from** the EMS actor - async for msg in trades_stream: + fmsg = pformat(msg) + log.info(f'Received order msg:\n{fmsg}') - fmsg = pformat(msg) - log.info(f'Received order msg:\n{fmsg}') + resp = msg['resp'] - resp = msg['resp'] + if resp in ( + 'position', + ): + # show line label once order is live + order_mode.on_position_update(msg) + continue - if resp in ( - 'position', - ): - # show line label once order is live - order_mode.on_position_update(msg) - continue + # delete the line from view + oid = msg['oid'] - # delete the line from view - oid = msg['oid'] + # response to 'action' request (buy/sell) + if resp in ( + 'dark_submitted', + 'broker_submitted' + ): - # response to 'action' request (buy/sell) - if resp in ( - 'dark_submitted', - 'broker_submitted' - ): + # show line label once order is live + order_mode.on_submit(oid) - # show line label once order is live - order_mode.on_submit(oid) + # resp to 'cancel' request or error condition + # for action request + elif resp in ( + 'broker_cancelled', + 'broker_inactive', + 'dark_cancelled' + ): + # delete level line from view + order_mode.on_cancel(oid) - # resp to 'cancel' request or error condition - # for action request - elif resp in ( - 'broker_cancelled', - 'broker_inactive', - 'dark_cancelled' - ): - # delete level line from view - order_mode.on_cancel(oid) + elif resp in ( + 'dark_executed' + ): + log.info(f'Dark order triggered for {fmsg}') - elif resp in ( - 'dark_executed' - ): - log.info(f'Dark order triggered for {fmsg}') + # for alerts add a triangle and remove the + # level line + if msg['cmd']['action'] == 'alert': - # for alerts add a triangle and remove the - # level line - if msg['cmd']['action'] == 'alert': - - # should only be one "fill" for an alert - order_mode.on_fill( - oid, - price=msg['trigger_price'], - arrow_index=get_index(time.time()) - ) - await order_mode.on_exec(oid, msg) - - # response to completed 'action' request for buy/sell - elif resp in ( - 'broker_executed', - ): - await order_mode.on_exec(oid, msg) - - # each clearing tick is responded individually - elif resp in ('broker_filled',): - action = msg['action'] - # TODO: some kinda progress system + # should only be one "fill" for an alert order_mode.on_fill( oid, - price=msg['price'], - arrow_index=get_index(msg['broker_time']), - pointing='up' if action == 'buy' else 'down', + price=msg['trigger_price'], + arrow_index=get_index(time.time()) ) + await order_mode.on_exec(oid, msg) + + # response to completed 'action' request for buy/sell + elif resp in ( + 'broker_executed', + ): + await order_mode.on_exec(oid, msg) + + # each clearing tick is responded individually + elif resp in ('broker_filled',): + action = msg['action'] + # TODO: some kinda progress system + order_mode.on_fill( + oid, + price=msg['price'], + arrow_index=get_index(msg['broker_time']), + pointing='up' if action == 'buy' else 'down', + ) From 152363797941e6b278140f19cd3a2ef36bfaea3f Mon Sep 17 00:00:00 2001 From: Tyler 
Goodlet Date: Mon, 5 Apr 2021 08:09:42 -0400 Subject: [PATCH 31/37] Always update L1 labels --- piker/ui/_l1.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/piker/ui/_l1.py b/piker/ui/_l1.py index 683f46b9..0ff264bc 100644 --- a/piker/ui/_l1.py +++ b/piker/ui/_l1.py @@ -155,6 +155,8 @@ class LevelLabel(YAxisLabel): self._h_shift * (w + self._x_offset), abs_pos.y() + self._v_shift * h )) + # XXX: definitely need this! + self.update() def set_fmt_str( self, From 71d02db126323efb13f0110330b45719490a22d1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 6 Apr 2021 14:08:50 -0400 Subject: [PATCH 32/37] Rename "buffer" to "sampling" --- piker/_daemon.py | 2 +- piker/data/__init__.py | 2 +- piker/data/{_buffer.py => _sampling.py} | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename piker/data/{_buffer.py => _sampling.py} (100%) diff --git a/piker/_daemon.py b/piker/_daemon.py index 5812152c..e9c9b53b 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -137,7 +137,7 @@ _data_mods = [ 'piker.brokers.core', 'piker.brokers.data', 'piker.data', - 'piker.data._buffer' + 'piker.data._sampling' ] diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 4629bbc4..772ef2bd 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -52,7 +52,7 @@ from ._sharedmem import ( get_shm_token, ) from ._source import base_iohlc_dtype, Symbol -from ._buffer import ( +from ._sampling import ( _shms, _incrementers, increment_ohlc_buffer, diff --git a/piker/data/_buffer.py b/piker/data/_sampling.py similarity index 100% rename from piker/data/_buffer.py rename to piker/data/_sampling.py From 3147a49384b5f717898e709a117f6bd593a805c1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 6 Apr 2021 14:20:33 -0400 Subject: [PATCH 33/37] Move sample-broadcast routine into sampling module --- piker/data/__init__.py | 81 ++---------------------------------- piker/data/_sampling.py | 91 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 93 insertions(+), 79 deletions(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 772ef2bd..c2b06d83 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -57,6 +57,7 @@ from ._sampling import ( _incrementers, increment_ohlc_buffer, iter_ohlc_periods, + sample_and_broadcast, ) __all__ = [ @@ -242,84 +243,8 @@ async def allocate_persistent_feed( 'shm_write_opts', {} ).get('sum_tick_vlm', True) - log.info("Started shared mem bar writer") - - # iterate stream delivered by broker - async for quotes in quote_stream: - for sym, quote in quotes.items(): - - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. 
- - # start writing the shm buffer with appropriate - # trade data - for tick in quote['ticks']: - - # if tick['type'] in ('utrade',): - # print(tick) - - # write trade events to shm last OHLC sample - if tick['type'] in ('trade', 'utrade'): - - last = tick['price'] - - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - - new_v = tick.get('size', 0) - - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last - - if sum_tick_vlm: - volume = v + new_v - else: - # presume backend takes care of summing - # it's own vlm - volume = quote['volume'] - - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'bar_wap', # can be optionally provided - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - quote.get('bar_wap', 0), - volume, - ) - - # XXX: we need to be very cautious here that no - # context-channel is left lingering which doesn't have - # a far end receiver actor-task. In such a case you can - # end up triggering backpressure which which will - # eventually block this producer end of the feed and - # thus other consumers still attached. - subs = bus.subscribers[sym] - for ctx in subs: - # print(f'sub is {ctx.chan.uid}') - try: - await ctx.send_yield({sym: quote}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - subs.remove(ctx) - log.error(f'{ctx.chan.uid} dropped connection') + # start sample loop + await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) @tractor.stream diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index eccf4ca6..40951697 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -95,7 +95,7 @@ async def increment_ohlc_buffer( if total_s % delay_s != 0: continue - # TODO: numa this! + # TODO: ``numba`` this! for shm in shms: # TODO: in theory we could make this faster by copying the # "last" readable value into the underlying larger buffer's @@ -149,3 +149,92 @@ async def iter_ohlc_periods( await trio.sleep_forever() finally: subs.remove(ctx) + + +async def sample_and_broadcast( + bus: '_FeedBus', # noqa + shm: ShmArray, + quote_stream: trio.abc.ReceiveChannel, + sum_tick_vlm: bool = True, +) -> None: + + log.info("Started shared mem bar writer") + + # iterate stream delivered by broker + async for quotes in quote_stream: + + # TODO: ``numba`` this! + for sym, quote in quotes.items(): + + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. 
+ + # start writing the shm buffer with appropriate + # trade data + for tick in quote['ticks']: + + # if tick['type'] in ('utrade',): + # print(tick) + + # write trade events to shm last OHLC sample + if tick['type'] in ('trade', 'utrade'): + + last = tick['price'] + + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] + + new_v = tick.get('size', 0) + + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last + + if sum_tick_vlm: + volume = v + new_v + else: + # presume backend takes care of summing + # it's own vlm + volume = quote['volume'] + + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'bar_wap', # can be optionally provided + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + quote.get('bar_wap', 0), + volume, + ) + + # XXX: we need to be very cautious here that no + # context-channel is left lingering which doesn't have + # a far end receiver actor-task. In such a case you can + # end up triggering backpressure which which will + # eventually block this producer end of the feed and + # thus other consumers still attached. + subs = bus.subscribers[sym] + for ctx in subs: + # print(f'sub is {ctx.chan.uid}') + try: + await ctx.send_yield({sym: quote}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + subs.remove(ctx) + log.error(f'{ctx.chan.uid} dropped connection') From fa7fadebac5aeb442dda3a648f8072103f91c566 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 10 Apr 2021 12:45:29 -0400 Subject: [PATCH 34/37] Report sym on unexpected open --- piker/data/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index c2b06d83..f40f3c52 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -186,7 +186,7 @@ async def allocate_persistent_feed( ) # do history validation? - assert opened, "Persistent shm for sym was already open?!" + assert opened, f'Persistent shm for {symbol} was already open?!' 
     # if not opened:
     #     raise RuntimeError("Persistent shm for sym was already open?!")

From 598aec579fc52a8c65ebbfc03f3ebe887f1315d8 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sat, 10 Apr 2021 14:11:02 -0400
Subject: [PATCH 35/37] Avoid token checking type mismatches

---
 piker/data/_sharedmem.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py
index 4a52cfb3..b4e6de86 100644
--- a/piker/data/_sharedmem.py
+++ b/piker/data/_sharedmem.py
@@ -371,7 +371,7 @@ def attach_shm_array(
     key = token.shm_name
 
     if key in _known_tokens:
-        assert _known_tokens[key] == token, "WTF"
+        assert _Token.from_msg(_known_tokens[key]) == token, "WTF"
 
     # attach to array buffer and view as per dtype
     shm = SharedMemory(name=key)

From 9a15cd48f0f5e1eec016679b9f75ddfa9e9576f3 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sat, 10 Apr 2021 14:11:39 -0400
Subject: [PATCH 36/37] Just symbol info in window title

---
 piker/ui/_chart.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py
index d62fef5b..f9aefc34 100644
--- a/piker/ui/_chart.py
+++ b/piker/ui/_chart.py
@@ -138,7 +138,7 @@ class ChartSpace(QtGui.QWidget):
         """
         # XXX: let's see if this causes mem problems
         self.window.setWindowTitle(
-            f'piker chart {symbol.key}@{symbol.brokers} '
+            f'{symbol.key}@{symbol.brokers} '
             f'tick:{symbol.tick_size}'
         )

From 8b745f440eda87dee3295935c74ce5f416e5a541 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sat, 10 Apr 2021 14:22:49 -0400
Subject: [PATCH 37/37] Add disti mode instructions

---
 README.rst | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/README.rst b/README.rst
index f13e1be9..d0163951 100644
--- a/README.rst
+++ b/README.rst
@@ -103,6 +103,21 @@ bet you weren't expecting this from the foss bby::
 
     piker -b kraken chart XBTUSD
 
 
+run in distributed mode
+***********************
+start the service daemon::
+
+    pikerd -l info
+
+
+connect your chart::
+
+    piker -b kraken chart XMRXBT
+
+
+enjoy persistent real-time data feeds tied to daemon lifetime.
+
+
 if anyone asks you what this project is about
 *********************************************
 you don't talk about it.
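
consume feeds from code
***********************
putting the series together, a feed consumer looks roughly like the below
sketch (written against the ``open_feed()`` api reworked above; the
broker/symbol values are illustrative)::

    from piker import data

    async def watch():
        async with data.open_feed(
            'kraken',
            ['XBTUSD'],
            loglevel='info',
        ) as feed:
            # quote messages are keyed ``{symbol: quote}``
            async for quotes in feed.stream:
                for sym, quote in quotes.items():
                    print(sym, quote.get('last'))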