From 97c2a2da3e140c70c44fae5997bcec943effd8b2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Mar 2022 07:25:01 -0500 Subject: [PATCH 01/26] Convert `iter_ohlc_periods()` to a `@tractor.context` --- piker/data/_sampling.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index ecad241d..e52fd93a 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -133,18 +133,20 @@ async def increment_ohlc_buffer( # a given sample period. subs = sampler.subscribers.get(delay_s, ()) - for ctx in subs: + for stream in subs: try: - await ctx.send_yield({'index': shm._last.value}) + await stream.send({'index': shm._last.value}) except ( trio.BrokenResourceError, trio.ClosedResourceError ): - log.error(f'{ctx.chan.uid} dropped connection') - subs.remove(ctx) + log.error( + f'{stream._ctx.chan.uid} dropped connection' + ) + subs.remove(stream) -@tractor.stream +@tractor.context async def iter_ohlc_periods( ctx: tractor.Context, delay_s: int, @@ -158,18 +160,20 @@ async def iter_ohlc_periods( ''' # add our subscription subs = sampler.subscribers.setdefault(delay_s, []) - subs.append(ctx) + await ctx.started() + async with ctx.open_stream() as stream: + subs.append(stream) - try: - # stream and block until cancelled - await trio.sleep_forever() - finally: try: - subs.remove(ctx) - except ValueError: - log.error( - f'iOHLC step stream was already dropped for {ctx.chan.uid}?' - ) + # stream and block until cancelled + await trio.sleep_forever() + finally: + try: + subs.remove(stream) + except ValueError: + log.error( + f'iOHLC step stream was already dropped {ctx.chan.uid}?' + ) async def sample_and_broadcast( From cc026dfb1d4b4aad9e44ee2de14ffcc88f6b851e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 11 Mar 2022 16:49:37 -0500 Subject: [PATCH 02/26] Open feeds using `Portal.open_context()` --- piker/data/feed.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index e2e91d7b..611d79ce 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -519,19 +519,20 @@ async def open_sample_step_stream( # created for all practical purposes async with maybe_open_context( acm_func=partial( - portal.open_stream_from, + portal.open_context, iter_ohlc_periods, ), kwargs={'delay_s': delay_s}, - ) as (cache_hit, istream): - if cache_hit: - # add a new broadcast subscription for the quote stream - # if this feed is likely already in use - async with istream.subscribe() as bistream: - yield bistream - else: - yield istream + ) as (cache_hit, (ctx, first)): + async with ctx.open_stream() as istream: + if cache_hit: + # add a new broadcast subscription for the quote stream + # if this feed is likely already in use + async with istream.subscribe() as bistream: + yield bistream + else: + yield istream @dataclass From 94e2103bf55692606714ccde6cb9f70d8d6988d7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Mar 2022 10:45:51 -0400 Subject: [PATCH 03/26] Be mega-tolerant to feed consumer disconnects --- piker/data/_sampling.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index e52fd93a..928b3694 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -403,7 +403,16 @@ async def uniform_rate_send( # rate timing exactly lul try: await stream.send({sym: first_quote}) - except trio.ClosedResourceError: + except ( + # NOTE: any of these can be raised by ``tractor``'s IPC + # transport-layer and we want to be highly resilient + # to consumers which crash or lose network connection. + # I.e. we **DO NOT** want to crash and propagate up to + # ``pikerd`` these kinds of errors! + trio.ClosedResourceError, + trio.BrokenResourceError, + ConnectionResetError, + ): # if the feed consumer goes down then drop # out of this rate limiter log.warning(f'{stream} closed') From 434c340cb8212caf7b93d404810b0ba140bf7768 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 19:47:07 -0500 Subject: [PATCH 04/26] Move factor helper to a classmethod --- piker/data/_source.py | 64 ++++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/piker/data/_source.py b/piker/data/_source.py index dfa48453..ac6f6d82 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -17,6 +17,7 @@ """ numpy data source coversion helpers. """ +from __future__ import annotations from typing import Any import decimal @@ -98,16 +99,43 @@ class Symbol(BaseModel): Yah, i guess dats what it izz. """ key: str - type_key: str # {'stock', 'forex', 'future', ... etc.} - tick_size: float - lot_tick_size: float # "volume" precision as min step value - tick_size_digits: int - lot_size_digits: int + tick_size: float = 0.01 + lot_tick_size: float = 0.0 # "volume" precision as min step value + tick_size_digits: int = 2 + lot_size_digits: int = 0 broker_info: dict[str, dict[str, Any]] = {} # specifies a "class" of financial instrument # ex. stock, futer, option, bond etc. + # @validate_arguments + @classmethod + def from_broker_info( + cls, + broker: str, + symbol: str, + info: dict[str, Any], + + # XXX: like wtf.. + # ) -> 'Symbol': + ) -> None: + + tick_size = info.get('price_tick_size', 0.01) + lot_tick_size = info.get('lot_tick_size', 0.0) + + return Symbol( + key=symbol, + tick_size=tick_size, + lot_tick_size=lot_tick_size, + tick_size_digits=float_digits(tick_size), + lot_size_digits=float_digits(lot_tick_size), + broker_info={broker: info}, + ) + + @property + def type_key(self) -> str: + return list(self.broker_info.values())[0]['asset_type'] + @property def brokers(self) -> list[str]: return list(self.broker_info.keys()) @@ -138,32 +166,6 @@ class Symbol(BaseModel): ] -@validate_arguments -def mk_symbol( - - key: str, - type_key: str, - tick_size: float = 0.01, - lot_tick_size: float = 0, - broker_info: dict[str, Any] = {}, - -) -> Symbol: - ''' - Create and return an instrument description for the - "symbol" named as ``key``. - - ''' - return Symbol( - key=key, - type_key=type_key, - tick_size=tick_size, - lot_tick_size=lot_tick_size, - tick_size_digits=float_digits(tick_size), - lot_size_digits=float_digits(lot_tick_size), - broker_info=broker_info, - ) - - def from_df( df: pd.DataFrame, From b16167b8f35367d86e7aed55a51f80c801786860 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Mar 2022 10:59:57 -0400 Subject: [PATCH 05/26] Add prelim fqsn support into our `Symbol` type --- piker/data/_source.py | 69 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 6 deletions(-) diff --git a/piker/data/_source.py b/piker/data/_source.py index ac6f6d82..da2f3d00 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -92,6 +92,28 @@ def ohlc_zeros(length: int) -> np.ndarray: return np.zeros(length, dtype=base_ohlc_dtype) +def uncons_fqsn(fqsn: str) -> tuple[str, str, str]: + ''' + Unpack a fully-qualified-symbol-name to ``tuple``. + + ''' + # TODO: probably reverse the order of all this XD + tokens = fqsn.split('.') + if len(tokens) > 3: + symbol, venue, suffix, broker = tokens + else: + symbol, venue, broker = tokens + suffix = '' + + # head, _, broker = fqsn.rpartition('.') + # symbol, _, suffix = head.rpartition('.') + return ( + broker, + '.'.join([symbol, venue]), + suffix, + ) + + class Symbol(BaseModel): """I guess this is some kinda container thing for dealing with all the different meta-data formats from brokers? @@ -103,6 +125,7 @@ class Symbol(BaseModel): lot_tick_size: float = 0.0 # "volume" precision as min step value tick_size_digits: int = 2 lot_size_digits: int = 0 + suffix: str = '' broker_info: dict[str, dict[str, Any]] = {} # specifies a "class" of financial instrument @@ -115,6 +138,7 @@ class Symbol(BaseModel): broker: str, symbol: str, info: dict[str, Any], + suffix: str = '', # XXX: like wtf.. # ) -> 'Symbol': @@ -129,9 +153,27 @@ class Symbol(BaseModel): lot_tick_size=lot_tick_size, tick_size_digits=float_digits(tick_size), lot_size_digits=float_digits(lot_tick_size), + suffix=suffix, broker_info={broker: info}, ) + @classmethod + def from_fqsn( + cls, + fqsn: str, + info: dict[str, Any], + + # XXX: like wtf.. + # ) -> 'Symbol': + ) -> None: + broker, key, suffix = uncons_fqsn(fqsn) + return cls.from_broker_info( + broker, + key, + info=info, + suffix=suffix, + ) + @property def type_key(self) -> str: return list(self.broker_info.values())[0]['asset_type'] @@ -141,9 +183,10 @@ class Symbol(BaseModel): return list(self.broker_info.keys()) def nearest_tick(self, value: float) -> float: - """Return the nearest tick value based on mininum increment. + ''' + Return the nearest tick value based on mininum increment. - """ + ''' mult = 1 / self.tick_size return round(value * mult) / mult @@ -159,11 +202,25 @@ class Symbol(BaseModel): self.key, ) + def front_fqsn(self) -> str: + broker, key = self.front_feed() + if self.suffix: + tokens = (key, self.suffix, broker) + else: + tokens = (key, broker) + + fqsn = '.'.join(tokens) + return fqsn + def iterfqsns(self) -> list[str]: - return [ - mk_fqsn(self.key, broker) - for broker in self.broker_info.keys() - ] + keys = [] + for broker in self.broker_info.keys(): + fqsn = mk_fqsn(self.key, broker) + if self.suffix: + fqsn += f'.{self.suffix}' + keys.append(fqsn) + + return keys def from_df( From e9d64ffee8620b56a246140e5c0e0a3e844a987d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Mar 2022 09:31:12 -0500 Subject: [PATCH 06/26] Use fqsn in `.manage_history()` Allocate and `.started()` return the `ShmArray` from here as well in prep for tsdb integration. --- piker/data/feed.py | 51 ++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 611d79ce..c9bb5e36 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -191,10 +191,8 @@ async def _setup_persistent_brokerd( async def manage_history( mod: ModuleType, - shm: ShmArray, bus: _FeedsBus, symbol: str, - we_opened_shm: bool, some_data_ready: trio.Event, feed_is_live: trio.Event, @@ -208,21 +206,30 @@ async def manage_history( buffer. ''' - task_status.started() + fqsn = mk_fqsn(mod.name, symbol) - opened = we_opened_shm - # TODO: history validation - # assert opened, f'Persistent shm for {symbol} was already open?!' - # if not opened: - # raise RuntimeError("Persistent shm for sym was already open?!") + # (maybe) allocate shm array for this broker/symbol which will + # be used for fast near-term history capture and processing. + shm, opened = maybe_open_shm_array( + key=fqsn, + + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), + + # we expect the sub-actor to write + readonly=False, + ) if opened: - # ask broker backend for new history + log.info('No existing `marketstored` found..') # start history backfill task ``backfill_bars()`` is # a required backend func this must block until shm is # filled with first set of ohlc bars - cs = await bus.nursery.start(mod.backfill_bars, symbol, shm) + _ = await bus.nursery.start(mod.backfill_bars, symbol, shm) + + # yield back after client connect with filled shm + task_status.started(shm) # indicate to caller that feed can be delivered to # remote requesting client since we've loaded history @@ -243,13 +250,12 @@ async def manage_history( # start shm incrementing for OHLC sampling at the current # detected sampling period if one dne. if sampler.incrementers.get(delay_s) is None: - cs = await bus.start_task( + await bus.start_task( increment_ohlc_buffer, delay_s, ) await trio.sleep_forever() - cs.cancel() async def allocate_persistent_feed( @@ -281,18 +287,6 @@ async def allocate_persistent_feed( fqsn = mk_fqsn(brokername, symbol) - # (maybe) allocate shm array for this broker/symbol which will - # be used for fast near-term history capture and processing. - shm, opened = maybe_open_shm_array( - key=fqsn, - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), - - # we expect the sub-actor to write - readonly=False, - ) - # mem chan handed to broker backend so it can push real-time # quotes to this task for sampling and history storage (see below). send, quote_stream = trio.open_memory_channel(10) @@ -311,13 +305,11 @@ async def allocate_persistent_feed( # bus.nursery.start_soon( # await bus.start_task( - await bus.nursery.start( + shm = await bus.nursery.start( manage_history, mod, - shm, bus, symbol, - opened, some_data_ready, feed_is_live, ) @@ -454,6 +446,11 @@ async def open_feed_bus( async with ( ctx.open_stream() as stream, ): + # re-send to trigger display loop cycle (necessary especially + # when the mkt is closed and no real-time messages are + # expected). + await stream.send(first_quotes) + if tick_throttle: # open a bg task which receives quotes over a mem chan From 8462ea8a2899af1dd61f70f32f88a184289830a5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Mar 2022 14:47:28 -0400 Subject: [PATCH 07/26] Make the data feed layer "fqsn" aware In order to support instruments with lifetimes (aka derivatives) we need generally need special symbol annotations which detail such meta data (such as `MNQ.GLOBEX.20220717` for daq futes). Further there is really no reason for the public api for this feed layer to care about getting a special "brokername" field since generally the data is coming directly from UIs (eg. search selection) so we might as well accept a fqsn (fully qualified symbol name) which includes the broker name; for now a suffix like `'.ib'`. We may change this schema (soon) but this at least gets us to a point where we expect the full name including broker/provider. An additional detail: for certain "generic" symbol names (like for futes) we will pull a so called "front contract" and map this to a specific fqsn underneath, so there is a double (cached) entry for that entry such that other consumers can use it the same way if desired. Some other machinery changes: - expect the `stream_quotes()` endpoint to deliver it's `.started()` msg almost immediately since we now need it deliver any fqsn asap (yes this means the ep should no longer wait on a "live" first quote and instead deliver what quote data it can right away. - expect the quotes ohlc sampler task to add in the broker name before broadcast to remote (actor) consumers since the backend isn't (yet) expected to do that add in itself. - obviously we start using all the new fqsn related `Symbol` apis --- piker/data/feed.py | 181 ++++++++++++++++++++++++++------------------- 1 file changed, 106 insertions(+), 75 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index c9bb5e36..93989a58 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -50,9 +50,8 @@ from ._sharedmem import ( from .ingest import get_ingestormod from ._source import ( base_iohlc_dtype, - mk_symbol, Symbol, - mk_fqsn, + uncons_fqsn, ) from ..ui import _search from ._sampling import ( @@ -192,7 +191,7 @@ async def _setup_persistent_brokerd( async def manage_history( mod: ModuleType, bus: _FeedsBus, - symbol: str, + fqsn: str, some_data_ready: trio.Event, feed_is_live: trio.Event, @@ -206,8 +205,6 @@ async def manage_history( buffer. ''' - fqsn = mk_fqsn(mod.name, symbol) - # (maybe) allocate shm array for this broker/symbol which will # be used for fast near-term history capture and processing. shm, opened = maybe_open_shm_array( @@ -226,7 +223,7 @@ async def manage_history( # start history backfill task ``backfill_bars()`` is # a required backend func this must block until shm is # filled with first set of ohlc bars - _ = await bus.nursery.start(mod.backfill_bars, symbol, shm) + _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm) # yield back after client connect with filled shm task_status.started(shm) @@ -285,8 +282,6 @@ async def allocate_persistent_feed( except ImportError: mod = get_ingestormod(brokername) - fqsn = mk_fqsn(brokername, symbol) - # mem chan handed to broker backend so it can push real-time # quotes to this task for sampling and history storage (see below). send, quote_stream = trio.open_memory_channel(10) @@ -295,28 +290,9 @@ async def allocate_persistent_feed( some_data_ready = trio.Event() feed_is_live = trio.Event() - # run 2 tasks: - # - a history loader / maintainer - # - a real-time streamer which consumers and sends new data to any - # consumers as well as writes to storage backends (as configured). - - # XXX: neither of these will raise but will cause an inf hang due to: - # https://github.com/python-trio/trio/issues/2258 - # bus.nursery.start_soon( - # await bus.start_task( - - shm = await bus.nursery.start( - manage_history, - mod, - bus, - symbol, - some_data_ready, - feed_is_live, - ) - # establish broker backend quote stream by calling # ``stream_quotes()``, which is a required broker backend endpoint. - init_msg, first_quotes = await bus.nursery.start( + init_msg, first_quote = await bus.nursery.start( partial( mod.stream_quotes, send_chan=send, @@ -325,11 +301,38 @@ async def allocate_persistent_feed( loglevel=loglevel, ) ) + # the broker-specific fully qualified symbol name + bfqsn = init_msg[symbol]['fqsn'] + + # HISTORY, run 2 tasks: + # - a history loader / maintainer + # - a real-time streamer which consumers and sends new data to any + # consumers as well as writes to storage backends (as configured). + + # XXX: neither of these will raise but will cause an inf hang due to: + # https://github.com/python-trio/trio/issues/2258 + # bus.nursery.start_soon( + # await bus.start_task( + shm = await bus.nursery.start( + manage_history, + mod, + bus, + bfqsn, + some_data_ready, + feed_is_live, + ) # we hand an IPC-msg compatible shm token to the caller so it # can read directly from the memory which will be written by # this task. - init_msg[symbol]['shm_token'] = shm.token + msg = init_msg[symbol] + msg['shm_token'] = shm.token + + # true fqsn + fqsn = '.'.join((bfqsn, brokername)) + + # add a fqsn entry that includes the ``.`` suffix + init_msg[fqsn] = msg # TODO: pretty sure we don't need this? why not just leave 1s as # the fastest "sample period" since we'll probably always want that @@ -342,8 +345,22 @@ async def allocate_persistent_feed( log.info(f'waiting on history to load: {fqsn}') await some_data_ready.wait() - bus.feeds[symbol.lower()] = (init_msg, first_quotes) - task_status.started((init_msg, first_quotes)) + # append ``.`` suffix to each quote symbol + bsym = symbol + f'.{brokername}' + generic_first_quotes = { + bsym: first_quote, + fqsn: first_quote, + } + + bus.feeds[symbol] = bus.feeds[fqsn] = ( + init_msg, + generic_first_quotes, + ) + # for ambiguous names we simply apply the retreived + # feed to that name (for now). + + # task_status.started((init_msg, generic_first_quotes)) + task_status.started() # backend will indicate when real-time quotes have begun. await feed_is_live.wait() @@ -358,10 +375,11 @@ async def allocate_persistent_feed( bus, shm, quote_stream, + brokername, sum_tick_vlm ) finally: - log.warning(f'{symbol}@{brokername} feed task terminated') + log.warning(f'{fqsn} feed task terminated') @tractor.context @@ -394,25 +412,16 @@ async def open_feed_bus( assert 'brokerd' in tractor.current_actor().name bus = get_feed_bus(brokername) - bus._subscribers.setdefault(symbol, []) - fqsn = mk_fqsn(brokername, symbol) - - entry = bus.feeds.get(symbol) # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery + entry = bus.feeds.get(symbol) if entry is None: - if not start_stream: - raise RuntimeError( - f'No stream feed exists for {fqsn}?\n' - f'You may need a `brokerd` started first.' - ) - - # allocate a new actor-local stream bus which will persist for - # this `brokerd`. + # allocate a new actor-local stream bus which + # will persist for this `brokerd`. async with bus.task_lock: - init_msg, first_quotes = await bus.nursery.start( + await bus.nursery.start( partial( allocate_persistent_feed, @@ -434,9 +443,30 @@ async def open_feed_bus( # subscriber init_msg, first_quotes = bus.feeds[symbol] + msg = init_msg[symbol] + bfqsn = msg['fqsn'] + + # true fqsn + fqsn = '.'.join([bfqsn, brokername]) + assert fqsn in first_quotes + assert bus.feeds[fqsn] + + # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib) + bsym = symbol + f'.{brokername}' + assert bsym in first_quotes + + # we use the broker-specific fqsn (bfqsn) for + # the sampler subscription since the backend isn't (yet) + # expected to append it's own name to the fqsn, so we filter + # on keys which *do not* include that name (e.g .ib) . + bus._subscribers.setdefault(bfqsn, []) + # send this even to subscribers to existing feed? # deliver initial info message a first quote asap - await ctx.started((init_msg, first_quotes)) + await ctx.started(( + init_msg, + first_quotes, + )) if not start_stream: log.warning(f'Not opening real-time stream for {fqsn}') @@ -449,14 +479,12 @@ async def open_feed_bus( # re-send to trigger display loop cycle (necessary especially # when the mkt is closed and no real-time messages are # expected). - await stream.send(first_quotes) + await stream.send({fqsn: first_quotes}) + # open a bg task which receives quotes over a mem chan + # and only pushes them to the target actor-consumer at + # a max ``tick_throttle`` instantaneous rate. if tick_throttle: - - # open a bg task which receives quotes over a mem chan - # and only pushes them to the target actor-consumer at - # a max ``tick_throttle`` instantaneous rate. - send, recv = trio.open_memory_channel(2**10) cs = await bus.start_task( uniform_rate_send, @@ -469,12 +497,15 @@ async def open_feed_bus( else: sub = (stream, tick_throttle) - subs = bus._subscribers[symbol] + subs = bus._subscribers[bfqsn] subs.append(sub) try: uid = ctx.chan.uid + # ctrl protocol for start/stop of quote streams based on UI + # state (eg. don't need a stream when a symbol isn't being + # displayed). async for msg in stream: if msg == 'pause': @@ -499,7 +530,7 @@ async def open_feed_bus( # n.cancel_scope.cancel() cs.cancel() try: - bus._subscribers[symbol].remove(sub) + bus._subscribers[bfqsn].remove(sub) except ValueError: log.warning(f'{sub} for {symbol} was already removed?') @@ -625,10 +656,10 @@ async def install_brokerd_search( @asynccontextmanager async def open_feed( - brokername: str, - symbols: list[str], - loglevel: Optional[str] = None, + fqsns: list[str], + + loglevel: Optional[str] = None, backpressure: bool = True, start_stream: bool = True, tick_throttle: Optional[float] = None, # Hz @@ -638,7 +669,10 @@ async def open_feed( Open a "data feed" which provides streamed real-time quotes. ''' - sym = symbols[0].lower() + fqsn = fqsns[0].lower() + + brokername, key, suffix = uncons_fqsn(fqsn) + bfqsn = fqsn.replace('.' + brokername, '') try: mod = get_brokermod(brokername) @@ -659,7 +693,7 @@ async def open_feed( portal.open_context( open_feed_bus, brokername=brokername, - symbol=sym, + symbol=bfqsn, loglevel=loglevel, start_stream=start_stream, tick_throttle=tick_throttle, @@ -676,9 +710,10 @@ async def open_feed( ): # we can only read from shm shm = attach_shm_array( - token=init_msg[sym]['shm_token'], + token=init_msg[bfqsn]['shm_token'], readonly=True, ) + assert fqsn in first_quotes feed = Feed( name=brokername, @@ -691,17 +726,15 @@ async def open_feed( ) for sym, data in init_msg.items(): - si = data['symbol_info'] - - symbol = mk_symbol( - key=sym, - type_key=si.get('asset_type', 'forex'), - tick_size=si.get('price_tick_size', 0.01), - lot_tick_size=si.get('lot_tick_size', 0.0), + fqsn = data['fqsn'] + f'.{brokername}' + symbol = Symbol.from_fqsn( + fqsn, + info=si, ) - symbol.broker_info[brokername] = si + # symbol.broker_info[brokername] = si + feed.symbols[fqsn] = symbol feed.symbols[sym] = symbol # cast shm dtype to list... can't member why we need this @@ -725,8 +758,7 @@ async def open_feed( @asynccontextmanager async def maybe_open_feed( - brokername: str, - symbols: list[str], + fqsns: list[str], loglevel: Optional[str] = None, **kwargs, @@ -738,13 +770,12 @@ async def maybe_open_feed( in a tractor broadcast receiver. ''' - sym = symbols[0].lower() + fqsn = fqsns[0] async with maybe_open_context( acm_func=open_feed, kwargs={ - 'brokername': brokername, - 'symbols': [sym], + 'fqsns': fqsns, 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), @@ -752,11 +783,11 @@ async def maybe_open_feed( 'backpressure': kwargs.get('backpressure', True), 'start_stream': kwargs.get('start_stream', True), }, - key=sym, + key=fqsn, ) as (cache_hit, feed): if cache_hit: - log.info(f'Using cached feed for {brokername}.{sym}') + log.info(f'Using cached feed for {fqsn}') # add a new broadcast subscription for the quote stream # if this feed is likely already in use async with feed.stream.subscribe() as bstream: From 7f36e85815e770554a5f1dca05344c045e04a7ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Mar 2022 15:05:32 -0400 Subject: [PATCH 08/26] Append broker name to symbols before quotes broadcast in sampler task --- piker/data/_sampling.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 928b3694..16c6b017 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -181,6 +181,7 @@ async def sample_and_broadcast( bus: '_FeedsBus', # noqa shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, + brokername: str, sum_tick_vlm: bool = True, ) -> None: @@ -190,8 +191,7 @@ async def sample_and_broadcast( # iterate stream delivered by broker async for quotes in quote_stream: # TODO: ``numba`` this! - for sym, quote in quotes.items(): - + for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing # to the sharedmem array to decrease latency, however, that # will require at least some way to prevent task switching @@ -255,7 +255,13 @@ async def sample_and_broadcast( # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. - subs = bus._subscribers[sym.lower()] + subs = bus._subscribers[broker_symbol.lower()] + + # NOTE: by default the broker backend doesn't append + # it's own "name" into the fqsn schema (but maybe it + # should?) so we have to manually generate the correct + # key here. + bsym = f'{broker_symbol}.{brokername}' lags = 0 for (stream, tick_throttle) in subs: @@ -266,7 +272,9 @@ async def sample_and_broadcast( # this is a send mem chan that likely # pushes to the ``uniform_rate_send()`` below. try: - stream.send_nowait((sym, quote)) + stream.send_nowait( + (bsym, quote) + ) except trio.WouldBlock: ctx = getattr(stream, '_ctx', None) if ctx: @@ -280,7 +288,9 @@ async def sample_and_broadcast( f'feed @ {tick_throttle} Hz' ) else: - await stream.send({sym: quote}) + await stream.send( + {bsym: quote} + ) if cs.cancelled_caught: lags += 1 From 76f398bd9fbf90ebfa35248b2be2ef7a93961818 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 19 Mar 2022 13:47:25 -0400 Subject: [PATCH 09/26] Support no venue or suffix symbols (normally crypto$) --- piker/data/_source.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/piker/data/_source.py b/piker/data/_source.py index da2f3d00..f3796725 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -97,9 +97,21 @@ def uncons_fqsn(fqsn: str) -> tuple[str, str, str]: Unpack a fully-qualified-symbol-name to ``tuple``. ''' + venue = '' + suffix = '' + # TODO: probably reverse the order of all this XD tokens = fqsn.split('.') - if len(tokens) > 3: + if len(tokens) < 3: + # probably crypto + symbol, broker = tokens + return ( + broker, + symbol, + '', + ) + + elif len(tokens) > 3: symbol, venue, suffix, broker = tokens else: symbol, venue, broker = tokens From 7bd5b42f9e98e82719b13d79c118a88c473d4d77 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 19 Mar 2022 14:26:28 -0400 Subject: [PATCH 10/26] Ensure we lower case the fqsn received from all backends before delivery --- piker/data/feed.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 93989a58..408e7ea0 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -301,8 +301,11 @@ async def allocate_persistent_feed( loglevel=loglevel, ) ) - # the broker-specific fully qualified symbol name - bfqsn = init_msg[symbol]['fqsn'] + # the broker-specific fully qualified symbol name, + # but ensure it is lower-cased for external use. + bfqsn = init_msg[symbol]['fqsn'].lower() + init_msg[symbol]['fqsn'] = bfqsn + # HISTORY, run 2 tasks: # - a history loader / maintainer @@ -330,7 +333,6 @@ async def allocate_persistent_feed( # true fqsn fqsn = '.'.join((bfqsn, brokername)) - # add a fqsn entry that includes the ``.`` suffix init_msg[fqsn] = msg @@ -444,7 +446,7 @@ async def open_feed_bus( init_msg, first_quotes = bus.feeds[symbol] msg = init_msg[symbol] - bfqsn = msg['fqsn'] + bfqsn = msg['fqsn'].lower() # true fqsn fqsn = '.'.join([bfqsn, brokername]) @@ -763,7 +765,10 @@ async def maybe_open_feed( **kwargs, -) -> (Feed, ReceiveChannel[dict[str, Any]]): +) -> ( + Feed, + ReceiveChannel[dict[str, Any]], +): ''' Maybe open a data to a ``brokerd`` daemon only if there is no local one for the broker-symbol pair, if one is cached use it wrapped @@ -784,6 +789,7 @@ async def maybe_open_feed( 'start_stream': kwargs.get('start_stream', True), }, key=fqsn, + ) as (cache_hit, feed): if cache_hit: From a6e32e75300f36a4fa88dfa3f46c8329944ebff9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Mar 2022 13:25:48 -0400 Subject: [PATCH 11/26] Add `Symbol.tokens()` for grabbing separate strs --- piker/data/_source.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/piker/data/_source.py b/piker/data/_source.py index f3796725..a9efc8cd 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -214,13 +214,15 @@ class Symbol(BaseModel): self.key, ) - def front_fqsn(self) -> str: + def tokens(self) -> tuple[str]: broker, key = self.front_feed() if self.suffix: - tokens = (key, self.suffix, broker) + return (key, self.suffix, broker) else: - tokens = (key, broker) + return (key, broker) + def front_fqsn(self) -> str: + tokens = self.tokens() fqsn = '.'.join(tokens) return fqsn From 81cd696ec818cbfa80319d90fd5e666128cfef26 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Mar 2022 13:29:07 -0400 Subject: [PATCH 12/26] Drop sampler consumers that overrun 6x --- piker/data/_sampling.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 16c6b017..d31bf7b1 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -19,6 +19,8 @@ Sampling and broadcast machinery for (soft) real-time delivery of financial data flows. """ +from __future__ import annotations +from collections import Counter import time import tractor @@ -188,6 +190,8 @@ async def sample_and_broadcast( log.info("Started shared mem bar writer") + overruns = Counter() + # iterate stream delivered by broker async for quotes in quote_stream: # TODO: ``numba`` this! @@ -262,8 +266,8 @@ async def sample_and_broadcast( # should?) so we have to manually generate the correct # key here. bsym = f'{broker_symbol}.{brokername}' + lags: int = 0 - lags = 0 for (stream, tick_throttle) in subs: try: @@ -283,10 +287,18 @@ async def sample_and_broadcast( f'{ctx.channel.uid} !!!' ) else: + key = id(stream) + overruns[key] += 1 log.warning( f'Feed overrun {bus.brokername} -> ' f'feed @ {tick_throttle} Hz' ) + if overruns[key] > 6: + log.warning( + f'Dropping consumer {stream}' + ) + await stream.aclose() + raise trio.BrokenResourceError else: await stream.send( {bsym: quote} @@ -309,7 +321,7 @@ async def sample_and_broadcast( '`brokerd`-quotes-feed connection' ) if tick_throttle: - assert stream.closed() + assert stream._closed # XXX: do we need to deregister here # if it's done in the fee bus code? From 8df614465cc6024824b353ba70ceea4e4918e947 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Mar 2022 12:37:58 -0400 Subject: [PATCH 13/26] Fix missing f-str prefix --- piker/data/_sharedmem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 5f7fdcd0..103fa6d5 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -258,7 +258,7 @@ class ShmArray: if index < 0: raise ValueError( f'Array size of {self._len} was overrun during prepend.\n' - 'You have passed {abs(index)} too many datums.' + f'You have passed {abs(index)} too many datums.' ) end = index + length From d0205e726b645703bdf8f4edf862abbacd5de834 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Mar 2022 15:07:48 -0400 Subject: [PATCH 14/26] Pass in fqsn from chart UI components --- piker/ui/_chart.py | 2 +- piker/ui/_display.py | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 2a3689a3..7938e0d8 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -239,7 +239,7 @@ class GodWidget(QWidget): symbol = linkedsplits.symbol if symbol is not None: self.window.setWindowTitle( - f'{symbol.key}@{symbol.brokers} ' + f'{symbol.front_fqsn()} ' f'tick:{symbol.tick_size}' ) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index c6d0f5aa..ce892e9d 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -211,7 +211,6 @@ async def graphics_update_loop( # async for quotes in iter_drain_quotes(): async for quotes in stream: - quote_period = time.time() - last_quote quote_rate = round( 1/quote_period, 1) if quote_period > 0 else float('inf') @@ -480,24 +479,25 @@ async def display_symbol_data( # clear_on_next=True, # group_key=loading_sym_key, # ) + fqsn = '.'.join((sym, provider)) async with open_feed( - provider, - [sym], - loglevel=loglevel, + [fqsn], + loglevel=loglevel, - # limit to at least display's FPS - # avoiding needless Qt-in-guest-mode context switches - tick_throttle=_quote_throttle_rate, + # limit to at least display's FPS + # avoiding needless Qt-in-guest-mode context switches + tick_throttle=_quote_throttle_rate, ) as feed: ohlcv: ShmArray = feed.shm bars = ohlcv.array symbol = feed.symbols[sym] + fqsn = symbol.front_fqsn() # load in symbol's ohlc data godwidget.window.setWindowTitle( - f'{symbol.key}@{symbol.brokers} ' + f'{fqsn} ' f'tick:{symbol.tick_size} ' f'step:1s ' ) @@ -582,8 +582,7 @@ async def display_symbol_data( open_order_mode( feed, chart, - symbol, - provider, + fqsn, order_mode_started ) ): From d62a636bcc50fd798a6c51da1ac542baa3d4b48e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 19 Mar 2022 14:27:04 -0400 Subject: [PATCH 15/26] Pass concatted pre-fqsn directly to feed api --- piker/ui/_display.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index ce892e9d..9957baa4 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -479,10 +479,8 @@ async def display_symbol_data( # clear_on_next=True, # group_key=loading_sym_key, # ) - fqsn = '.'.join((sym, provider)) - async with open_feed( - [fqsn], + ['.'.join((sym, provider))], loglevel=loglevel, # limit to at least display's FPS From c7f3e59105d12f14b4301bb0e57e455b4e5c1e30 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Mar 2022 17:31:09 -0400 Subject: [PATCH 16/26] Expect fqsn in ems and order mode Use fqsn as input to the client-side EMS apis but strip broker-name stuff before generating and sending `Brokerd*` msgs to each backend for live order requests (since it's weird for a backend to expect it's own name, though maybe that could be a sanity check?). Summary of fqsn use vs. broker native keys: - client side pps, order requests and general UX for order management use an fqsn for tracking - brokerd side order dialogs use the broker-specific symbol which is usually nearly the same key minus the broker name - internal dark book and quote feed lookups use the fqsn where possible --- piker/clearing/_client.py | 29 ++++++++-------- piker/clearing/_ems.py | 70 ++++++++++++++++++++++----------------- piker/ui/order_mode.py | 22 ++++++------ 3 files changed, 67 insertions(+), 54 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 84983808..74a56917 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -18,7 +18,7 @@ Orders and execution client API. """ -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from typing import Dict from pprint import pformat from dataclasses import dataclass, field @@ -27,7 +27,6 @@ import trio import tractor from tractor.trionics import broadcast_receiver -from ..data._source import Symbol from ..log import get_logger from ._ems import _emsd_main from .._daemon import maybe_open_emsd @@ -156,16 +155,19 @@ async def relay_order_cmds_from_sync_code( await to_ems_stream.send(cmd) -@asynccontextmanager +@acm async def open_ems( - broker: str, - symbol: Symbol, + fqsn: str, -) -> (OrderBook, tractor.MsgStream, dict): - """Spawn an EMS daemon and begin sending orders and receiving +) -> ( + OrderBook, + tractor.MsgStream, + dict, +): + ''' + Spawn an EMS daemon and begin sending orders and receiving alerts. - This EMS tries to reduce most broker's terrible order entry apis to a very simple protocol built on a few easy to grok and/or "rantsy" premises: @@ -194,21 +196,22 @@ async def open_ems( - 'dark_executed', 'broker_executed' - 'broker_filled' - """ + ''' # wait for service to connect back to us signalling # ready for order commands book = get_orders() + from ..data._source import uncons_fqsn + broker, symbol, suffix = uncons_fqsn(fqsn) + async with maybe_open_emsd(broker) as portal: async with ( - # connect to emsd portal.open_context( _emsd_main, - broker=broker, - symbol=symbol.key, + fqsn=fqsn, ) as (ctx, (positions, accounts)), @@ -218,7 +221,7 @@ async def open_ems( async with trio.open_nursery() as n: n.start_soon( relay_order_cmds_from_sync_code, - symbol.key, + fqsn, trades_stream ) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 630405ea..1ebab7ce 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -20,7 +20,7 @@ In da suit parlances: "Execution management systems" """ from contextlib import asynccontextmanager from dataclasses import dataclass, field -from math import isnan +# from math import isnan from pprint import pformat import time from typing import AsyncIterator, Callable @@ -113,8 +113,8 @@ class _DarkBook: # tracks most recent values per symbol each from data feed lasts: dict[ - tuple[str, str], - float + str, + float, ] = field(default_factory=dict) # mapping of piker ems order ids to current brokerd order flow message @@ -135,7 +135,7 @@ async def clear_dark_triggers( ems_client_order_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa broker: str, - symbol: str, + fqsn: str, book: _DarkBook, @@ -155,7 +155,6 @@ async def clear_dark_triggers( # start = time.time() for sym, quote in quotes.items(): execs = book.orders.get(sym, {}) - for tick in iterticks( quote, # dark order price filter(s) @@ -171,7 +170,7 @@ async def clear_dark_triggers( ttype = tick['type'] # update to keep new cmds informed - book.lasts[(broker, symbol)] = price + book.lasts[sym] = price for oid, ( pred, @@ -196,6 +195,7 @@ async def clear_dark_triggers( action: str = cmd['action'] symbol: str = cmd['symbol'] + bfqsn: str = symbol.replace(f'.{broker}', '') if action == 'alert': # nothing to do but relay a status @@ -225,7 +225,7 @@ async def clear_dark_triggers( # order-request and instead create a new one. reqid=None, - symbol=sym, + symbol=bfqsn, price=submit_price, size=cmd['size'], ) @@ -247,12 +247,9 @@ async def clear_dark_triggers( oid=oid, # ems order id resp=resp, time_ns=time.time_ns(), - - symbol=symbol, + symbol=fqsn, trigger_price=price, - broker_details={'name': broker}, - cmd=cmd, # original request message ).dict() @@ -270,7 +267,7 @@ async def clear_dark_triggers( else: # condition scan loop complete log.debug(f'execs are {execs}') if execs: - book.orders[symbol] = execs + book.orders[fqsn] = execs # print(f'execs scan took: {time.time() - start}') @@ -382,7 +379,8 @@ async def open_brokerd_trades_dialogue( task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, ) -> tuple[dict, tractor.MsgStream]: - '''Open and yield ``brokerd`` trades dialogue context-stream if none + ''' + Open and yield ``brokerd`` trades dialogue context-stream if none already exists. ''' @@ -458,12 +456,13 @@ async def open_brokerd_trades_dialogue( # locally cache and track positions per account. pps = {} for msg in positions: + log.info(f'loading pp: {msg}') account = msg['account'] assert account in accounts pps.setdefault( - msg['symbol'], + f'{msg["symbol"]}.{broker}', {} )[account] = msg @@ -563,7 +562,13 @@ async def translate_and_relay_brokerd_events( # XXX: this will be useful for automatic strats yah? # keep pps per account up to date locally in ``emsd`` mem - relay.positions.setdefault(pos_msg['symbol'], {}).setdefault( + sym, broker = pos_msg['symbol'], pos_msg['broker'] + + relay.positions.setdefault( + # NOTE: translate to a FQSN! + f'{sym}.{broker}', + {} + ).setdefault( pos_msg['account'], {} ).update(pos_msg) @@ -840,11 +845,15 @@ async def process_client_order_cmds( msg = Order(**cmd) - sym = msg.symbol + fqsn = msg.symbol trigger_price = msg.price size = msg.size exec_mode = msg.exec_mode broker = msg.brokers[0] + # remove the broker part before creating a message + # to send to the specific broker since they probably + # aren't expectig their own name, but should they? + sym = fqsn.replace(f'.{broker}', '') if exec_mode == 'live' and action in ('buy', 'sell',): @@ -902,7 +911,7 @@ async def process_client_order_cmds( # price received from the feed, instead of being # like every other shitty tina platform that makes # the user choose the predicate operator. - last = dark_book.lasts[(broker, sym)] + last = dark_book.lasts[fqsn] pred = mk_check(trigger_price, last, action) spread_slap: float = 5 @@ -933,7 +942,7 @@ async def process_client_order_cmds( # dark book entry if the order id already exists dark_book.orders.setdefault( - sym, {} + fqsn, {} )[oid] = ( pred, tickfilter, @@ -960,8 +969,8 @@ async def process_client_order_cmds( async def _emsd_main( ctx: tractor.Context, - broker: str, - symbol: str, + fqsn: str, + _exec_mode: str = 'dark', # ('paper', 'dark', 'live') loglevel: str = 'info', @@ -1003,6 +1012,8 @@ async def _emsd_main( global _router assert _router + from ..data._source import uncons_fqsn + broker, symbol, suffix = uncons_fqsn(fqsn) dark_book = _router.get_dark_book(broker) # TODO: would be nice if in tractor we can require either a ctx arg, @@ -1015,17 +1026,16 @@ async def _emsd_main( # spawn one task per broker feed async with ( maybe_open_feed( - broker, - [symbol], + [fqsn], loglevel=loglevel, - ) as (feed, stream), + ) as (feed, quote_stream), ): # XXX: this should be initial price quote from target provider - first_quote = feed.first_quotes[symbol] + first_quote = feed.first_quotes[fqsn] book = _router.get_dark_book(broker) - last = book.lasts[(broker, symbol)] = first_quote['last'] + book.lasts[fqsn] = first_quote['last'] # XXX: ib is a cucker but we've fixed avoiding receiving any # `Nan`s in the backend during market hours (right?). this was @@ -1054,8 +1064,8 @@ async def _emsd_main( # flatten out collected pps from brokerd for delivery pp_msgs = { - sym: list(pps.values()) - for sym, pps in relay.positions.items() + fqsn: list(pps.values()) + for fqsn, pps in relay.positions.items() } # signal to client that we're started and deliver @@ -1072,9 +1082,9 @@ async def _emsd_main( brokerd_stream, ems_client_order_stream, - stream, + quote_stream, broker, - symbol, + fqsn, # form: ... book ) @@ -1090,7 +1100,7 @@ async def _emsd_main( # relay.brokerd_dialogue, brokerd_stream, - symbol, + fqsn, feed, dark_book, _router, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 9f4dbadb..26f44007 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -268,13 +268,14 @@ class OrderMode: ''' staged = self._staged_order - symbol = staged.symbol + symbol: Symbol = staged.symbol oid = str(uuid.uuid4()) # format order data for ems + fqsn = symbol.front_fqsn() order = staged.copy( update={ - 'symbol': symbol.key, + 'symbol': fqsn, 'oid': oid, } ) @@ -519,8 +520,7 @@ async def open_order_mode( feed: Feed, chart: 'ChartPlotWidget', # noqa - symbol: Symbol, - brokername: str, + fqsn: str, started: trio.Event, ) -> None: @@ -546,8 +546,7 @@ async def open_order_mode( # spawn EMS actor-service async with ( - - open_ems(brokername, symbol) as ( + open_ems(fqsn) as ( book, trades_stream, position_msgs, @@ -556,8 +555,7 @@ async def open_order_mode( trio.open_nursery() as tn, ): - log.info(f'Opening order mode for {brokername}.{symbol.key}') - + log.info(f'Opening order mode for {fqsn}') view = chart.view # annotations editors @@ -566,7 +564,7 @@ async def open_order_mode( # symbol id symbol = chart.linked.symbol - symkey = symbol.key + symkey = symbol.front_fqsn() # map of per-provider account keys to position tracker instances trackers: dict[str, PositionTracker] = {} @@ -610,7 +608,7 @@ async def open_order_mode( log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') startup_pp.update_from_msg(msg) - # allocator + # allocator config alloc = mk_allocator( symbol=symbol, account=account_name, @@ -818,8 +816,10 @@ async def process_trades_and_update_ui( 'position', ): sym = mode.chart.linked.symbol - if msg['symbol'].lower() in sym.key: + symbol = msg['symbol'].lower() + fqsn = sym.front_fqsn() + if symbol in fqsn: tracker = mode.trackers[msg['account']] tracker.live_pp.update_from_msg(msg) # update order pane widgets From 493e45e70aad31fba6cd327a86479c214fdf7f2b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Mar 2022 13:28:06 -0400 Subject: [PATCH 17/26] Strip broker name from symbol on pp msg updates --- piker/ui/order_mode.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 26f44007..6316f116 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -816,10 +816,18 @@ async def process_trades_and_update_ui( 'position', ): sym = mode.chart.linked.symbol - symbol = msg['symbol'].lower() + pp_msg_symbol = msg['symbol'].lower() fqsn = sym.front_fqsn() + broker, key = sym.front_feed() + # print( + # f'pp msg symbol: {pp_msg_symbol}\n', + # f'fqsn: {fqsn}\n', + # f'front key: {key}\n', + # ) - if symbol in fqsn: + if ( + pp_msg_symbol == fqsn.replace(f'.{broker}', '') + ): tracker = mode.trackers[msg['account']] tracker.live_pp.update_from_msg(msg) # update order pane widgets From 998a5acd924494c10679e27d2f938d681ae38a59 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 19 Mar 2022 14:28:11 -0400 Subject: [PATCH 18/26] Crypto$ backend updates - move to 3.9+ type annots - add initial draft `open_history_client()` endpoints - deliver `'fqsn'` keys in quote-stream init msgs --- piker/brokers/binance.py | 42 +++++++++++++++++--------- piker/brokers/kraken.py | 65 +++++++++++++++++++++++----------------- 2 files changed, 66 insertions(+), 41 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index f4732e54..5034aca6 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -18,8 +18,11 @@ Binance backend """ -from contextlib import asynccontextmanager -from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator +from contextlib import asynccontextmanager as acm +from typing import ( + Any, Union, Optional, + AsyncGenerator, Callable, +) import time import trio @@ -88,7 +91,7 @@ class Pair(BaseModel): baseCommissionPrecision: int quoteCommissionPrecision: int - orderTypes: List[str] + orderTypes: list[str] icebergAllowed: bool ocoAllowed: bool @@ -96,8 +99,8 @@ class Pair(BaseModel): isSpotTradingAllowed: bool isMarginTradingAllowed: bool - filters: List[Dict[str, Union[str, int, float]]] - permissions: List[str] + filters: list[dict[str, Union[str, int, float]]] + permissions: list[str] @dataclass @@ -145,7 +148,7 @@ class Client: self, method: str, params: dict, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: resp = await self._sesh.get( path=f'/api/v3/{method}', params=params, @@ -200,7 +203,7 @@ class Client: self, pattern: str, limit: int = None, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: if self._pairs is not None: data = self._pairs else: @@ -273,7 +276,7 @@ class Client: return array -@asynccontextmanager +@acm async def get_client() -> Client: client = Client() await client.cache_symbols() @@ -353,7 +356,7 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: } -def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: +def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: """Create a request subscription packet dict. https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams @@ -368,6 +371,17 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: } +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + + # TODO implement history getter for the new storage layer. + async with open_cached_client('binance') as client: + yield client + + async def backfill_bars( sym: str, shm: ShmArray, # type: ignore # noqa @@ -385,12 +399,12 @@ async def backfill_bars( async def stream_quotes( send_chan: trio.abc.SendChannel, - symbols: List[str], + symbols: list[str], feed_is_live: trio.Event, loglevel: str = None, # startup sync - task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: # XXX: required to propagate ``tractor`` loglevel to piker logging @@ -427,10 +441,11 @@ async def stream_quotes( symbol: { 'symbol_info': sym_infos[sym], 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, }, } - @asynccontextmanager + @acm async def subscribe(ws: wsproto.WSConnection): # setup subs @@ -480,8 +495,7 @@ async def stream_quotes( # TODO: use ``anext()`` when it lands in 3.10! typ, quote = await msg_gen.__anext__() - first_quote = {quote['symbol'].lower(): quote} - task_status.started((init_msgs, first_quote)) + task_status.started((init_msgs, quote)) # signal to caller feed is ready for consumption feed_is_live.set() diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index f64ef7aa..4f5166db 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -18,9 +18,9 @@ Kraken backend. ''' -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from dataclasses import asdict, field -from typing import Dict, List, Tuple, Any, Optional, AsyncIterator +from typing import Any, Optional, AsyncIterator, Callable import time from trio_typing import TaskStatus @@ -80,7 +80,7 @@ ohlc_dtype = np.dtype(_ohlc_dtype) _show_wap_in_history = True -_symbol_info_translation: Dict[str, str] = { +_symbol_info_translation: dict[str, str] = { 'tick_decimals': 'pair_decimals', } @@ -102,16 +102,16 @@ class Pair(BaseModel): lot_multiplier: float # array of leverage amounts available when buying - leverage_buy: List[int] + leverage_buy: list[int] # array of leverage amounts available when selling - leverage_sell: List[int] + leverage_sell: list[int] # fee schedule array in [volume, percent fee] tuples - fees: List[Tuple[int, float]] + fees: list[tuple[int, float]] # maker fee schedule array in [volume, percent fee] tuples (if on # maker/taker) - fees_maker: List[Tuple[int, float]] + fees_maker: list[tuple[int, float]] fee_volume_currency: str # volume discount currency margin_call: str # margin call level @@ -153,7 +153,7 @@ class OHLC: volume: float # Accumulated volume **within interval** count: int # Number of trades within interval # (sampled) generated tick data - ticks: List[Any] = field(default_factory=list) + ticks: list[Any] = field(default_factory=list) def get_config() -> dict[str, Any]: @@ -177,7 +177,7 @@ def get_config() -> dict[str, Any]: def get_kraken_signature( urlpath: str, - data: Dict[str, Any], + data: dict[str, Any], secret: str ) -> str: postdata = urllib.parse.urlencode(data) @@ -220,7 +220,7 @@ class Client: self._secret = secret @property - def pairs(self) -> Dict[str, Any]: + def pairs(self) -> dict[str, Any]: if self._pairs is None: raise RuntimeError( "Make sure to run `cache_symbols()` on startup!" @@ -233,7 +233,7 @@ class Client: self, method: str, data: dict, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: resp = await self._sesh.post( path=f'/public/{method}', json=data, @@ -246,7 +246,7 @@ class Client: method: str, data: dict, uri_path: str - ) -> Dict[str, Any]: + ) -> dict[str, Any]: headers = { 'Content-Type': 'application/x-www-form-urlencoded', @@ -266,16 +266,16 @@ class Client: async def endpoint( self, method: str, - data: Dict[str, Any] - ) -> Dict[str, Any]: + data: dict[str, Any] + ) -> dict[str, Any]: uri_path = f'/0/private/{method}' data['nonce'] = str(int(1000*time.time())) return await self._private(method, data, uri_path) async def get_trades( self, - data: Dict[str, Any] = {} - ) -> Dict[str, Any]: + data: dict[str, Any] = {} + ) -> dict[str, Any]: data['ofs'] = 0 # Grab all trade history # https://docs.kraken.com/rest/#operation/getTradeHistory @@ -378,7 +378,7 @@ class Client: self, pattern: str, limit: int = None, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: if self._pairs is not None: data = self._pairs else: @@ -452,7 +452,7 @@ class Client: raise SymbolNotFound(json['error'][0] + f': {symbol}') -@asynccontextmanager +@acm async def get_client() -> Client: section = get_config() @@ -521,7 +521,7 @@ def normalize_symbol( return ticker.lower() -def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]: +def make_auth_sub(data: dict[str, Any]) -> dict[str, str]: ''' Create a request subscription packet dict. @@ -696,12 +696,12 @@ async def handle_order_requests( async def trades_dialogue( ctx: tractor.Context, loglevel: str = None, -) -> AsyncIterator[Dict[str, Any]]: +) -> AsyncIterator[dict[str, Any]]: # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - @asynccontextmanager + @acm async def subscribe(ws: wsproto.WSConnection, token: str): # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe @@ -980,7 +980,7 @@ def normalize( return topic, quote -def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: +def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: ''' Create a request subscription packet dict. @@ -996,6 +996,17 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: } +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + + # TODO implement history getter for the new storage layer. + async with open_cached_client('kraken') as client: + yield client + + async def backfill_bars( sym: str, @@ -1017,7 +1028,7 @@ async def backfill_bars( async def stream_quotes( send_chan: trio.abc.SendChannel, - symbols: List[str], + symbols: list[str], feed_is_live: trio.Event, loglevel: str = None, @@ -1025,7 +1036,7 @@ async def stream_quotes( sub_type: str = 'ohlc', # startup sync - task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -1064,10 +1075,11 @@ async def stream_quotes( symbol: { 'symbol_info': sym_infos[sym], 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, }, } - @asynccontextmanager + @acm async def subscribe(ws: wsproto.WSConnection): # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe @@ -1121,8 +1133,7 @@ async def stream_quotes( topic, quote = normalize(ohlc_last) - first_quote = {topic: quote} - task_status.started((init_msgs, first_quote)) + task_status.started((init_msgs, quote)) # lol, only "closes" when they're margin squeezing clients ;P feed_is_live.set() From 6ac60fbe228450304187cc75e7270c487aa67a15 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Mar 2022 15:04:15 -0400 Subject: [PATCH 19/26] Expect fqsns through fsp machinery --- piker/fsp/_engine.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index f1dd49d7..7e75c283 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -37,6 +37,7 @@ from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray +from ..data._source import Symbol from ._api import ( Fsp, _load_builtins, @@ -76,7 +77,7 @@ async def filter_quotes_by_sym( async def fsp_compute( ctx: tractor.Context, - symbol: str, + symbol: Symbol, feed: Feed, quote_stream: trio.abc.ReceiveChannel, @@ -95,13 +96,14 @@ async def fsp_compute( disabled=True ) + fqsn = symbol.front_fqsn() out_stream = func( # TODO: do we even need this if we do the feed api right? # shouldn't a local stream do this before we get a handle # to the async iterable? it's that or we do some kinda # async itertools style? - filter_quotes_by_sym(symbol, quote_stream), + filter_quotes_by_sym(fqsn, quote_stream), # XXX: currently the ``ohlcv`` arg feed.shm, @@ -235,8 +237,7 @@ async def cascade( ctx: tractor.Context, # data feed key - brokername: str, - symbol: str, + fqsn: str, src_shm_token: dict, dst_shm_token: tuple[str, np.dtype], @@ -289,8 +290,7 @@ async def cascade( # open a data feed stream with requested broker async with data.feed.maybe_open_feed( - brokername, - [symbol], + [fqsn], # TODO throttle tick outputs from *this* daemon since # it'll emit tons of ticks due to the throttle only @@ -299,6 +299,7 @@ async def cascade( # tick_throttle=60, ) as (feed, quote_stream): + symbol = feed.symbols[fqsn] profiler(f'{func}: feed up') From ce7d630676603101acce6daba80b010b584cbfbb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Mar 2022 15:06:14 -0400 Subject: [PATCH 20/26] Pass in fqsn from fsp admin apis --- piker/ui/_fsp.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index ac35067c..d56cc2d5 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -386,6 +386,7 @@ class FspAdmin: portal: tractor.Portal, complete: trio.Event, started: trio.Event, + fqsn: str, dst_shm: ShmArray, conf: dict, target: Fsp, @@ -397,7 +398,6 @@ class FspAdmin: cluster and sleeps until signalled to exit. ''' - brokername, sym = self.linked.symbol.front_feed() ns_path = str(target.ns_path) async with ( portal.open_context( @@ -406,8 +406,7 @@ class FspAdmin: cascade, # data feed key - brokername=brokername, - symbol=sym, + fqsn=fqsn, # mems src_shm_token=self.src_shm.token, @@ -429,7 +428,7 @@ class FspAdmin: ): # register output data self._registry[ - (brokername, sym, ns_path) + (fqsn, ns_path) ] = ( stream, dst_shm, @@ -452,11 +451,11 @@ class FspAdmin: ) -> (ShmArray, trio.Event): - fqsn = self.linked.symbol.front_feed() + fqsn = self.linked.symbol.front_fqsn() # allocate an output shm array key, dst_shm, opened = maybe_mk_fsp_shm( - '.'.join(fqsn), + fqsn, target=target, readonly=True, ) @@ -477,6 +476,7 @@ class FspAdmin: portal, complete, started, + fqsn, dst_shm, conf, target, From c9e6c81459e20cd027271570ab3641d652979015 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 19 Mar 2022 13:48:04 -0400 Subject: [PATCH 21/26] Expect fqsn input to paper clearing engine --- piker/clearing/_ems.py | 3 +-- piker/clearing/_paper_engine.py | 15 ++++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 1ebab7ce..5ea19726 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -417,8 +417,7 @@ async def open_brokerd_trades_dialogue( # actor to simulate the real IPC load it'll have when also # pulling data from feeds open_trades_endpoint = paper.open_paperboi( - broker=broker, - symbol=symbol, + fqsn='.'.join([symbol, broker]), loglevel=loglevel, ) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index f87e2203..2ea639c0 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -32,6 +32,7 @@ from dataclasses import dataclass from .. import data from ..data._normalize import iterticks +from ..data._source import uncons_fqsn from ..log import get_logger from ._messages import ( BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, @@ -446,7 +447,7 @@ async def trades_dialogue( ctx: tractor.Context, broker: str, - symbol: str, + fqsn: str, loglevel: str = None, ) -> None: @@ -455,8 +456,7 @@ async def trades_dialogue( async with ( data.open_feed( - broker, - [symbol], + [fqsn], loglevel=loglevel, ) as feed, @@ -491,15 +491,16 @@ async def trades_dialogue( @asynccontextmanager async def open_paperboi( - broker: str, - symbol: str, + fqsn: str, loglevel: str, ) -> Callable: - '''Spawn a paper engine actor and yield through access to + ''' + Spawn a paper engine actor and yield through access to its context. ''' + broker, symbol, expiry = uncons_fqsn(fqsn) service_name = f'paperboi.{broker}' async with ( @@ -518,7 +519,7 @@ async def open_paperboi( async with portal.open_context( trades_dialogue, broker=broker, - symbol=symbol, + fqsn=fqsn, loglevel=loglevel, ) as (ctx, first): From f604437897112803111fa96747c45c6ddb5e2753 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Apr 2022 13:31:21 -0400 Subject: [PATCH 22/26] Remove symbol key from first quote from ib feed --- piker/brokers/ib.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index ab440ffe..3431dfd6 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1472,6 +1472,7 @@ async def stream_quotes( return init_msgs init_msgs = mk_init_msgs() + con = first_ticker.contract # should be real volume for this contract by default @@ -1496,8 +1497,11 @@ async def stream_quotes( topic = '.'.join((con['symbol'], suffix)).lower() quote['symbol'] = topic + # for compat with upcoming fqsn based derivs search + init_msgs[sym]['fqsn'] = topic + # pass first quote asap - first_quote = {topic: quote} + first_quote = quote # it might be outside regular trading hours so see if we can at # least grab history. From 32e316ebff38dc88da7fd45020ca173f1503dc03 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Apr 2022 13:31:38 -0400 Subject: [PATCH 23/26] Drop nl --- piker/data/feed.py | 1 - 1 file changed, 1 deletion(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 408e7ea0..9389daa3 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -306,7 +306,6 @@ async def allocate_persistent_feed( bfqsn = init_msg[symbol]['fqsn'].lower() init_msg[symbol]['fqsn'] = bfqsn - # HISTORY, run 2 tasks: # - a history loader / maintainer # - a real-time streamer which consumers and sends new data to any From e92632bd34679ee6d24e4ce5e16946621d3ef83f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 10 Apr 2022 21:51:22 -0400 Subject: [PATCH 24/26] Remove old commented nan checking lines --- piker/clearing/_ems.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 5ea19726..07de4a6b 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -20,7 +20,6 @@ In da suit parlances: "Execution management systems" """ from contextlib import asynccontextmanager from dataclasses import dataclass, field -# from math import isnan from pprint import pformat import time from typing import AsyncIterator, Callable @@ -491,8 +490,9 @@ async def open_brokerd_trades_dialogue( finally: # parent context must have been closed # remove from cache so next client will respawn if needed - ## TODO: Maybe add a warning - _router.relays.pop(broker, None) + relay = _router.relays.pop(broker, None) + if not relay: + log.warning(f'Relay for {broker} was already removed!?') @tractor.context @@ -1036,11 +1036,6 @@ async def _emsd_main( book = _router.get_dark_book(broker) book.lasts[fqsn] = first_quote['last'] - # XXX: ib is a cucker but we've fixed avoiding receiving any - # `Nan`s in the backend during market hours (right?). this was - # here previously as a sanity check during market hours. - # assert not isnan(last) - # open a stream with the brokerd backend for order # flow dialogue async with ( From ebe26803557fe6c0e12e900847fa8544684dd782 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Apr 2022 01:01:36 -0400 Subject: [PATCH 25/26] Change `uncons_fqsn()` -> `unpack_fqsn()` --- piker/clearing/_client.py | 4 ++-- piker/clearing/_ems.py | 4 ++-- piker/clearing/_paper_engine.py | 4 ++-- piker/data/_source.py | 4 ++-- piker/data/feed.py | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 74a56917..837c28bc 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -201,8 +201,8 @@ async def open_ems( # ready for order commands book = get_orders() - from ..data._source import uncons_fqsn - broker, symbol, suffix = uncons_fqsn(fqsn) + from ..data._source import unpack_fqsn + broker, symbol, suffix = unpack_fqsn(fqsn) async with maybe_open_emsd(broker) as portal: diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 07de4a6b..c49ff4bf 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -1011,8 +1011,8 @@ async def _emsd_main( global _router assert _router - from ..data._source import uncons_fqsn - broker, symbol, suffix = uncons_fqsn(fqsn) + from ..data._source import unpack_fqsn + broker, symbol, suffix = unpack_fqsn(fqsn) dark_book = _router.get_dark_book(broker) # TODO: would be nice if in tractor we can require either a ctx arg, diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 2ea639c0..99039049 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -32,7 +32,7 @@ from dataclasses import dataclass from .. import data from ..data._normalize import iterticks -from ..data._source import uncons_fqsn +from ..data._source import unpack_fqsn from ..log import get_logger from ._messages import ( BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, @@ -500,7 +500,7 @@ async def open_paperboi( its context. ''' - broker, symbol, expiry = uncons_fqsn(fqsn) + broker, symbol, expiry = unpack_fqsn(fqsn) service_name = f'paperboi.{broker}' async with ( diff --git a/piker/data/_source.py b/piker/data/_source.py index a9efc8cd..9c760bc6 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -92,7 +92,7 @@ def ohlc_zeros(length: int) -> np.ndarray: return np.zeros(length, dtype=base_ohlc_dtype) -def uncons_fqsn(fqsn: str) -> tuple[str, str, str]: +def unpack_fqsn(fqsn: str) -> tuple[str, str, str]: ''' Unpack a fully-qualified-symbol-name to ``tuple``. @@ -178,7 +178,7 @@ class Symbol(BaseModel): # XXX: like wtf.. # ) -> 'Symbol': ) -> None: - broker, key, suffix = uncons_fqsn(fqsn) + broker, key, suffix = unpack_fqsn(fqsn) return cls.from_broker_info( broker, key, diff --git a/piker/data/feed.py b/piker/data/feed.py index 9389daa3..260cab9b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -51,7 +51,7 @@ from .ingest import get_ingestormod from ._source import ( base_iohlc_dtype, Symbol, - uncons_fqsn, + unpack_fqsn, ) from ..ui import _search from ._sampling import ( @@ -672,7 +672,7 @@ async def open_feed( ''' fqsn = fqsns[0].lower() - brokername, key, suffix = uncons_fqsn(fqsn) + brokername, key, suffix = unpack_fqsn(fqsn) bfqsn = fqsn.replace('.' + brokername, '') try: From 4b0ca40b179f1249527f34453bf00f4c6a83e95b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Apr 2022 08:48:17 -0400 Subject: [PATCH 26/26] Document "fqsn" on `Symbol` method --- piker/data/_source.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/piker/data/_source.py b/piker/data/_source.py index 9c760bc6..3fa6db7b 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -222,6 +222,23 @@ class Symbol(BaseModel): return (key, broker) def front_fqsn(self) -> str: + ''' + fqsn = "fully qualified symbol name" + + Basically the idea here is for all client-ish code (aka programs/actors + that ask the provider agnostic layers in the stack for data) should be + able to tell which backend / venue / derivative each data feed/flow is + from by an explicit string key of the current form: + + ... + + TODO: I have thoughts that we should actually change this to be + more like an "attr lookup" (like how the web should have done + urls, but marketting peeps ruined it etc. etc.): + + ... + + ''' tokens = self.tokens() fqsn = '.'.join(tokens) return fqsn