From 92efb8fd8e8d5da330851f31e08dfce6bd0072af Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Jan 2021 22:56:22 -0500 Subject: [PATCH] Expect new init message in feed from brokers --- piker/data/__init__.py | 66 ++++++++++++++++++++++++++++-------------- piker/data/_source.py | 19 +++++++++--- 2 files changed, 60 insertions(+), 25 deletions(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index fe50eda6..0c08b532 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -21,7 +21,7 @@ We provide tsdb integrations for retrieving and storing data from your brokers as well as sharing your feeds with other fellow pikers. """ -from dataclasses import dataclass +from dataclasses import dataclass, field from contextlib import asynccontextmanager from importlib import import_module from types import ModuleType @@ -42,7 +42,7 @@ from ._sharedmem import ( ShmArray, get_shm_token, ) -from ._source import base_iohlc_dtype +from ._source import base_iohlc_dtype, Symbol from ._buffer import ( increment_ohlc_buffer, subscribe_ohlc_for_increment @@ -149,6 +149,10 @@ class Feed: _index_stream: Optional[AsyncIterator[int]] = None _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None + # cache of symbol info messages received as first message when + # a stream startsc. + symbols: Dict[str, Symbol] = field(default_factory=dict) + async def receive(self) -> dict: return await self.stream.__anext__() @@ -208,23 +212,26 @@ def sym_to_shm_key( @asynccontextmanager async def open_feed( - name: str, + brokername: str, symbols: Sequence[str], loglevel: Optional[str] = None, ) -> AsyncIterator[Dict[str, Any]]: """Open a "data feed" which provides streamed real-time quotes. """ try: - mod = get_brokermod(name) + mod = get_brokermod(brokername) except ImportError: - mod = get_ingestormod(name) + mod = get_ingestormod(brokername) if loglevel is None: loglevel = tractor.current_actor().loglevel + # TODO: do all! + sym = symbols[0] + # Attempt to allocate (or attach to) shm array for this broker/symbol shm, opened = maybe_open_shm_array( - key=sym_to_shm_key(name, symbols[0]), + key=sym_to_shm_key(brokername, sym), # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), @@ -234,34 +241,51 @@ async def open_feed( ) async with maybe_spawn_brokerd( - mod.name, + brokername, loglevel=loglevel, ) as portal: stream = await portal.run( mod.stream_quotes, + + # TODO: actually handy multiple symbols... symbols=symbols, + shm_token=shm.token, # compat with eventual ``tractor.msg.pub`` topics=symbols, ) - # TODO: we can't do this **and** be compate with - # ``tractor.msg.pub``, should we maybe just drop this after - # tests are in? - shm_token, is_writer = await stream.receive() - - if opened: - assert is_writer - log.info("Started shared mem bar writer") - - shm_token['dtype_descr'] = list(shm_token['dtype_descr']) - assert shm_token == shm.token # sanity - - yield Feed( - name=name, + feed = Feed( + name=brokername, stream=stream, shm=shm, mod=mod, _brokerd_portal=portal, ) + + # TODO: we can't do this **and** be compate with + # ``tractor.msg.pub``, should we maybe just drop this after + # tests are in? + init_msg = await stream.receive() + + for sym, data in init_msg.items(): + + si = data['symbol_info'] + symbol = Symbol( + sym, + min_tick=si.get('minTick', 0.01), + ) + symbol.broker_info[brokername] = si + + feed.symbols[sym] = symbol + + shm_token = data['shm_token'] + if opened: + assert data['is_shm_writer'] + log.info("Started shared mem bar writer") + + shm_token['dtype_descr'] = list(shm_token['dtype_descr']) + assert shm_token == shm.token # sanity + + yield feed diff --git a/piker/data/_source.py b/piker/data/_source.py index a77839ff..6a71b444 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -17,9 +17,9 @@ """ numpy data source coversion helpers. """ -from typing import List +from typing import Dict, Any, List import decimal -from dataclasses import dataclass +from dataclasses import dataclass, field import numpy as np import pandas as pd @@ -82,9 +82,13 @@ class Symbol: """ key: str = '' - brokers: List[str] = None min_tick: float = 0.01 - contract: str = '' + broker_info: Dict[str, Dict[str, Any]] = field(default_factory=dict) + deriv: str = '' + + @property + def brokers(self) -> List[str]: + return list(self.broker_info.keys()) def digits(self) -> int: """Return the trailing number of digits specified by the @@ -93,6 +97,13 @@ class Symbol: """ return float_digits(self.min_tick) + def nearest_tick(self, value: float) -> float: + """Return the nearest tick value based on mininum increment. + + """ + mult = 1 / self.min_tick + return round(value * mult) / mult + def from_df( df: pd.DataFrame,