diff --git a/piker/_daemon.py b/piker/_daemon.py index e9c9b53b..a894f1a5 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -43,6 +43,7 @@ _root_modules = [ class Services(BaseModel): actor_n: tractor._trionics.ActorNursery service_n: trio.Nursery + debug_mode: bool # tractor sub-actor debug mode flag class Config: arbitrary_types_allowed = True @@ -53,10 +54,16 @@ _services: Optional[Services] = None @asynccontextmanager async def open_pikerd( + start_method: str = 'trio', loglevel: Optional[str] = None, - **kwargs, + + # XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = False, + ) -> Optional[tractor._portal.Portal]: - """Start a root piker daemon who's lifetime extends indefinitely + """ + Start a root piker daemon who's lifetime extends indefinitely until cancelled. A root actor nursery is created which can be used to create and keep @@ -71,18 +78,23 @@ async def open_pikerd( # passed through to ``open_root_actor`` name=_root_dname, loglevel=loglevel, + debug_mode=debug_mode, + start_method=start_method, + # TODO: eventually we should be able to avoid # having the root have more then permissions to # spawn other specialized daemons I think? # enable_modules=[__name__], enable_modules=_root_modules, + ) as _, tractor.open_nursery() as actor_nursery: async with trio.open_nursery() as service_nursery: # assign globally for future daemon/task creation _services = Services( actor_n=actor_nursery, - service_n=service_nursery + service_n=service_nursery, + debug_mode=debug_mode, ) yield _services @@ -93,6 +105,10 @@ async def maybe_open_runtime( loglevel: Optional[str] = None, **kwargs, ) -> None: + """ + Start the ``tractor`` runtime (a root actor) if none exists. + + """ if not tractor.current_actor(err_on_no_runtime=False): async with tractor.open_root_actor(loglevel=loglevel, **kwargs): yield @@ -123,8 +139,7 @@ async def maybe_open_pikerd( # presume pikerd role async with open_pikerd( - loglevel, - **kwargs, + loglevel=loglevel, ) as _: # in the case where we're starting up the # tractor-piker runtime stack in **this** process @@ -137,14 +152,17 @@ _data_mods = [ 'piker.brokers.core', 'piker.brokers.data', 'piker.data', + 'piker.data.feed', 'piker.data._sampling' ] async def spawn_brokerd( - brokername, + + brokername: str, loglevel: Optional[str] = None, - **tractor_kwargs + **tractor_kwargs, + ) -> tractor._portal.Portal: from .data import _setup_persistent_brokerd @@ -164,6 +182,7 @@ async def spawn_brokerd( dname, enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel, + debug_mode=_services.debug_mode, **tractor_kwargs ) @@ -187,14 +206,14 @@ async def spawn_brokerd( @asynccontextmanager async def maybe_spawn_brokerd( + brokername: str, loglevel: Optional[str] = None, + **kwargs, - # XXX: you should pretty much never want debug mode - # for data daemons when running in production. - debug_mode: bool = True, ) -> tractor._portal.Portal: - """If no ``brokerd.{brokername}`` daemon-actor can be found, + """ + If no ``brokerd.{brokername}`` daemon-actor can be found, spawn one in a local subactor and return a portal to it. """ @@ -213,7 +232,8 @@ async def maybe_spawn_brokerd( # pikerd is not live we now become the root of the # process tree async with maybe_open_pikerd( - loglevel=loglevel + loglevel=loglevel, + **kwargs, ) as pikerd_portal: if pikerd_portal is None: @@ -226,7 +246,6 @@ async def maybe_spawn_brokerd( spawn_brokerd, brokername=brokername, loglevel=loglevel, - debug_mode=debug_mode, ) async with tractor.wait_for_actor(dname) as portal: @@ -234,11 +253,16 @@ async def maybe_spawn_brokerd( async def spawn_emsd( - brokername, + + brokername: str, loglevel: Optional[str] = None, **extra_tractor_kwargs -) -> tractor._portal.Portal: +) -> tractor._portal.Portal: + """ + Start the clearing engine under ``pikerd``. + + """ log.info('Spawning emsd') # TODO: raise exception when _services == None? @@ -251,6 +275,7 @@ async def spawn_emsd( 'piker.clearing._client', ], loglevel=loglevel, + debug_mode=_services.debug_mode, # set by pikerd flag **extra_tractor_kwargs ) return 'emsd' diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index 1c340fc3..aa3abd24 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -30,7 +30,7 @@ import tractor from ..cli import cli from .. import watchlists as wl from ..log import get_console_log, colorize_json, get_logger -from ..data import maybe_spawn_brokerd +from .._daemon import maybe_spawn_brokerd from ..brokers import core, get_brokermod, data log = get_logger('cli') diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 598f4f54..6138086c 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -137,8 +137,9 @@ def get_orders( # TODO: make this a ``tractor.msg.pub`` -async def send_order_cmds(): - """Order streaming task: deliver orders transmitted from UI +async def send_order_cmds(symbol_key: str): + """ + Order streaming task: deliver orders transmitted from UI to downstream consumers. This is run in the UI actor (usually the one running Qt but could be @@ -160,10 +161,18 @@ async def send_order_cmds(): book._ready_to_receive.set() async for cmd in orders_stream: + print(cmd) + if cmd['symbol'] == symbol_key: - # send msg over IPC / wire - log.info(f'Send order cmd:\n{pformat(cmd)}') - yield cmd + # send msg over IPC / wire + log.info(f'Send order cmd:\n{pformat(cmd)}') + yield cmd + else: + # XXX BRUTAL HACKZORZES !!! + # re-insert for another consumer + # we need broadcast channelz...asap + # https://github.com/goodboy/tractor/issues/204 + book._to_ems.send_nowait(cmd) @asynccontextmanager diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 0998aa44..52fab921 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -175,7 +175,7 @@ async def execute_triggers( tuple(execs.items()) ): - if (ttype not in tf) or (not pred(price)): + if not pred or (ttype not in tf) or (not pred(price)): # majority of iterations will be non-matches continue @@ -675,7 +675,10 @@ async def _emsd_main( # acting as an EMS client and will submit orders) to # receive requests pushed over a tractor stream # using (for now) an async generator. - order_stream = await portal.run(send_order_cmds) + order_stream = await portal.run( + send_order_cmds, + symbol_key=symbol, + ) # start inbound order request processing await process_order_cmds( diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index f8f37095..7dab2a28 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -32,15 +32,25 @@ _context_defaults = dict( @click.command() @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--pdb', is_flag=True, help='Enable tractor debug mode') @click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') -def pikerd(loglevel, host, tl): +def pikerd(loglevel, host, tl, pdb): """Spawn the piker broker-daemon. """ from .._daemon import _data_mods, open_pikerd - get_console_log(loglevel) + log = get_console_log(loglevel) + + if pdb: + log.warning(( + "\n" + "!!! You have enabled daemon DEBUG mode !!!\n" + "If a daemon crashes it will likely block" + " the service until resumed from console!\n" + "\n" + )) async def main(): - async with open_pikerd(loglevel): + async with open_pikerd(loglevel=loglevel, debug_mode=pdb): await trio.sleep_forever() trio.run(main) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index f40f3c52..1a287ee2 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -15,452 +15,35 @@ # along with this program. If not, see . """ -Data feed apis and infra. +Data infra. We provide tsdb integrations for retrieving and storing data from your brokers as well as -sharing your feeds with other fellow pikers. +sharing live streams over a network. """ -from dataclasses import dataclass, field -from contextlib import asynccontextmanager -from functools import partial -from importlib import import_module -from types import ModuleType -from typing import ( - Dict, Any, Sequence, - AsyncIterator, Optional, - List -) - -import trio -from trio_typing import TaskStatus -import tractor -from pydantic import BaseModel - -from ..brokers import get_brokermod -from ..log import get_logger, get_console_log -from .._daemon import ( - maybe_spawn_brokerd, -) from ._normalize import iterticks from ._sharedmem import ( maybe_open_shm_array, attach_shm_array, open_shm_array, - ShmArray, get_shm_token, + ShmArray, ) -from ._source import base_iohlc_dtype, Symbol -from ._sampling import ( - _shms, - _incrementers, - increment_ohlc_buffer, - iter_ohlc_periods, - sample_and_broadcast, +from .feed import ( + open_feed, + _setup_persistent_brokerd, ) + __all__ = [ + 'open_feed', + 'maybe_spawn_brokerd', + 'ShmArray', 'iterticks', 'maybe_open_shm_array', 'attach_shm_array', 'open_shm_array', 'get_shm_token', - # 'subscribe_ohlc_for_increment', + '_setup_persistent_brokerd', ] - - -log = get_logger(__name__) - -__ingestors__ = [ - 'marketstore', -] - - -def get_ingestormod(name: str) -> ModuleType: - """Return the imported ingestor module by name. - """ - module = import_module('.' + name, 'piker.data') - # we only allow monkeying because it's for internal keying - module.name = module.__name__.split('.')[-1] - return module - - -class _FeedsBus(BaseModel): - """Data feeds broadcaster and persistence management. - - This is a brokerd side api used to manager persistent real-time - streams that can be allocated and left alive indefinitely. - - """ - brokername: str - nursery: trio.Nursery - feeds: Dict[str, trio.CancelScope] = {} - subscribers: Dict[str, List[tractor.Context]] = {} - task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() - - class Config: - arbitrary_types_allowed = True - - async def cancel_all(self) -> None: - for sym, (cs, msg, quote) in self.feeds.items(): - log.debug(f'Cancelling cached feed for {self.brokername}:{sym}') - cs.cancel() - - -_bus: _FeedsBus = None - - -def get_feed_bus( - brokername: str, - nursery: Optional[trio.Nursery] = None, -) -> _FeedsBus: - """ - Retreive broker-daemon-local data feeds bus from process global - scope. Serialize task access to lock. - - """ - - global _bus - - if nursery is not None: - assert _bus is None, "Feeds manager is already setup?" - - # this is initial setup by parent actor - _bus = _FeedsBus( - brokername=brokername, - nursery=nursery, - ) - assert not _bus.feeds - - assert _bus.brokername == brokername, "Uhhh wtf" - return _bus - - -async def _setup_persistent_brokerd(brokername: str) -> None: - """Allocate a actor-wide service nursery in ``brokerd`` - such that feeds can be run in the background persistently by - the broker backend as needed. - - """ - try: - async with trio.open_nursery() as service_nursery: - - # assign a nursery to the feeds bus for spawning - # background tasks from clients - bus = get_feed_bus(brokername, service_nursery) - - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() - finally: - # TODO: this needs to be shielded? - await bus.cancel_all() - - -async def allocate_persistent_feed( - ctx: tractor.Context, - bus: _FeedsBus, - brokername: str, - symbol: str, - loglevel: str, - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, -) -> None: - - try: - mod = get_brokermod(brokername) - except ImportError: - mod = get_ingestormod(brokername) - - # allocate shm array for this broker/symbol - # XXX: we should get an error here if one already exists - - shm, opened = maybe_open_shm_array( - key=sym_to_shm_key(brokername, symbol), - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), - - # we expect the sub-actor to write - readonly=False, - ) - - # do history validation? - assert opened, f'Persistent shm for {symbol} was already open?!' - # if not opened: - # raise RuntimeError("Persistent shm for sym was already open?!") - - send, quote_stream = trio.open_memory_channel(10) - feed_is_live = trio.Event() - - # establish broker backend quote stream - # ``stream_quotes()`` is a required backend func - init_msg, first_quote = await bus.nursery.start( - partial( - mod.stream_quotes, - send_chan=send, - feed_is_live=feed_is_live, - symbols=[symbol], - shm=shm, - loglevel=loglevel, - ) - ) - - init_msg[symbol]['shm_token'] = shm.token - cs = bus.nursery.cancel_scope - - # TODO: make this into a composed type which also - # contains the backfiller cs for individual super-based - # resspawns when needed. - bus.feeds[symbol] = (cs, init_msg, first_quote) - - if opened: - - # start history backfill task ``backfill_bars()`` is - # a required backend func this must block until shm is - # filled with first set of ohlc bars - await bus.nursery.start(mod.backfill_bars, symbol, shm) - - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - - # pass OHLC sample rate in seconds - init_msg[symbol]['sample_rate'] = delay_s - - # yield back control to starting nursery - task_status.started((init_msg, first_quote)) - - await feed_is_live.wait() - - if opened: - _shms.setdefault(delay_s, []).append(shm) - - # start shm incrementing for OHLC sampling - if _incrementers.get(delay_s) is None: - cs = await bus.nursery.start(increment_ohlc_buffer, delay_s) - - sum_tick_vlm: bool = init_msg.get( - 'shm_write_opts', {} - ).get('sum_tick_vlm', True) - - # start sample loop - await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) - - -@tractor.stream -async def attach_feed_bus( - ctx: tractor.Context, - brokername: str, - symbol: str, - loglevel: str, -): - - # try: - if loglevel is None: - loglevel = tractor.current_actor().loglevel - - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - - # ensure we are who we think we are - assert 'brokerd' in tractor.current_actor().name - - bus = get_feed_bus(brokername) - - async with bus.task_lock: - task_cs = bus.feeds.get(symbol) - sub_only: bool = False - - # if no cached feed for this symbol has been created for this - # brokerd yet, start persistent stream and shm writer task in - # service nursery - if task_cs is None: - init_msg, first_quote = await bus.nursery.start( - partial( - allocate_persistent_feed, - ctx=ctx, - bus=bus, - brokername=brokername, - symbol=symbol, - loglevel=loglevel, - ) - ) - bus.subscribers.setdefault(symbol, []).append(ctx) - else: - sub_only = True - - # XXX: ``first_quote`` may be outdated here if this is secondary - # subscriber - cs, init_msg, first_quote = bus.feeds[symbol] - - # send this even to subscribers to existing feed? - await ctx.send_yield(init_msg) - await ctx.send_yield(first_quote) - - if sub_only: - bus.subscribers[symbol].append(ctx) - - try: - await trio.sleep_forever() - finally: - bus.subscribers[symbol].remove(ctx) - - -@dataclass -class Feed: - """A data feed for client-side interaction with far-process# }}} - real-time data sources. - - This is an thin abstraction on top of ``tractor``'s portals for - interacting with IPC streams and conducting automatic - memory buffer orchestration. - """ - name: str - stream: AsyncIterator[Dict[str, Any]] - shm: ShmArray - mod: ModuleType - - _brokerd_portal: tractor._portal.Portal - _index_stream: Optional[AsyncIterator[int]] = None - _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None - _max_sample_rate: int = 0 - - # cache of symbol info messages received as first message when - # a stream startsc. - symbols: Dict[str, Symbol] = field(default_factory=dict) - - async def receive(self) -> dict: - return await self.stream.__anext__() - - async def index_stream( - self, - delay_s: Optional[int] = None - - ) -> AsyncIterator[int]: - - if not self._index_stream: - # XXX: this should be singleton on a host, - # a lone broker-daemon per provider should be - # created for all practical purposes - self._index_stream = await self._brokerd_portal.run( - iter_ohlc_periods, - delay_s=delay_s or self._max_sample_rate, - ) - - return self._index_stream - - async def recv_trades_data(self) -> AsyncIterator[dict]: - - if not getattr(self.mod, 'stream_trades', False): - log.warning( - f"{self.mod.name} doesn't have trade data support yet :(") - - if not self._trade_stream: - raise RuntimeError( - f'Can not stream trade data from {self.mod.name}') - - # NOTE: this can be faked by setting a rx chan - # using the ``_.set_fake_trades_stream()`` method - if self._trade_stream is None: - - self._trade_stream = await self._brokerd_portal.run( - - self.mod.stream_trades, - - # do we need this? -> yes - # the broker side must declare this key - # in messages, though we could probably use - # more then one? - topics=['local_trades'], - ) - - return self._trade_stream - - -def sym_to_shm_key( - broker: str, - symbol: str, -) -> str: - return f'{broker}.{symbol}' - - -@asynccontextmanager -async def open_feed( - brokername: str, - symbols: Sequence[str], - loglevel: Optional[str] = None, -) -> AsyncIterator[Dict[str, Any]]: - """Open a "data feed" which provides streamed real-time quotes. - """ - try: - mod = get_brokermod(brokername) - except ImportError: - mod = get_ingestormod(brokername) - - if loglevel is None: - loglevel = tractor.current_actor().loglevel - - # TODO: do all! - sym = symbols[0] - - async with maybe_spawn_brokerd( - brokername, - loglevel=loglevel, - ) as portal: - - stream = await portal.run( - attach_feed_bus, - brokername=brokername, - symbol=sym, - loglevel=loglevel, - ) - - # TODO: can we make this work better with the proposed - # context based bidirectional streaming style api proposed in: - # https://github.com/goodboy/tractor/issues/53 - init_msg = await stream.receive() - - # we can only read from shm - shm = attach_shm_array( - token=init_msg[sym]['shm_token'], - readonly=True, - ) - - feed = Feed( - name=brokername, - stream=stream, - shm=shm, - mod=mod, - _brokerd_portal=portal, - ) - ohlc_sample_rates = [] - - for sym, data in init_msg.items(): - - si = data['symbol_info'] - ohlc_sample_rates.append(data['sample_rate']) - - symbol = Symbol( - key=sym, - type_key=si.get('asset_type', 'forex'), - tick_size=si.get('price_tick_size', 0.01), - lot_tick_size=si.get('lot_tick_size', 0.0), - ) - symbol.broker_info[brokername] = si - - feed.symbols[sym] = symbol - - # cast shm dtype to list... can't member why we need this - shm_token = data['shm_token'] - shm_token['dtype_descr'] = list(shm_token['dtype_descr']) - assert shm_token == shm.token # sanity - - feed._max_sample_rate = max(ohlc_sample_rates) - - try: - yield feed - - finally: - # always cancel the far end producer task - with trio.CancelScope(shield=True): - await stream.aclose() diff --git a/piker/data/feed.py b/piker/data/feed.py new file mode 100644 index 00000000..d9ce062e --- /dev/null +++ b/piker/data/feed.py @@ -0,0 +1,436 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Data feed apis and infra. + +""" +from dataclasses import dataclass, field +from contextlib import asynccontextmanager +from functools import partial +from types import ModuleType +from typing import ( + Dict, Any, Sequence, + AsyncIterator, Optional, + List +) + +import trio +from trio_typing import TaskStatus +import tractor +from pydantic import BaseModel + +from ..brokers import get_brokermod +from ..log import get_logger, get_console_log +from .._daemon import ( + maybe_spawn_brokerd, +) +from ._sharedmem import ( + maybe_open_shm_array, + attach_shm_array, + ShmArray, +) +from ._source import base_iohlc_dtype, Symbol +from ._sampling import ( + _shms, + _incrementers, + increment_ohlc_buffer, + iter_ohlc_periods, + sample_and_broadcast, +) + + +log = get_logger(__name__) + + +class _FeedsBus(BaseModel): + """Data feeds broadcaster and persistence management. + + This is a brokerd side api used to manager persistent real-time + streams that can be allocated and left alive indefinitely. + + """ + brokername: str + nursery: trio.Nursery + feeds: Dict[str, trio.CancelScope] = {} + subscribers: Dict[str, List[tractor.Context]] = {} + task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() + + class Config: + arbitrary_types_allowed = True + + async def cancel_all(self) -> None: + for sym, (cs, msg, quote) in self.feeds.items(): + log.debug(f'Cancelling cached feed for {self.brokername}:{sym}') + cs.cancel() + + +_bus: _FeedsBus = None + + +def get_feed_bus( + brokername: str, + nursery: Optional[trio.Nursery] = None, +) -> _FeedsBus: + """ + Retreive broker-daemon-local data feeds bus from process global + scope. Serialize task access to lock. + + """ + + global _bus + + if nursery is not None: + assert _bus is None, "Feeds manager is already setup?" + + # this is initial setup by parent actor + _bus = _FeedsBus( + brokername=brokername, + nursery=nursery, + ) + assert not _bus.feeds + + assert _bus.brokername == brokername, "Uhhh wtf" + return _bus + + +async def _setup_persistent_brokerd(brokername: str) -> None: + """Allocate a actor-wide service nursery in ``brokerd`` + such that feeds can be run in the background persistently by + the broker backend as needed. + + """ + try: + async with trio.open_nursery() as service_nursery: + + # assign a nursery to the feeds bus for spawning + # background tasks from clients + bus = get_feed_bus(brokername, service_nursery) + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + finally: + # TODO: this needs to be shielded? + await bus.cancel_all() + + +async def allocate_persistent_feed( + ctx: tractor.Context, + bus: _FeedsBus, + brokername: str, + symbol: str, + loglevel: str, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +) -> None: + + try: + mod = get_brokermod(brokername) + except ImportError: + mod = get_ingestormod(brokername) + + # allocate shm array for this broker/symbol + # XXX: we should get an error here if one already exists + + shm, opened = maybe_open_shm_array( + key=sym_to_shm_key(brokername, symbol), + + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), + + # we expect the sub-actor to write + readonly=False, + ) + + # do history validation? + assert opened, f'Persistent shm for {symbol} was already open?!' + # if not opened: + # raise RuntimeError("Persistent shm for sym was already open?!") + + send, quote_stream = trio.open_memory_channel(10) + feed_is_live = trio.Event() + + # establish broker backend quote stream + # ``stream_quotes()`` is a required backend func + init_msg, first_quote = await bus.nursery.start( + partial( + mod.stream_quotes, + send_chan=send, + feed_is_live=feed_is_live, + symbols=[symbol], + shm=shm, + loglevel=loglevel, + ) + ) + + init_msg[symbol]['shm_token'] = shm.token + cs = bus.nursery.cancel_scope + + # TODO: make this into a composed type which also + # contains the backfiller cs for individual super-based + # resspawns when needed. + bus.feeds[symbol] = (cs, init_msg, first_quote) + + if opened: + + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + await bus.nursery.start(mod.backfill_bars, symbol, shm) + + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + + # pass OHLC sample rate in seconds + init_msg[symbol]['sample_rate'] = delay_s + + # yield back control to starting nursery + task_status.started((init_msg, first_quote)) + + await feed_is_live.wait() + + if opened: + _shms.setdefault(delay_s, []).append(shm) + + # start shm incrementing for OHLC sampling + if _incrementers.get(delay_s) is None: + cs = await bus.nursery.start(increment_ohlc_buffer, delay_s) + + sum_tick_vlm: bool = init_msg.get( + 'shm_write_opts', {} + ).get('sum_tick_vlm', True) + + # start sample loop + await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) + + +@tractor.stream +async def attach_feed_bus( + ctx: tractor.Context, + brokername: str, + symbol: str, + loglevel: str, +): + + # try: + if loglevel is None: + loglevel = tractor.current_actor().loglevel + + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + # ensure we are who we think we are + assert 'brokerd' in tractor.current_actor().name + + bus = get_feed_bus(brokername) + + async with bus.task_lock: + task_cs = bus.feeds.get(symbol) + sub_only: bool = False + + # if no cached feed for this symbol has been created for this + # brokerd yet, start persistent stream and shm writer task in + # service nursery + if task_cs is None: + init_msg, first_quote = await bus.nursery.start( + partial( + allocate_persistent_feed, + ctx=ctx, + bus=bus, + brokername=brokername, + symbol=symbol, + loglevel=loglevel, + ) + ) + bus.subscribers.setdefault(symbol, []).append(ctx) + else: + sub_only = True + + # XXX: ``first_quote`` may be outdated here if this is secondary + # subscriber + cs, init_msg, first_quote = bus.feeds[symbol] + + # send this even to subscribers to existing feed? + await ctx.send_yield(init_msg) + await ctx.send_yield(first_quote) + + if sub_only: + bus.subscribers[symbol].append(ctx) + + try: + await trio.sleep_forever() + finally: + bus.subscribers[symbol].remove(ctx) + + +@dataclass +class Feed: + """A data feed for client-side interaction with far-process# }}} + real-time data sources. + + This is an thin abstraction on top of ``tractor``'s portals for + interacting with IPC streams and conducting automatic + memory buffer orchestration. + """ + name: str + stream: AsyncIterator[Dict[str, Any]] + shm: ShmArray + mod: ModuleType + + _brokerd_portal: tractor._portal.Portal + _index_stream: Optional[AsyncIterator[int]] = None + _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None + _max_sample_rate: int = 0 + + # cache of symbol info messages received as first message when + # a stream startsc. + symbols: Dict[str, Symbol] = field(default_factory=dict) + + async def receive(self) -> dict: + return await self.stream.__anext__() + + async def index_stream( + self, + delay_s: Optional[int] = None + + ) -> AsyncIterator[int]: + + if not self._index_stream: + # XXX: this should be singleton on a host, + # a lone broker-daemon per provider should be + # created for all practical purposes + self._index_stream = await self._brokerd_portal.run( + iter_ohlc_periods, + delay_s=delay_s or self._max_sample_rate, + ) + + return self._index_stream + + async def recv_trades_data(self) -> AsyncIterator[dict]: + + if not getattr(self.mod, 'stream_trades', False): + log.warning( + f"{self.mod.name} doesn't have trade data support yet :(") + + if not self._trade_stream: + raise RuntimeError( + f'Can not stream trade data from {self.mod.name}') + + # NOTE: this can be faked by setting a rx chan + # using the ``_.set_fake_trades_stream()`` method + if self._trade_stream is None: + + self._trade_stream = await self._brokerd_portal.run( + + self.mod.stream_trades, + + # do we need this? -> yes + # the broker side must declare this key + # in messages, though we could probably use + # more then one? + topics=['local_trades'], + ) + + return self._trade_stream + + +def sym_to_shm_key( + broker: str, + symbol: str, +) -> str: + return f'{broker}.{symbol}' + + +@asynccontextmanager +async def open_feed( + brokername: str, + symbols: Sequence[str], + loglevel: Optional[str] = None, +) -> AsyncIterator[Dict[str, Any]]: + """Open a "data feed" which provides streamed real-time quotes. + """ + try: + mod = get_brokermod(brokername) + except ImportError: + mod = get_ingestormod(brokername) + + if loglevel is None: + loglevel = tractor.current_actor().loglevel + + # TODO: do all! + sym = symbols[0] + + async with maybe_spawn_brokerd( + brokername, + loglevel=loglevel, + ) as portal: + + stream = await portal.run( + attach_feed_bus, + brokername=brokername, + symbol=sym, + loglevel=loglevel, + ) + + # TODO: can we make this work better with the proposed + # context based bidirectional streaming style api proposed in: + # https://github.com/goodboy/tractor/issues/53 + init_msg = await stream.receive() + + # we can only read from shm + shm = attach_shm_array( + token=init_msg[sym]['shm_token'], + readonly=True, + ) + + feed = Feed( + name=brokername, + stream=stream, + shm=shm, + mod=mod, + _brokerd_portal=portal, + ) + ohlc_sample_rates = [] + + for sym, data in init_msg.items(): + + si = data['symbol_info'] + ohlc_sample_rates.append(data['sample_rate']) + + symbol = Symbol( + key=sym, + type_key=si.get('asset_type', 'forex'), + tick_size=si.get('price_tick_size', 0.01), + lot_tick_size=si.get('lot_tick_size', 0.0), + ) + symbol.broker_info[brokername] = si + + feed.symbols[sym] = symbol + + # cast shm dtype to list... can't member why we need this + shm_token = data['shm_token'] + shm_token['dtype_descr'] = list(shm_token['dtype_descr']) + assert shm_token == shm.token # sanity + + feed._max_sample_rate = max(ohlc_sample_rates) + + try: + yield feed + + finally: + # always cancel the far end producer task + with trio.CancelScope(shield=True): + await stream.aclose() diff --git a/piker/data/ingest.py b/piker/data/ingest.py new file mode 100644 index 00000000..afb5fc4a --- /dev/null +++ b/piker/data/ingest.py @@ -0,0 +1,41 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Ingestion, for dataz. + +Api layer likely in here... + +""" +from types import ModuleType +from importlib import import_module + +from ..log import get_logger + +log = get_logger(__name__) + +__ingestors__ = [ + 'marketstore', +] + + +def get_ingestormod(name: str) -> ModuleType: + """Return the imported ingestor module by name. + """ + module = import_module('.' + name, 'piker.data') + # we only allow monkeying because it's for internal keying + module.name = module.__name__.split('.')[-1] + return module diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index 361dec33..8a48943d 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -200,7 +200,6 @@ def run_qtractor( async def main(): async with maybe_open_pikerd( - start_method='trio', **tractor_kwargs, ): await func(*((widgets,) + args)) diff --git a/piker/ui/cli.py b/piker/ui/cli.py index 93e1a9fd..b40a4c31 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -24,7 +24,7 @@ import tractor from ..cli import cli from .. import watchlists as wl -from ..data import maybe_spawn_brokerd +from .._daemon import maybe_spawn_brokerd _config_dir = click.get_app_dir('piker') @@ -125,9 +125,14 @@ def optschain(config, symbol, date, rate, test): is_flag=True, help='Enable pyqtgraph profiling' ) +@click.option( + '--pdb', + is_flag=True, + help='Enable tractor debug mode' +) @click.argument('symbol', required=True) @click.pass_obj -def chart(config, symbol, profile): +def chart(config, symbol, profile, pdb): """Start a real-time chartng UI """ from .. import _profile @@ -146,7 +151,7 @@ def chart(config, symbol, profile): brokername=brokername, piker_loglevel=pikerloglevel, tractor_kwargs={ - 'debug_mode': True, + 'debug_mode': pdb, 'loglevel': tractorloglevel, 'name': 'chart', 'enable_modules': [ diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 8b1d1a14..29efcd0f 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -317,6 +317,7 @@ async def start_order_mode( symbol: Symbol, brokername: str, ) -> None: + # spawn EMS actor-service async with open_ems( brokername,