diff --git a/README.rst b/README.rst index f13e1be9..d0163951 100644 --- a/README.rst +++ b/README.rst @@ -103,6 +103,21 @@ bet you weren't expecting this from the foss bby:: piker -b kraken chart XBTUSD +run in distributed mode +*********************** +start the service daemon:: + + pikerd -l info + + +connect yourt chart:: + + piker -b kraken chart XMRXBT + + +enjoy persistent real-time data feeds tied to daemon lifetime. + + if anyone asks you what this project is about ********************************************* you don't talk about it. diff --git a/piker/_daemon.py b/piker/_daemon.py index 72e390f2..e9c9b53b 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -18,6 +18,7 @@ Structured, daemon tree service management. """ +from functools import partial from typing import Optional, Union from contextlib import asynccontextmanager @@ -87,6 +88,18 @@ async def open_pikerd( yield _services +@asynccontextmanager +async def maybe_open_runtime( + loglevel: Optional[str] = None, + **kwargs, +) -> None: + if not tractor.current_actor(err_on_no_runtime=False): + async with tractor.open_root_actor(loglevel=loglevel, **kwargs): + yield + else: + yield + + @asynccontextmanager async def maybe_open_pikerd( loglevel: Optional[str] = None, @@ -100,23 +113,23 @@ async def maybe_open_pikerd( if loglevel: get_console_log(loglevel) - try: + # subtle, we must have the runtime up here or portal lookup will fail + async with maybe_open_runtime(loglevel, **kwargs): async with tractor.find_actor(_root_dname) as portal: - assert portal is not None - yield portal - return + # assert portal is not None + if portal is not None: + yield portal + return - except (RuntimeError, AssertionError): # tractor runtime not started yet - - # presume pikerd role - async with open_pikerd( - loglevel, - **kwargs, - ) as _: - # in the case where we're starting up the - # tractor-piker runtime stack in **this** process - # we return no portal to self. - yield None + # presume pikerd role + async with open_pikerd( + loglevel, + **kwargs, + ) as _: + # in the case where we're starting up the + # tractor-piker runtime stack in **this** process + # we return no portal to self. + yield None # brokerd enabled modules @@ -124,7 +137,7 @@ _data_mods = [ 'piker.brokers.core', 'piker.brokers.data', 'piker.data', - 'piker.data._buffer' + 'piker.data._sampling' ] @@ -134,6 +147,8 @@ async def spawn_brokerd( **tractor_kwargs ) -> tractor._portal.Portal: + from .data import _setup_persistent_brokerd + log.info(f'Spawning {brokername} broker daemon') brokermod = get_brokermod(brokername) @@ -145,13 +160,28 @@ async def spawn_brokerd( global _services assert _services - await _services.actor_n.start_actor( + portal = await _services.actor_n.start_actor( dname, enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel, **tractor_kwargs ) + # TODO: so i think this is the perfect use case for supporting + # a cross-actor async context manager api instead of this + # shoort-and-forget task spawned in the root nursery, we'd have an + # async exit stack that we'd register the `portal.open_context()` + # call with and then have the ability to unwind the call whenevs. + + # non-blocking setup of brokerd service nursery + _services.service_n.start_soon( + partial( + portal.run, + _setup_persistent_brokerd, + brokername=brokername, + ) + ) + return dname diff --git a/piker/brokers/api.py b/piker/brokers/api.py index 75fc8a14..c1e11a01 100644 --- a/piker/brokers/api.py +++ b/piker/brokers/api.py @@ -22,7 +22,6 @@ from typing import Dict from contextlib import asynccontextmanager, AsyncExitStack import trio -import tractor from . import get_brokermod from ..log import get_logger @@ -30,10 +29,12 @@ from ..log import get_logger log = get_logger(__name__) -_cache: Dict[str, 'Client'] = {} + +_cache: Dict[str, 'Client'] = {} # noqa + @asynccontextmanager -async def get_cached_client( +async def open_cached_client( brokername: str, *args, **kwargs, @@ -74,10 +75,11 @@ async def get_cached_client( client._exit_stack = exit_stack clients[brokername] = client - yield client + yield client finally: - client._consumers -= 1 - if client._consumers <= 0: - # teardown the client - await client._exit_stack.aclose() + if client is not None: + # if no more consumers, teardown the client + client._consumers -= 1 + if client._consumers <= 0: + await client._exit_stack.aclose() diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 2679e988..c7d8917e 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -34,10 +34,11 @@ import logging import time import trio +from trio_typing import TaskStatus import tractor from async_generator import aclosing from ib_insync.wrapper import RequestError -from ib_insync.contract import Contract, ContractDetails +from ib_insync.contract import Contract, ContractDetails, Option from ib_insync.order import Order from ib_insync.ticker import Ticker from ib_insync.objects import Position @@ -46,14 +47,9 @@ from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client from ..log import get_logger, get_console_log -from ..data import ( - maybe_spawn_brokerd, - iterticks, - attach_shm_array, - subscribe_ohlc_for_increment, - _buffer, -) +from ..data import maybe_spawn_brokerd from ..data._source import from_df +from ..data._sharedmem import ShmArray from ._util import SymbolNotFound @@ -168,6 +164,7 @@ class Client: # contract cache self._contracts: Dict[str, Contract] = {} + self._feeds: Dict[str, trio.abc.SendChannel] = {} # NOTE: the ib.client here is "throttled" to 45 rps by default @@ -384,42 +381,6 @@ class Client: formatDate=2, # timezone aware UTC datetime ) - async def stream_ticker( - self, - symbol: str, - to_trio, - opts: Tuple[int] = ('375', '233', '236'), - contract: Optional[Contract] = None, - ) -> None: - """Stream a ticker using the std L1 api. - """ - contract = contract or (await self.find_contract(symbol)) - ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) - - # define a simple queue push routine that streams quote packets - # to trio over the ``to_trio`` memory channel. - - def push(t): - """Push quotes to trio task. - - """ - # log.debug(t) - try: - to_trio.send_nowait(t) - except trio.BrokenResourceError: - # XXX: eventkit's ``Event.emit()`` for whatever redic - # reason will catch and ignore regular exceptions - # resulting in tracebacks spammed to console.. - # Manually do the dereg ourselves. - ticker.updateEvent.disconnect(push) - log.error(f"Disconnected stream for `{symbol}`") - self.ib.cancelMktData(contract) - - ticker.updateEvent.connect(push) - - # let the engine run and stream - await self.ib.disconnectedEvent - async def get_quote( self, symbol: str, @@ -613,6 +574,8 @@ async def _aio_get_client( client_id: Optional[int] = None, ) -> Client: """Return an ``ib_insync.IB`` instance wrapped in our client API. + + Client instances are cached for later use. """ # first check cache for existing client @@ -652,8 +615,10 @@ async def _aio_get_client( # create and cache try: client = Client(ib) + _client_cache[(host, port)] = client log.debug(f"Caching client for {(host, port)}") + yield client except BaseException: @@ -691,13 +656,14 @@ async def _trio_run_client_method( # if the method is an *async gen* stream for it meth = getattr(Client, method) - if inspect.isasyncgenfunction(meth): - kwargs['_treat_as_stream'] = True - # if the method is an *async func* but manually - # streams back results, make sure to also stream it args = tuple(inspect.getfullargspec(meth).args) - if 'to_trio' in args: + + if inspect.isasyncgenfunction(meth) or ( + # if the method is an *async func* but manually + # streams back results, make sure to also stream it + 'to_trio' in args + ): kwargs['_treat_as_stream'] = True result = await tractor.to_asyncio.run_task( @@ -780,7 +746,7 @@ def normalize( # convert named tuples to dicts so we send usable keys new_ticks = [] for tick in ticker.ticks: - if tick: + if tick and not isinstance(tick, dict): td = tick._asdict() td['type'] = tick_types.get(td['tickType'], 'n/a') @@ -811,36 +777,13 @@ def normalize( return data -_local_buffer_writers = {} - - -@asynccontextmanager -async def activate_writer(key: str) -> (bool, trio.Nursery): - """Mark the current actor with module var determining - whether an existing shm writer task is already active. - - This avoids more then one writer resulting in data - clobbering. - """ - global _local_buffer_writers - - try: - assert not _local_buffer_writers.get(key, False) - - _local_buffer_writers[key] = True - - async with trio.open_nursery() as n: - yield n - finally: - _local_buffer_writers.pop(key, None) - - -async def fill_bars( +async def backfill_bars( sym: str, - first_bars: list, - shm: 'ShmArray', # type: ignore # noqa + shm: ShmArray, # type: ignore # noqa # count: int = 20, # NOTE: any more and we'll overrun underlying buffer - count: int = 6, # NOTE: any more and we'll overrun the underlying buffer + count: int = 10, # NOTE: any more and we'll overrun the underlying buffer + + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: """Fill historical bars into shared mem / storage afap. @@ -848,41 +791,58 @@ async def fill_bars( https://github.com/pikers/piker/issues/128 """ - next_dt = first_bars[0].date + first_bars, bars_array = await _trio_run_client_method( + method='bars', + symbol=sym, + ) - i = 0 - while i < count: + # write historical data to buffer + shm.push(bars_array) - try: - bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, - end_dt=next_dt, - ) + with trio.CancelScope() as cs: - shm.push(bars_array, prepend=True) - i += 1 - next_dt = bars[0].date + task_status.started(cs) - except RequestError as err: - # TODO: retreive underlying ``ib_insync`` error? + next_dt = first_bars[0].date - if err.code == 162: + i = 0 + while i < count: - if 'HMDS query returned no data' in err.message: - # means we hit some kind of historical "dead zone" - # and further requests seem to always cause - # throttling despite the rps being low - break + try: + bars, bars_array = await _trio_run_client_method( + method='bars', + symbol=sym, + end_dt=next_dt, + ) - else: - log.exception( - "Data query rate reached: Press `ctrl-alt-f` in TWS") + if bars_array is None: + raise SymbolNotFound(sym) - # TODO: should probably create some alert on screen - # and then somehow get that to trigger an event here - # that restarts/resumes this task? - await tractor.breakpoint() + shm.push(bars_array, prepend=True) + i += 1 + next_dt = bars[0].date + + except RequestError as err: + # TODO: retreive underlying ``ib_insync`` error? + + if err.code == 162: + + if 'HMDS query returned no data' in err.message: + # means we hit some kind of historical "dead zone" + # and further requests seem to always cause + # throttling despite the rps being low + break + + else: + log.exception( + "Data query rate reached: Press `ctrl-alt-f`" + "in TWS" + ) + + # TODO: should probably create some alert on screen + # and then somehow get that to trigger an event here + # that restarts/resumes this task? + await tractor.breakpoint() asset_type_map = { @@ -904,27 +864,96 @@ asset_type_map = { } -# TODO: figure out how to share quote feeds sanely despite -# the wacky ``ib_insync`` api. -# @tractor.msg.pub -@tractor.stream +_quote_streams: Dict[str, trio.abc.ReceiveStream] = {} + + +async def _setup_quote_stream( + symbol: str, + opts: Tuple[int] = ('375', '233', '236'), + contract: Optional[Contract] = None, +) -> None: + """Stream a ticker using the std L1 api. + """ + global _quote_streams + + async with _aio_get_client() as client: + + contract = contract or (await client.find_contract(symbol)) + ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) + + # define a simple queue push routine that streams quote packets + # to trio over the ``to_trio`` memory channel. + to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + + def push(t): + """Push quotes to trio task. + + """ + # log.debug(t) + try: + to_trio.send_nowait(t) + + except trio.BrokenResourceError: + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + ticker.updateEvent.disconnect(push) + log.error(f"Disconnected stream for `{symbol}`") + client.ib.cancelMktData(contract) + + # decouple broadcast mem chan + _quote_streams.pop(symbol, None) + + ticker.updateEvent.connect(push) + + return from_aio + + +async def start_aio_quote_stream( + symbol: str, + contract: Optional[Contract] = None, +) -> trio.abc.ReceiveStream: + + global _quote_streams + + from_aio = _quote_streams.get(symbol) + if from_aio: + + # if we already have a cached feed deliver a rx side clone to consumer + return from_aio.clone() + + else: + + from_aio = await tractor.to_asyncio.run_task( + _setup_quote_stream, + symbol=symbol, + contract=contract, + ) + + # cache feed for later consumers + _quote_streams[symbol] = from_aio + + return from_aio + + async def stream_quotes( - ctx: tractor.Context, + + send_chan: trio.abc.SendChannel, symbols: List[str], - shm_token: Tuple[str, str, List[tuple]], + shm: ShmArray, + feed_is_live: trio.Event, loglevel: str = None, - # compat for @tractor.msg.pub - topics: Any = None, - get_topics: Callable = None, -) -> AsyncIterator[Dict[str, Any]]: + # startup sync + task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + +) -> None: """Stream symbol quotes. This is a ``trio`` callable routine meant to be invoked once the brokerd is up. """ - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) # TODO: support multiple subscriptions sym = symbols[0] @@ -934,130 +963,67 @@ async def stream_quotes( symbol=sym, ) - stream = await _trio_run_client_method( - method='stream_ticker', - contract=contract, # small speedup - symbol=sym, - ) + stream = await start_aio_quote_stream(symbol=sym, contract=contract) - shm = None - async with trio.open_nursery() as ln: - # check if a writer already is alive in a streaming task, - # otherwise start one and mark it as now existing + # pass back some symbol info like min_tick, trading_hours, etc. + syminfo = asdict(details) + syminfo.update(syminfo['contract']) - key = shm_token['shm_name'] + # TODO: more consistent field translation + atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] - writer_already_exists = _local_buffer_writers.get(key, False) + # for stocks it seems TWS reports too small a tick size + # such that you can't submit orders with that granularity? + min_tick = 0.01 if atype == 'stock' else 0 - # maybe load historical ohlcv in to shared mem - # check if shm has already been created by previous - # feed initialization - if not writer_already_exists: - _local_buffer_writers[key] = True + syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) - shm = attach_shm_array( - token=shm_token, + # for "traditional" assets, volume is normally discreet, not a float + syminfo['lot_tick_size'] = 0.0 - # we are the buffer writer - readonly=False, - ) - - # async def retrieve_and_push(): - start = time.time() - - bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, - - ) - - log.info(f"bars_array request: {time.time() - start}") - - if bars_array is None: - raise SymbolNotFound(sym) - - # write historical data to buffer - shm.push(bars_array) - shm_token = shm.token - - # TODO: generalize this for other brokers - # start bar filler task in bg - ln.start_soon(fill_bars, sym, bars, shm) - - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - subscribe_ohlc_for_increment(shm, delay_s) - - # pass back some symbol info like min_tick, trading_hours, etc. - # con = asdict(contract) - # syminfo = contract - syminfo = asdict(details) - syminfo.update(syminfo['contract']) - - # TODO: more consistent field translation - atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] - - # for stocks it seems TWS reports too small a tick size - # such that you can't submit orders with that granularity? - min_tick = 0.01 if atype == 'stock' else 0 - - syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) - - # for "traditional" assets, volume is normally discreet, not a float - syminfo['lot_tick_size'] = 0.0 - - # TODO: for loop through all symbols passed in - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'is_shm_writer': not writer_already_exists, - 'shm_token': shm_token, - 'symbol_info': syminfo, - } + # TODO: for loop through all symbols passed in + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + 'symbol_info': syminfo, } - await ctx.send_yield(init_msgs) + } - # check for special contract types - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' - # should be real volume for this contract - calc_price = False - else: - # commodities and forex don't have an exchange name and - # no real volume so we have to calculate the price - suffix = 'secType' - calc_price = True - # ticker = first_ticker + # check for special contract types + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + suffix = 'exchange' + # should be real volume for this contract + calc_price = False + else: + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = 'secType' + calc_price = True - # pass first quote asap - quote = normalize(first_ticker, calc_price=calc_price) - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic + # pass first quote asap + quote = normalize(first_ticker, calc_price=calc_price) + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic - first_quote = {topic: quote} + first_quote = {topic: quote} - # yield first quote asap - await ctx.send_yield(first_quote) + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] - # ticker.ticks = [] + log.debug(f"First ticker received {quote}") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] + task_status.started((init_msgs, first_quote)) - log.debug(f"First ticker received {quote}") + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + suffix = 'exchange' + calc_price = False # should be real volume for contract - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' - - calc_price = False # should be real volume for contract - - # with trio.move_on_after(10) as cs: # wait for real volume on feed (trading might be closed) - async with aclosing(stream): + async for ticker in stream: # for a real volume contract we rait for the first @@ -1072,108 +1038,45 @@ async def stream_quotes( # (ahem, ib_insync is truly stateful trash) ticker.ticks = [] - # tell incrementer task it can start - _buffer.shm_incrementing(key).set() - # XXX: this works because we don't use # ``aclosing()`` above? break - # enter stream loop - try: - await stream_and_write( - stream=stream, - calc_price=calc_price, - topic=topic, - writer_already_exists=writer_already_exists, - shm=shm, - suffix=suffix, - ctx=ctx, - ) - finally: - if not writer_already_exists: - _local_buffer_writers[key] = False + # tell caller quotes are now coming in live + feed_is_live.set() + async for ticker in stream: -async def stream_and_write( - stream, - calc_price: bool, - topic: str, - writer_already_exists: bool, - suffix: str, - ctx: tractor.Context, - shm: Optional['SharedArray'], # noqa -) -> None: - """Core quote streaming and shm writing loop; optimize for speed! - - """ - # real-time stream - async for ticker in stream: - - # print(ticker.vwap) - quote = normalize( - ticker, - calc_price=calc_price - ) - quote['symbol'] = topic - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. - - # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if not writer_already_exists: - for tick in iterticks(quote, types=('trade', 'utrade',)): - last = tick['price'] - - # print(f"{quote['symbol']}: {tick}") - - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - - new_v = tick.get('size', 0) - - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last - - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - v + new_v, + # print(ticker.vwap) + quote = normalize( + ticker, + calc_price=calc_price ) - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic - await ctx.send_yield({topic: quote}) + await send_chan.send({topic: quote}) - # ugh, clear ticks since we've consumed them - ticker.ticks = [] + # ugh, clear ticks since we've consumed them + ticker.ticks = [] def pack_position(pos: Position) -> Dict[str, Any]: con = pos.contract + + if isinstance(con, Option): + # TODO: option symbol parsing and sane display: + symbol = con.localSymbol.replace(' ', '') + + else: + symbol = con.symbol + return { 'broker': 'ib', 'account': pos.account, - 'symbol': con.symbol, + 'symbol': symbol, 'currency': con.currency, 'size': float(pos.position), 'avg_price': float(pos.avgCost) / float(con.multiplier or 1.0), diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 5d8763c3..3620f19a 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -16,15 +16,17 @@ """ Kraken backend. + """ from contextlib import asynccontextmanager, AsyncExitStack from dataclasses import asdict, field from types import ModuleType -from typing import List, Dict, Any, Tuple, Optional +from typing import List, Dict, Any, Tuple import json import time import trio_websocket +from trio_typing import TaskStatus from trio_websocket._impl import ( ConnectionClosed, DisconnectionTimeout, @@ -41,15 +43,11 @@ import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel + +from .api import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log -from ..data import ( - _buffer, - # iterticks, - attach_shm_array, - get_shm_token, - subscribe_ohlc_for_increment, -) +from ..data import ShmArray log = get_logger(__name__) @@ -315,6 +313,7 @@ def normalize( quote['brokerd_ts'] = time.time() quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') quote['last'] = quote['close'] + quote['bar_wap'] = ohlc.vwap # seriously eh? what's with this non-symmetry everywhere # in subscription systems... @@ -426,17 +425,37 @@ async def open_autorecon_ws(url): await stack.aclose() -# @tractor.msg.pub +async def backfill_bars( + sym: str, + shm: ShmArray, # type: ignore # noqa + + count: int = 10, # NOTE: any more and we'll overrun the underlying buffer + + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +) -> None: + """Fill historical bars into shared mem / storage afap. + """ + with trio.CancelScope() as cs: + async with open_cached_client('kraken') as client: + bars = await client.bars(symbol=sym) + shm.push(bars) + task_status.started(cs) + + async def stream_quotes( - # get_topics: Callable, - shm_token: Tuple[str, str, List[tuple]], - symbols: List[str] = ['XBTUSD', 'XMRUSD'], - # These are the symbols not expected by the ws api - # they are looked up inside this routine. - sub_type: str = 'ohlc', + + send_chan: trio.abc.SendChannel, + symbols: List[str], + shm: ShmArray, + feed_is_live: trio.Event, loglevel: str = None, - # compat with eventual ``tractor.msg.pub`` - topics: Optional[List[str]] = None, + + # backend specific + sub_type: str = 'ohlc', + + # startup sync + task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + ) -> None: """Subscribe for ohlc stream of quotes for ``pairs``. @@ -447,7 +466,8 @@ async def stream_quotes( ws_pairs = {} sym_infos = {} - async with get_client() as client: + + async with open_cached_client('kraken') as client, send_chan as send_chan: # keep client cached for real-time section for sym in symbols: @@ -458,40 +478,16 @@ async def stream_quotes( sym_infos[sym] = syminfo ws_pairs[sym] = si.wsname - # maybe load historical ohlcv in to shared mem - # check if shm has already been created by previous - # feed initialization - writer_exists = get_shm_token(shm_token['shm_name']) - symbol = symbols[0] - if not writer_exists: - shm = attach_shm_array( - token=shm_token, - # we are writer - readonly=False, - ) - bars = await client.bars(symbol=symbol) - - shm.push(bars) - shm_token = shm.token - - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - subscribe_ohlc_for_increment(shm, delay_s) - - # yield shm_token, not writer_exists init_msgs = { # pass back token, and bool, signalling if we're the writer # and that history has been written symbol: { - 'is_shm_writer': not writer_exists, - 'shm_token': shm_token, 'symbol_info': sym_infos[sym], - } - # for sym in symbols + 'shm_write_opts': {'sum_tick_vml': False}, + }, } - yield init_msgs async with open_autorecon_ws('wss://ws.kraken.com/') as ws: @@ -521,15 +517,16 @@ async def stream_quotes( # pull a first quote and deliver msg_gen = stream_messages(ws) + # TODO: use ``anext()`` when it lands in 3.10! typ, ohlc_last = await msg_gen.__anext__() topic, quote = normalize(ohlc_last) - # packetize as {topic: quote} - yield {topic: quote} + first_quote = {topic: quote} + task_status.started((init_msgs, first_quote)) - # tell incrementer task it can start - _buffer.shm_incrementing(shm_token['shm_name']).set() + # lol, only "closes" when they're margin squeezing clients ;P + feed_is_live.set() # keep start of last interval for volume tracking last_interval_start = ohlc_last.etime @@ -546,15 +543,18 @@ async def stream_quotes( # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m volume = ohlc.volume - # new interval + # new OHLC sample interval if ohlc.etime > last_interval_start: last_interval_start = ohlc.etime tick_volume = volume + else: # this is the tick volume *within the interval* tick_volume = volume - ohlc_last.volume + ohlc_last = ohlc last = ohlc.close + if tick_volume: ohlc.ticks.append({ 'type': 'trade', @@ -564,43 +564,10 @@ async def stream_quotes( topic, quote = normalize(ohlc) - # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if not writer_exists: - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - new_v = tick_volume - - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last - - # write shm - shm.array[ - ['open', - 'high', - 'low', - 'close', - 'bar_wap', # in this case vwap of bar - 'volume'] - ][-1] = ( - o, - max(high, last), - min(low, last), - last, - ohlc.vwap, - volume, - ) - ohlc_last = ohlc - elif typ == 'l1': quote = ohlc topic = quote['symbol'] # XXX: format required by ``tractor.msg.pub`` # requires a ``Dict[topic: str, quote: dict]`` - yield {topic: quote} + await send_chan.send({topic: quote}) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index e54c75a2..528df91e 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -1180,6 +1180,11 @@ def normalize( return new +# TODO: currently this backend uses entirely different +# data feed machinery that was written earlier then the +# existing stuff used in other backends. This needs to +# be ported eventually and should *just work* despite +# being a multi-symbol, poll-style feed system. @tractor.stream async def stream_quotes( ctx: tractor.Context, # marks this as a streaming func @@ -1192,7 +1197,7 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel) - async with api.get_cached_client('questrade') as client: + async with api.open_cached_client('questrade') as client: if feed_type == 'stock': formatter = format_stock_quote get_quotes = await stock_quoter(client, symbols) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 6accc0b8..598f4f54 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -181,6 +181,7 @@ async def maybe_open_emsd( async with tractor.find_actor('pikerd') as portal: assert portal + name = await portal.run( spawn_emsd, brokername=brokername, @@ -190,7 +191,6 @@ async def maybe_open_emsd( yield portal - @asynccontextmanager async def open_ems( broker: str, @@ -247,4 +247,13 @@ async def open_ems( with trio.fail_after(10): await book._ready_to_receive.wait() - yield book, trades_stream + try: + yield book, trades_stream + + finally: + # TODO: we want to eventually keep this up (by having + # the exec loop keep running in the pikerd tree) but for + # now we have to kill the context to avoid backpressure + # build-up on the shm write loop. + with trio.CancelScope(shield=True): + await trades_stream.aclose() diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 5ebfee4a..0998aa44 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -40,7 +40,11 @@ log = get_logger(__name__) # TODO: numba all of this -def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: +def mk_check( + trigger_price: float, + known_last: float, + action: str, +) -> Callable[[float, float], bool]: """Create a predicate for given ``exec_price`` based on last known price, ``known_last``. @@ -68,7 +72,7 @@ def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: return check_lt else: - return None, None + return None @dataclass @@ -230,6 +234,7 @@ async def execute_triggers( async def exec_loop( ctx: tractor.Context, + feed: 'Feed', # noqa broker: str, symbol: str, _exec_mode: str, @@ -239,67 +244,63 @@ async def exec_loop( to brokers. """ - async with data.open_feed( - broker, - [symbol], - loglevel='info', - ) as feed: - # TODO: get initial price quote from target broker - first_quote = await feed.receive() + # TODO: get initial price quote from target broker + first_quote = await feed.receive() - book = get_dark_book(broker) - book.lasts[(broker, symbol)] = first_quote[symbol]['last'] + book = get_dark_book(broker) + book.lasts[(broker, symbol)] = first_quote[symbol]['last'] - # TODO: wrap this in a more re-usable general api - client_factory = getattr(feed.mod, 'get_client_proxy', None) + # TODO: wrap this in a more re-usable general api + client_factory = getattr(feed.mod, 'get_client_proxy', None) - if client_factory is not None and _exec_mode != 'paper': + if client_factory is not None and _exec_mode != 'paper': - # we have an order API for this broker - client = client_factory(feed._brokerd_portal) + # we have an order API for this broker + client = client_factory(feed._brokerd_portal) - else: - # force paper mode - log.warning(f'Entering paper trading mode for {broker}') + else: + # force paper mode + log.warning(f'Entering paper trading mode for {broker}') - client = PaperBoi( - broker, - *trio.open_memory_channel(100), - _buys={}, - _sells={}, - _reqids={}, - ) + client = PaperBoi( + broker, + *trio.open_memory_channel(100), + _buys={}, + _sells={}, - # for paper mode we need to mock this trades response feed - # so we pass a duck-typed feed-looking mem chan which is fed - # fill and submission events from the exec loop - feed._trade_stream = client.trade_stream + _reqids={}, + ) - # init the trades stream - client._to_trade_stream.send_nowait({'local_trades': 'start'}) + # for paper mode we need to mock this trades response feed + # so we pass a duck-typed feed-looking mem chan which is fed + # fill and submission events from the exec loop + feed._trade_stream = client.trade_stream - _exec_mode = 'paper' + # init the trades stream + client._to_trade_stream.send_nowait({'local_trades': 'start'}) - # return control to parent task - task_status.started((first_quote, feed, client)) + _exec_mode = 'paper' - # shield this field so the remote brokerd does not get cancelled - stream = feed.stream - with stream.shield(): - async with trio.open_nursery() as n: - n.start_soon( - execute_triggers, - broker, - symbol, - stream, - ctx, - client, - book - ) + # return control to parent task + task_status.started((first_quote, feed, client)) - if _exec_mode == 'paper': - n.start_soon(simulate_fills, stream.clone(), client) + stream = feed.stream + async with trio.open_nursery() as n: + n.start_soon( + execute_triggers, + broker, + symbol, + stream, + ctx, + client, + book + ) + + if _exec_mode == 'paper': + # TODO: make this an actual broadcast channels as in: + # https://github.com/python-trio/trio/issues/987 + n.start_soon(simulate_fills, stream, client) # TODO: lots of cases still to handle @@ -512,7 +513,6 @@ async def process_order_cmds( exec_mode = cmd['exec_mode'] broker = brokers[0] - last = dark_book.lasts[(broker, sym)] if exec_mode == 'live' and action in ('buy', 'sell',): @@ -557,9 +557,10 @@ async def process_order_cmds( # price received from the feed, instead of being # like every other shitty tina platform that makes # the user choose the predicate operator. - pred = mk_check(trigger_price, last) + last = dark_book.lasts[(broker, sym)] + pred = mk_check(trigger_price, last, action) - tick_slap: float = 5 + spread_slap: float = 5 min_tick = feed.symbols[sym].tick_size if action == 'buy': @@ -569,12 +570,12 @@ async def process_order_cmds( # TODO: we probably need to scale this based # on some near term historical spread # measure? - abs_diff_away = tick_slap * min_tick + abs_diff_away = spread_slap * min_tick elif action == 'sell': tickfilter = ('bid', 'last', 'trade') percent_away = -0.005 - abs_diff_away = -tick_slap * min_tick + abs_diff_away = -spread_slap * min_tick else: # alert tickfilter = ('trade', 'utrade', 'last') @@ -647,35 +648,41 @@ async def _emsd_main( async with trio.open_nursery() as n: # TODO: eventually support N-brokers - - # start the condition scan loop - quote, feed, client = await n.start( - exec_loop, - ctx, + async with data.open_feed( broker, - symbol, - _mode, - ) + [symbol], + loglevel='info', + ) as feed: - await n.start( - process_broker_trades, - ctx, - feed, - dark_book, - ) + # start the condition scan loop + quote, feed, client = await n.start( + exec_loop, + ctx, + feed, + broker, + symbol, + _mode, + ) - # connect back to the calling actor (the one that is - # acting as an EMS client and will submit orders) to - # receive requests pushed over a tractor stream - # using (for now) an async generator. - order_stream = await portal.run(send_order_cmds) + await n.start( + process_broker_trades, + ctx, + feed, + dark_book, + ) - # start inbound order request processing - await process_order_cmds( - ctx, - order_stream, - symbol, - feed, - client, - dark_book, - ) + # connect back to the calling actor (the one that is + # acting as an EMS client and will submit orders) to + # receive requests pushed over a tractor stream + # using (for now) an async generator. + order_stream = await portal.run(send_order_cmds) + + # start inbound order request processing + await process_order_cmds( + ctx, + order_stream, + symbol, + feed, + client, + dark_book, + ) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 0559cbfb..f40f3c52 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -20,20 +20,29 @@ Data feed apis and infra. We provide tsdb integrations for retrieving and storing data from your brokers as well as sharing your feeds with other fellow pikers. + """ from dataclasses import dataclass, field from contextlib import asynccontextmanager +from functools import partial from importlib import import_module from types import ModuleType from typing import ( - Dict, Any, Sequence, AsyncIterator, Optional + Dict, Any, Sequence, + AsyncIterator, Optional, + List ) +import trio +from trio_typing import TaskStatus import tractor +from pydantic import BaseModel from ..brokers import get_brokermod from ..log import get_logger, get_console_log -from .._daemon import spawn_brokerd, maybe_open_pikerd +from .._daemon import ( + maybe_spawn_brokerd, +) from ._normalize import iterticks from ._sharedmem import ( maybe_open_shm_array, @@ -43,9 +52,12 @@ from ._sharedmem import ( get_shm_token, ) from ._source import base_iohlc_dtype, Symbol -from ._buffer import ( +from ._sampling import ( + _shms, + _incrementers, increment_ohlc_buffer, - subscribe_ohlc_for_increment + iter_ohlc_periods, + sample_and_broadcast, ) __all__ = [ @@ -54,7 +66,7 @@ __all__ = [ 'attach_shm_array', 'open_shm_array', 'get_shm_token', - 'subscribe_ohlc_for_increment', + # 'subscribe_ohlc_for_increment', ] @@ -74,57 +86,229 @@ def get_ingestormod(name: str) -> ModuleType: return module -@asynccontextmanager -async def maybe_spawn_brokerd( - brokername: str, - loglevel: Optional[str] = None, +class _FeedsBus(BaseModel): + """Data feeds broadcaster and persistence management. - # XXX: you should pretty much never want debug mode - # for data daemons when running in production. - debug_mode: bool = True, -) -> tractor._portal.Portal: - """If no ``brokerd.{brokername}`` daemon-actor can be found, - spawn one in a local subactor and return a portal to it. + This is a brokerd side api used to manager persistent real-time + streams that can be allocated and left alive indefinitely. """ - if loglevel: - get_console_log(loglevel) + brokername: str + nursery: trio.Nursery + feeds: Dict[str, trio.CancelScope] = {} + subscribers: Dict[str, List[tractor.Context]] = {} + task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() - dname = f'brokerd.{brokername}' - async with tractor.find_actor(dname) as portal: + class Config: + arbitrary_types_allowed = True - # WTF: why doesn't this work? - if portal is not None: - yield portal + async def cancel_all(self) -> None: + for sym, (cs, msg, quote) in self.feeds.items(): + log.debug(f'Cancelling cached feed for {self.brokername}:{sym}') + cs.cancel() + +_bus: _FeedsBus = None + + +def get_feed_bus( + brokername: str, + nursery: Optional[trio.Nursery] = None, +) -> _FeedsBus: + """ + Retreive broker-daemon-local data feeds bus from process global + scope. Serialize task access to lock. + + """ + + global _bus + + if nursery is not None: + assert _bus is None, "Feeds manager is already setup?" + + # this is initial setup by parent actor + _bus = _FeedsBus( + brokername=brokername, + nursery=nursery, + ) + assert not _bus.feeds + + assert _bus.brokername == brokername, "Uhhh wtf" + return _bus + + +async def _setup_persistent_brokerd(brokername: str) -> None: + """Allocate a actor-wide service nursery in ``brokerd`` + such that feeds can be run in the background persistently by + the broker backend as needed. + + """ + try: + async with trio.open_nursery() as service_nursery: + + # assign a nursery to the feeds bus for spawning + # background tasks from clients + bus = get_feed_bus(brokername, service_nursery) + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + finally: + # TODO: this needs to be shielded? + await bus.cancel_all() + + +async def allocate_persistent_feed( + ctx: tractor.Context, + bus: _FeedsBus, + brokername: str, + symbol: str, + loglevel: str, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +) -> None: + + try: + mod = get_brokermod(brokername) + except ImportError: + mod = get_ingestormod(brokername) + + # allocate shm array for this broker/symbol + # XXX: we should get an error here if one already exists + + shm, opened = maybe_open_shm_array( + key=sym_to_shm_key(brokername, symbol), + + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), + + # we expect the sub-actor to write + readonly=False, + ) + + # do history validation? + assert opened, f'Persistent shm for {symbol} was already open?!' + # if not opened: + # raise RuntimeError("Persistent shm for sym was already open?!") + + send, quote_stream = trio.open_memory_channel(10) + feed_is_live = trio.Event() + + # establish broker backend quote stream + # ``stream_quotes()`` is a required backend func + init_msg, first_quote = await bus.nursery.start( + partial( + mod.stream_quotes, + send_chan=send, + feed_is_live=feed_is_live, + symbols=[symbol], + shm=shm, + loglevel=loglevel, + ) + ) + + init_msg[symbol]['shm_token'] = shm.token + cs = bus.nursery.cancel_scope + + # TODO: make this into a composed type which also + # contains the backfiller cs for individual super-based + # resspawns when needed. + bus.feeds[symbol] = (cs, init_msg, first_quote) + + if opened: + + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + await bus.nursery.start(mod.backfill_bars, symbol, shm) + + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + + # pass OHLC sample rate in seconds + init_msg[symbol]['sample_rate'] = delay_s + + # yield back control to starting nursery + task_status.started((init_msg, first_quote)) + + await feed_is_live.wait() + + if opened: + _shms.setdefault(delay_s, []).append(shm) + + # start shm incrementing for OHLC sampling + if _incrementers.get(delay_s) is None: + cs = await bus.nursery.start(increment_ohlc_buffer, delay_s) + + sum_tick_vlm: bool = init_msg.get( + 'shm_write_opts', {} + ).get('sum_tick_vlm', True) + + # start sample loop + await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) + + +@tractor.stream +async def attach_feed_bus( + ctx: tractor.Context, + brokername: str, + symbol: str, + loglevel: str, +): + + # try: + if loglevel is None: + loglevel = tractor.current_actor().loglevel + + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + # ensure we are who we think we are + assert 'brokerd' in tractor.current_actor().name + + bus = get_feed_bus(brokername) + + async with bus.task_lock: + task_cs = bus.feeds.get(symbol) + sub_only: bool = False + + # if no cached feed for this symbol has been created for this + # brokerd yet, start persistent stream and shm writer task in + # service nursery + if task_cs is None: + init_msg, first_quote = await bus.nursery.start( + partial( + allocate_persistent_feed, + ctx=ctx, + bus=bus, + brokername=brokername, + symbol=symbol, + loglevel=loglevel, + ) + ) + bus.subscribers.setdefault(symbol, []).append(ctx) else: - # ask root ``pikerd`` daemon to spawn the daemon we need if - # pikerd is not live we now become the root of the - # process tree - async with maybe_open_pikerd( - loglevel=loglevel - ) as pikerd_portal: + sub_only = True - if pikerd_portal is None: - # we are root so spawn brokerd directly in our tree - # the root nursery is accessed through process global state - await spawn_brokerd(brokername, loglevel=loglevel) + # XXX: ``first_quote`` may be outdated here if this is secondary + # subscriber + cs, init_msg, first_quote = bus.feeds[symbol] - else: - await pikerd_portal.run( - spawn_brokerd, - brokername=brokername, - loglevel=loglevel, - debug_mode=debug_mode, - ) + # send this even to subscribers to existing feed? + await ctx.send_yield(init_msg) + await ctx.send_yield(first_quote) - async with tractor.wait_for_actor(dname) as portal: - yield portal + if sub_only: + bus.subscribers[symbol].append(ctx) + + try: + await trio.sleep_forever() + finally: + bus.subscribers[symbol].remove(ctx) @dataclass class Feed: - """A data feed for client-side interaction with far-process + """A data feed for client-side interaction with far-process# }}} real-time data sources. This is an thin abstraction on top of ``tractor``'s portals for @@ -135,10 +319,11 @@ class Feed: stream: AsyncIterator[Dict[str, Any]] shm: ShmArray mod: ModuleType - # ticks: ShmArray + _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[int]] = None _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None + _max_sample_rate: int = 0 # cache of symbol info messages received as first message when # a stream startsc. @@ -147,15 +332,19 @@ class Feed: async def receive(self) -> dict: return await self.stream.__anext__() - async def index_stream(self) -> AsyncIterator[int]: + async def index_stream( + self, + delay_s: Optional[int] = None + + ) -> AsyncIterator[int]: + if not self._index_stream: # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be # created for all practical purposes self._index_stream = await self._brokerd_portal.run( - increment_ohlc_buffer, - shm_token=self.shm.token, - topics=['index'], + iter_ohlc_periods, + delay_s=delay_s or self._max_sample_rate, ) return self._index_stream @@ -214,40 +403,29 @@ async def open_feed( # TODO: do all! sym = symbols[0] - # Attempt to allocate (or attach to) shm array for this broker/symbol - shm, opened = maybe_open_shm_array( - key=sym_to_shm_key(brokername, sym), - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), - - # we expect the sub-actor to write - readonly=True, - ) - async with maybe_spawn_brokerd( - brokername, loglevel=loglevel, - - # TODO: add a cli flag for this - # debug_mode=False, - ) as portal: stream = await portal.run( - mod.stream_quotes, - - # TODO: actually handy multiple symbols... - symbols=symbols, - - shm_token=shm.token, - - # compat with eventual ``tractor.msg.pub`` - topics=symbols, + attach_feed_bus, + brokername=brokername, + symbol=sym, loglevel=loglevel, ) + # TODO: can we make this work better with the proposed + # context based bidirectional streaming style api proposed in: + # https://github.com/goodboy/tractor/issues/53 + init_msg = await stream.receive() + + # we can only read from shm + shm = attach_shm_array( + token=init_msg[sym]['shm_token'], + readonly=True, + ) + feed = Feed( name=brokername, stream=stream, @@ -255,15 +433,12 @@ async def open_feed( mod=mod, _brokerd_portal=portal, ) - - # TODO: we can't do this **and** be compate with - # ``tractor.msg.pub``, should we maybe just drop this after - # tests are in? - init_msg = await stream.receive() + ohlc_sample_rates = [] for sym, data in init_msg.items(): si = data['symbol_info'] + ohlc_sample_rates.append(data['sample_rate']) symbol = Symbol( key=sym, @@ -275,12 +450,17 @@ async def open_feed( feed.symbols[sym] = symbol + # cast shm dtype to list... can't member why we need this shm_token = data['shm_token'] - if opened: - assert data['is_shm_writer'] - log.info("Started shared mem bar writer") + shm_token['dtype_descr'] = list(shm_token['dtype_descr']) + assert shm_token == shm.token # sanity - shm_token['dtype_descr'] = list(shm_token['dtype_descr']) - assert shm_token == shm.token # sanity + feed._max_sample_rate = max(ohlc_sample_rates) - yield feed + try: + yield feed + + finally: + # always cancel the far end producer task + with trio.CancelScope(shield=True): + await stream.aclose() diff --git a/piker/data/_buffer.py b/piker/data/_buffer.py deleted file mode 100644 index 896b503b..00000000 --- a/piker/data/_buffer.py +++ /dev/null @@ -1,115 +0,0 @@ -# piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -""" -Data buffers for fast shared humpy. -""" -from typing import Tuple, Callable, Dict -# import time - -import tractor -import trio - -from ._sharedmem import ShmArray - - -_shms: Dict[int, ShmArray] = {} -_start_increment: Dict[str, trio.Event] = {} - - -def shm_incrementing(shm_token_name: str) -> trio.Event: - global _start_increment - return _start_increment.setdefault(shm_token_name, trio.Event()) - - -@tractor.msg.pub -async def increment_ohlc_buffer( - shm_token: dict, - get_topics: Callable[..., Tuple[str]], - # delay_s: Optional[float] = None, -): - """Task which inserts new bars into the provide shared memory array - every ``delay_s`` seconds. - - This task fulfills 2 purposes: - - it takes the subscribed set of shm arrays and increments them - on a common time period - - broadcast of this increment "signal" message to other actor - subscribers - - Note that if **no** actor has initiated this task then **none** of - the underlying buffers will actually be incremented. - """ - - # wait for brokerd to signal we should start sampling - await shm_incrementing(shm_token['shm_name']).wait() - - # TODO: right now we'll spin printing bars if the last time stamp is - # before a large period of no market activity. Likely the best way - # to solve this is to make this task aware of the instrument's - # tradable hours? - - # adjust delay to compensate for trio processing time - ad = min(_shms.keys()) - 0.001 - - total_s = 0 # total seconds counted - lowest = min(_shms.keys()) - ad = lowest - 0.001 - - while True: - # TODO: do we want to support dynamically - # adding a "lower" lowest increment period? - await trio.sleep(ad) - total_s += lowest - - # increment all subscribed shm arrays - # TODO: this in ``numba`` - for delay_s, shms in _shms.items(): - if total_s % delay_s != 0: - continue - - # TODO: numa this! - for shm in shms: - # TODO: in theory we could make this faster by copying the - # "last" readable value into the underlying larger buffer's - # next value and then incrementing the counter instead of - # using ``.push()``? - - # append new entry to buffer thus "incrementing" the bar - array = shm.array - last = array[-1:][shm._write_fields].copy() - # (index, t, close) = last[0][['index', 'time', 'close']] - (t, close) = last[0][['time', 'close']] - - # this copies non-std fields (eg. vwap) from the last datum - last[ - ['time', 'volume', 'open', 'high', 'low', 'close'] - ][0] = (t + delay_s, 0, close, close, close, close) - - # write to the buffer - shm.push(last) - - # broadcast the buffer index step - yield {'index': shm._last.value} - - -def subscribe_ohlc_for_increment( - shm: ShmArray, - delay: int, -) -> None: - """Add an OHLC ``ShmArray`` to the increment set. - """ - _shms.setdefault(delay, []).append(shm) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py new file mode 100644 index 00000000..40951697 --- /dev/null +++ b/piker/data/_sampling.py @@ -0,0 +1,240 @@ +# piker: trading gear for hackers +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Data buffers for fast shared humpy. +""" +from typing import Dict, List + +import tractor +import trio +from trio_typing import TaskStatus + +from ._sharedmem import ShmArray +from ..log import get_logger + + +log = get_logger(__name__) + + +# TODO: we could stick these in a composed type to avoid +# angering the "i hate module scoped variables crowd" (yawn). +_shms: Dict[int, List[ShmArray]] = {} +_start_increment: Dict[str, trio.Event] = {} +_incrementers: Dict[int, trio.CancelScope] = {} +_subscribers: Dict[str, tractor.Context] = {} + + +def shm_incrementing(shm_token_name: str) -> trio.Event: + global _start_increment + return _start_increment.setdefault(shm_token_name, trio.Event()) + + +async def increment_ohlc_buffer( + delay_s: int, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +): + """Task which inserts new bars into the provide shared memory array + every ``delay_s`` seconds. + + This task fulfills 2 purposes: + - it takes the subscribed set of shm arrays and increments them + on a common time period + - broadcast of this increment "signal" message to other actor + subscribers + + Note that if **no** actor has initiated this task then **none** of + the underlying buffers will actually be incremented. + """ + + # # wait for brokerd to signal we should start sampling + # await shm_incrementing(shm_token['shm_name']).wait() + + # TODO: right now we'll spin printing bars if the last time stamp is + # before a large period of no market activity. Likely the best way + # to solve this is to make this task aware of the instrument's + # tradable hours? + + global _incrementers + + # adjust delay to compensate for trio processing time + ad = min(_shms.keys()) - 0.001 + + total_s = 0 # total seconds counted + lowest = min(_shms.keys()) + ad = lowest - 0.001 + + with trio.CancelScope() as cs: + + # register this time period step as active + _incrementers[delay_s] = cs + task_status.started(cs) + + while True: + # TODO: do we want to support dynamically + # adding a "lower" lowest increment period? + await trio.sleep(ad) + total_s += lowest + + # increment all subscribed shm arrays + # TODO: this in ``numba`` + for delay_s, shms in _shms.items(): + if total_s % delay_s != 0: + continue + + # TODO: ``numba`` this! + for shm in shms: + # TODO: in theory we could make this faster by copying the + # "last" readable value into the underlying larger buffer's + # next value and then incrementing the counter instead of + # using ``.push()``? + + # append new entry to buffer thus "incrementing" the bar + array = shm.array + last = array[-1:][shm._write_fields].copy() + # (index, t, close) = last[0][['index', 'time', 'close']] + (t, close) = last[0][['time', 'close']] + + # this copies non-std fields (eg. vwap) from the last datum + last[ + ['time', 'volume', 'open', 'high', 'low', 'close'] + ][0] = (t + delay_s, 0, close, close, close, close) + + # write to the buffer + shm.push(last) + + # broadcast the buffer index step + # yield {'index': shm._last.value} + for ctx in _subscribers.get(delay_s, ()): + try: + await ctx.send_yield({'index': shm._last.value}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error(f'{ctx.chan.uid} dropped connection') + + +@tractor.stream +async def iter_ohlc_periods( + ctx: tractor.Context, + delay_s: int, +) -> None: + """ + Subscribe to OHLC sampling "step" events: when the time + aggregation period increments, this event stream emits an index + event. + + """ + # add our subscription + global _subscribers + subs = _subscribers.setdefault(delay_s, []) + subs.append(ctx) + + try: + # stream and block until cancelled + await trio.sleep_forever() + finally: + subs.remove(ctx) + + +async def sample_and_broadcast( + bus: '_FeedBus', # noqa + shm: ShmArray, + quote_stream: trio.abc.ReceiveChannel, + sum_tick_vlm: bool = True, +) -> None: + + log.info("Started shared mem bar writer") + + # iterate stream delivered by broker + async for quotes in quote_stream: + + # TODO: ``numba`` this! + for sym, quote in quotes.items(): + + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. + + # start writing the shm buffer with appropriate + # trade data + for tick in quote['ticks']: + + # if tick['type'] in ('utrade',): + # print(tick) + + # write trade events to shm last OHLC sample + if tick['type'] in ('trade', 'utrade'): + + last = tick['price'] + + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] + + new_v = tick.get('size', 0) + + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last + + if sum_tick_vlm: + volume = v + new_v + else: + # presume backend takes care of summing + # it's own vlm + volume = quote['volume'] + + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'bar_wap', # can be optionally provided + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + quote.get('bar_wap', 0), + volume, + ) + + # XXX: we need to be very cautious here that no + # context-channel is left lingering which doesn't have + # a far end receiver actor-task. In such a case you can + # end up triggering backpressure which which will + # eventually block this producer end of the feed and + # thus other consumers still attached. + subs = bus.subscribers[sym] + for ctx in subs: + # print(f'sub is {ctx.chan.uid}') + try: + await ctx.send_yield({sym: quote}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + subs.remove(ctx) + log.error(f'{ctx.chan.uid} dropped connection') diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 3dc8de89..b4e6de86 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) Tyler Goodlet (in stewardship for piker0) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -16,6 +16,7 @@ """ NumPy compatible shared memory buffers for real-time FSP. + """ from dataclasses import dataclass, asdict from sys import byteorder @@ -370,7 +371,7 @@ def attach_shm_array( key = token.shm_name if key in _known_tokens: - assert _known_tokens[key] == token, "WTF" + assert _Token.from_msg(_known_tokens[key]) == token, "WTF" # attach to array buffer and view as per dtype shm = SharedMemory(name=key) @@ -426,7 +427,7 @@ def maybe_open_shm_array( **kwargs, ) -> Tuple[ShmArray, bool]: """Attempt to attach to a shared memory block using a "key" lookup - to registered blocks in the users overall "system" registryt + to registered blocks in the users overall "system" registry (presumes you don't have the block's explicit token). This function is meant to solve the problem of discovering whether diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index c798bf28..0b432e5b 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -28,7 +28,7 @@ from ..log import get_logger from .. import data from ._momo import _rsi, _wma from ._volume import _tina_vwap -from ..data import attach_shm_array, Feed +from ..data import attach_shm_array log = get_logger(__name__) @@ -62,23 +62,6 @@ async def latency( yield value -async def increment_signals( - feed: Feed, - dst_shm: 'SharedArray', # noqa -) -> None: - """Increment the underlying shared memory buffer on every "increment" - msg received from the underlying data feed. - - """ - async for msg in await feed.index_stream(): - array = dst_shm.array - last = array[-1:].copy() - - # write new slot to the buffer - dst_shm.push(last) - len(dst_shm.array) - - @tractor.stream async def cascade( ctx: tractor.Context, @@ -150,7 +133,6 @@ async def cascade( ) history[fsp_func_name] = history_output - # check for data length mis-allignment and fill missing values diff = len(src.array) - len(history) if diff >= 0: @@ -182,8 +164,8 @@ async def cascade( cs = await n.start(fsp_compute) - # Increment the underlying shared memory buffer on every "increment" - # msg received from the underlying data feed. + # Increment the underlying shared memory buffer on every + # "increment" msg received from the underlying data feed. async for msg in await feed.index_stream(): @@ -198,10 +180,11 @@ async def cascade( # TODO: adopt an incremental update engine/approach # where possible here eventually! + # read out last shm row array = dst.array last = array[-1:].copy() - # write new slot to the buffer + # write new row to the shm buffer dst.push(last) last_len = new_len diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index d62fef5b..f9aefc34 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -138,7 +138,7 @@ class ChartSpace(QtGui.QWidget): """ # XXX: let's see if this causes mem problems self.window.setWindowTitle( - f'piker chart {symbol.key}@{symbol.brokers} ' + f'{symbol.key}@{symbol.brokers} ' f'tick:{symbol.tick_size}' ) diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index 2146cdd5..361dec33 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -22,6 +22,7 @@ All global Qt runtime settings are mostly defined here. """ from typing import Tuple, Callable, Dict, Any import os +import platform import signal import time import traceback @@ -87,16 +88,19 @@ def current_screen() -> QtGui.QScreen: assert screen, "Wow Qt is dumb as shit and has no screen..." return screen -# XXX: pretty sure none of this shit works + +# XXX: pretty sure none of this shit works on linux as per: # https://bugreports.qt.io/browse/QTBUG-53022 +# it seems to work on windows.. no idea wtf is up. +if platform.system() == "Windows": -# Proper high DPI scaling is available in Qt >= 5.6.0. This attibute -# must be set before creating the application -# if hasattr(Qt, 'AA_EnableHighDpiScaling'): -# QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True) + # Proper high DPI scaling is available in Qt >= 5.6.0. This attibute + # must be set before creating the application + if hasattr(Qt, 'AA_EnableHighDpiScaling'): + QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True) -# if hasattr(Qt, 'AA_UseHighDpiPixmaps'): -# QCoreApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True) + if hasattr(Qt, 'AA_UseHighDpiPixmaps'): + QCoreApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True) class MainWindow(QtGui.QMainWindow): @@ -196,7 +200,6 @@ def run_qtractor( async def main(): async with maybe_open_pikerd( - name='qtractor', start_method='trio', **tractor_kwargs, ): diff --git a/piker/ui/_graphics/_cursor.py b/piker/ui/_graphics/_cursor.py index e13ab20b..159c773e 100644 --- a/piker/ui/_graphics/_cursor.py +++ b/piker/ui/_graphics/_cursor.py @@ -90,16 +90,11 @@ class LineDot(pg.CurvePoint): self, ev: QtCore.QEvent, ) -> None: - # print((ev, type(ev))) if not isinstance( ev, QtCore.QDynamicPropertyChangeEvent ) or self.curve() is None: return False - # if ev.propertyName() == 'index': - # print(ev) - # # self.setProperty - (x, y) = self.curve().getData() index = self.property('index') # first = self._plot._ohlc[0]['index'] @@ -172,8 +167,6 @@ class ContentsLabel(pg.LabelItem): if inspect.isfunction(margins[1]): margins = margins[0], ydim(anchor_font_size) - print(f'margins: {margins}') - self.anchor(itemPos=index, parentPos=index, offset=margins) def update_from_ohlc( @@ -403,7 +396,6 @@ class Cursor(pg.GraphicsObject): # update all trackers for item in self._trackers: - # print(f'setting {item} with {(ix, y)}') item.on_tracked_source(ix, iy) if ix != last_ix: diff --git a/piker/ui/_l1.py b/piker/ui/_l1.py index 683f46b9..0ff264bc 100644 --- a/piker/ui/_l1.py +++ b/piker/ui/_l1.py @@ -155,6 +155,8 @@ class LevelLabel(YAxisLabel): self._h_shift * (w + self._x_offset), abs_pos.y() + self._v_shift * h )) + # XXX: definitely need this! + self.update() def set_fmt_str( self, diff --git a/piker/ui/cli.py b/piker/ui/cli.py index a407afe5..93e1a9fd 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -148,6 +148,7 @@ def chart(config, symbol, profile): tractor_kwargs={ 'debug_mode': True, 'loglevel': tractorloglevel, + 'name': 'chart', 'enable_modules': [ 'piker.clearing._client' ], diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 27059894..8b1d1a14 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -321,97 +321,95 @@ async def start_order_mode( async with open_ems( brokername, symbol, - ) as (book, trades_stream): + ) as (book, trades_stream), open_order_mode( + symbol, + chart, + book, + ) as order_mode: - async with open_order_mode( - symbol, - chart, - book, - ) as order_mode: + def get_index(time: float): - def get_index(time: float): + # XXX: not sure why the time is so off here + # looks like we're gonna have to do some fixing.. - # XXX: not sure why the time is so off here - # looks like we're gonna have to do some fixing.. + ohlc = chart._shm.array + indexes = ohlc['time'] >= time - ohlc = chart._shm.array - indexes = ohlc['time'] >= time + if any(indexes): + return ohlc['index'][indexes[-1]] + else: + return ohlc['index'][-1] - if any(indexes): - return ohlc['index'][indexes[-1]] - else: - return ohlc['index'][-1] + # Begin order-response streaming - # Begin order-response streaming + # this is where we receive **back** messages + # about executions **from** the EMS actor + async for msg in trades_stream: - # this is where we receive **back** messages - # about executions **from** the EMS actor - async for msg in trades_stream: + fmsg = pformat(msg) + log.info(f'Received order msg:\n{fmsg}') - fmsg = pformat(msg) - log.info(f'Received order msg:\n{fmsg}') + resp = msg['resp'] - resp = msg['resp'] + if resp in ( + 'position', + ): + # show line label once order is live + order_mode.on_position_update(msg) + continue - if resp in ( - 'position', - ): - # show line label once order is live - order_mode.on_position_update(msg) - continue + # delete the line from view + oid = msg['oid'] - # delete the line from view - oid = msg['oid'] + # response to 'action' request (buy/sell) + if resp in ( + 'dark_submitted', + 'broker_submitted' + ): - # response to 'action' request (buy/sell) - if resp in ( - 'dark_submitted', - 'broker_submitted' - ): + # show line label once order is live + order_mode.on_submit(oid) - # show line label once order is live - order_mode.on_submit(oid) + # resp to 'cancel' request or error condition + # for action request + elif resp in ( + 'broker_cancelled', + 'broker_inactive', + 'dark_cancelled' + ): + # delete level line from view + order_mode.on_cancel(oid) - # resp to 'cancel' request or error condition - # for action request - elif resp in ( - 'broker_cancelled', - 'broker_inactive', - 'dark_cancelled' - ): - # delete level line from view - order_mode.on_cancel(oid) + elif resp in ( + 'dark_executed' + ): + log.info(f'Dark order triggered for {fmsg}') - elif resp in ( - 'dark_executed' - ): - log.info(f'Dark order triggered for {fmsg}') + # for alerts add a triangle and remove the + # level line + if msg['cmd']['action'] == 'alert': - # for alerts add a triangle and remove the - # level line - if msg['cmd']['action'] == 'alert': - - # should only be one "fill" for an alert - order_mode.on_fill( - oid, - price=msg['trigger_price'], - arrow_index=get_index(time.time()) - ) - await order_mode.on_exec(oid, msg) - - # response to completed 'action' request for buy/sell - elif resp in ( - 'broker_executed', - ): - await order_mode.on_exec(oid, msg) - - # each clearing tick is responded individually - elif resp in ('broker_filled',): - action = msg['action'] - # TODO: some kinda progress system + # should only be one "fill" for an alert order_mode.on_fill( oid, - price=msg['price'], - arrow_index=get_index(msg['broker_time']), - pointing='up' if action == 'buy' else 'down', + price=msg['trigger_price'], + arrow_index=get_index(time.time()) ) + await order_mode.on_exec(oid, msg) + + # response to completed 'action' request for buy/sell + elif resp in ( + 'broker_executed', + ): + await order_mode.on_exec(oid, msg) + + # each clearing tick is responded individually + elif resp in ('broker_filled',): + action = msg['action'] + # TODO: some kinda progress system + order_mode.on_fill( + oid, + price=msg['price'], + arrow_index=get_index(msg['broker_time']), + pointing='up' if action == 'buy' else 'down', + )