From 146c684f21eccf297420f2cab8d8a266b68f37c1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 9 Aug 2021 11:31:38 -0400 Subject: [PATCH 01/28] Cache `brokerd` feeds for reuse in clearing loop --- piker/clearing/_ems.py | 102 ++++++++++++++++++++++++++++++++--------- 1 file changed, 80 insertions(+), 22 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 7dbbfbad..cd6985f3 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -22,7 +22,7 @@ from contextlib import asynccontextmanager from dataclasses import dataclass, field from pprint import pformat import time -from typing import AsyncIterator, Callable, Any +from typing import AsyncIterator, Callable, Optional from bidict import bidict from pydantic import BaseModel @@ -123,7 +123,7 @@ class _DarkBook: # XXX: this is in place to prevent accidental positions that are too # big. Now obviously this won't make sense for crypto like BTC, but # for most traditional brokers it should be fine unless you start -# slinging NQ futes or something. +# slinging NQ futes or something; check ur margin. _DEFAULT_SIZE: float = 1.0 @@ -266,7 +266,7 @@ class TradesRelay: consumers: int = 0 -class _Router(BaseModel): +class Router(BaseModel): '''Order router which manages and tracks per-broker dark book, alerts, clearing and related data feed management. @@ -276,8 +276,6 @@ class _Router(BaseModel): # setup at actor spawn time nursery: trio.Nursery - feeds: dict[tuple[str, str], Any] = {} - # broker to book map books: dict[str, _DarkBook] = {} @@ -343,12 +341,12 @@ class _Router(BaseModel): relay.consumers -= 1 -_router: _Router = None +_router: Router = None async def open_brokerd_trades_dialogue( - router: _Router, + router: Router, feed: Feed, symbol: str, _exec_mode: str, @@ -466,7 +464,7 @@ async def _setup_persistent_emsd( # open a root "service nursery" for the ``emsd`` actor async with trio.open_nursery() as service_nursery: - _router = _Router(nursery=service_nursery) + _router = Router(nursery=service_nursery) # TODO: send back the full set of persistent # orders/execs? @@ -480,7 +478,7 @@ async def translate_and_relay_brokerd_events( broker: str, brokerd_trades_stream: tractor.MsgStream, - router: _Router, + router: Router, ) -> AsyncIterator[dict]: '''Trades update loop - receive updates from ``brokerd`` trades @@ -704,7 +702,7 @@ async def process_client_order_cmds( symbol: str, feed: Feed, # noqa dark_book: _DarkBook, - router: _Router, + router: Router, ) -> None: @@ -904,6 +902,73 @@ async def process_client_order_cmds( ) +class cache: + '''Globally (processs wide) cached, task access to a + kept-alive-while-in-use data feed. + + ''' + lock = trio.Lock() + users: int = 0 + feeds: dict[tuple[str, str], Feed] = {} + no_more_users: Optional[trio.Event] = None + + +@asynccontextmanager +async def maybe_open_clearing_feed( + + broker: str, + symbol: str, + loglevel: str, + +) -> Feed: + try: + log.info(f'Reusing existing feed for {(broker, symbol)}') + yield cache.feeds[(broker, symbol)] + except KeyError: + # lock feed acquisition around task racing / ``trio``'s scheduler protocol + await cache.lock.acquire() + try: + cache.users += 1 + cached_feed = cache.feeds[(broker, symbol)] + cache.lock.release() + try: + yield cached_feed + finally: + cache.users -= 1 + if cache.users == 0: + # signal to original allocator task feed use is complete + cache.no_more_users.set() + return + + except KeyError: + # **critical section** that should prevent other tasks from + # checking the cache until complete otherwise the scheduler + # may switch and by accident we create more then one feed. + + cache.no_more_users = trio.Event() + + log.warning(f'Creating new feed for {(broker, symbol)}') + # TODO: eventually support N-brokers + async with ( + data.open_feed( + broker, + [symbol], + loglevel=loglevel, + ) as feed, + ): + cache.feeds[(broker, symbol)] = feed + cache.lock.release() + try: + yield feed + finally: + # don't tear down the feed until there are zero + # users of it left. + if cache.users > 0: + await cache.no_more_users.wait() + + cache.feeds.pop((broker, symbol)) + + @tractor.context async def _emsd_main( @@ -958,32 +1023,25 @@ async def _emsd_main( # tractor.Context instead of strictly requiring a ctx arg. ems_ctx = ctx - cached_feed = _router.feeds.get((broker, symbol)) - if cached_feed: - # TODO: use cached feeds per calling-actor - log.warning(f'Opening duplicate feed for {(broker, symbol)}') + feed: Feed # spawn one task per broker feed async with ( - # TODO: eventually support N-brokers - data.open_feed( + maybe_open_clearing_feed( broker, - [symbol], + symbol, loglevel=loglevel, ) as feed, ): - if not cached_feed: - _router.feeds[(broker, symbol)] = feed # XXX: this should be initial price quote from target provider first_quote = feed.first_quote - # open a stream with the brokerd backend for order - # flow dialogue - book = _router.get_dark_book(broker) book.lasts[(broker, symbol)] = first_quote[symbol]['last'] + # open a stream with the brokerd backend for order + # flow dialogue async with ( # only open if one isn't already up: we try to keep From a0660e553f189193da11b3501dab8dac35c1b34a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 9 Aug 2021 11:43:45 -0400 Subject: [PATCH 02/28] Start top level cacheing apis module --- piker/{brokers/api.py => _cacheables.py} | 6 +++--- piker/brokers/binance.py | 2 +- piker/brokers/core.py | 2 +- piker/brokers/ib.py | 6 ++++-- piker/brokers/kraken.py | 2 +- piker/brokers/questrade.py | 3 ++- 6 files changed, 12 insertions(+), 9 deletions(-) rename piker/{brokers/api.py => _cacheables.py} (95%) diff --git a/piker/brokers/api.py b/piker/_cacheables.py similarity index 95% rename from piker/brokers/api.py rename to piker/_cacheables.py index c1e11a01..c1113431 100644 --- a/piker/brokers/api.py +++ b/piker/_cacheables.py @@ -15,7 +15,7 @@ # along with this program. If not, see . """ -Actor-aware broker agnostic interface. +Cacheing apis and toolz. """ from typing import Dict @@ -23,8 +23,8 @@ from contextlib import asynccontextmanager, AsyncExitStack import trio -from . import get_brokermod -from ..log import get_logger +from .brokers import get_brokermod +from .log import get_logger log = get_logger(__name__) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index e8309cdb..59fb3ea4 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -33,7 +33,7 @@ from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto -from .api import open_cached_client +from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound from ..log import get_logger, get_console_log from ..data import ShmArray diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 59621b63..b16f46fe 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -29,7 +29,7 @@ import trio from ..log import get_logger from . import get_brokermod from .._daemon import maybe_spawn_brokerd -from .api import open_cached_client +from .._cacheables import open_cached_client log = get_logger(__name__) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index f604fc93..45b93416 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1358,6 +1358,9 @@ async def trades_dialogue( # start order request handler **before** local trades event loop n.start_soon(handle_order_requests, ems_stream) + # TODO: for some reason we can receive a ``None`` here when the + # ib-gw goes down? Not sure exactly how that's happening looking + # at the eventkit code above but we should probably handle it... async for event_name, item in ib_trade_events_stream: # XXX: begin normalization of nonsense ib_insync internal @@ -1469,9 +1472,8 @@ async def trades_dialogue( @tractor.context async def open_symbol_search( ctx: tractor.Context, -) -> None: - # async with open_cached_client('ib') as client: +) -> None: # load all symbols locally for fast search await ctx.started({}) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 6d41ab10..cfce2d5a 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -34,7 +34,7 @@ from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto -from .api import open_cached_client +from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 3f09587c..7109f15e 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -42,6 +42,7 @@ import wrapt import asks from ..calc import humanize, percent_change +from .._cacheables import open_cached_client from . import config from ._util import resproc, BrokerError, SymbolNotFound from ..log import get_logger, colorize_json, get_console_log @@ -1197,7 +1198,7 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel) - async with api.open_cached_client('questrade') as client: + async with open_cached_client('questrade') as client: if feed_type == 'stock': formatter = format_stock_quote get_quotes = await stock_quoter(client, symbols) From 0ce8057823aa677ea003d435f83c5651492ed0e7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 9 Aug 2021 13:20:57 -0400 Subject: [PATCH 03/28] Move feed cacheing to cache mod; put entry retreival into ctx mng --- piker/_cacheables.py | 91 ++++++++++++++++++++++++++++++++++++++++-- piker/clearing/_ems.py | 76 ++--------------------------------- 2 files changed, 91 insertions(+), 76 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index c1113431..48fcdeef 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -18,19 +18,25 @@ Cacheing apis and toolz. """ -from typing import Dict -from contextlib import asynccontextmanager, AsyncExitStack +from typing import Optional +from contextlib import ( + asynccontextmanager, + AsyncExitStack, + contextmanager, +) import trio from .brokers import get_brokermod from .log import get_logger +from . import data +from .data.feed import Feed log = get_logger(__name__) -_cache: Dict[str, 'Client'] = {} # noqa +_cache: dict[str, 'Client'] = {} # noqa @asynccontextmanager @@ -83,3 +89,82 @@ async def open_cached_client( client._consumers -= 1 if client._consumers <= 0: await client._exit_stack.aclose() + + +class cache: + '''Globally (processs wide) cached, task access to a + kept-alive-while-in-use data feed. + + ''' + lock = trio.Lock() + users: int = 0 + feeds: dict[tuple[str, str], Feed] = {} + no_more_users: Optional[trio.Event] = None + + +@asynccontextmanager +async def maybe_open_feed( + + broker: str, + symbol: str, + loglevel: str, + +) -> Feed: + + key = (broker, symbol) + + @contextmanager + def get_and_use() -> Feed: + # key error must bubble here + feed = cache.feeds[key] + log.info(f'Reusing cached feed for {key}') + try: + cache.users += 1 + yield feed + finally: + cache.users -= 1 + if cache.users == 0: + # signal to original allocator task feed use is complete + cache.no_more_users.set() + + try: + with get_and_use() as feed: + yield feed + except KeyError: + # lock feed acquisition around task racing / ``trio``'s + # scheduler protocol + await cache.lock.acquire() + try: + with get_and_use() as feed: + cache.lock.release() + yield feed + return + + except KeyError: + # **critical section** that should prevent other tasks from + # checking the cache until complete otherwise the scheduler + # may switch and by accident we create more then one feed. + + cache.no_more_users = trio.Event() + + log.info(f'Allocating new feed for {key}') + # TODO: eventually support N-brokers + async with ( + data.open_feed( + broker, + [symbol], + loglevel=loglevel, + ) as feed, + ): + cache.feeds[key] = feed + cache.lock.release() + try: + yield feed + finally: + # don't tear down the feed until there are zero + # users of it left. + if cache.users > 0: + await cache.no_more_users.wait() + + log.warning('De-allocating feed for {key}') + cache.feeds.pop(key) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index cd6985f3..ad5c6cf4 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -22,7 +22,7 @@ from contextlib import asynccontextmanager from dataclasses import dataclass, field from pprint import pformat import time -from typing import AsyncIterator, Callable, Optional +from typing import AsyncIterator, Callable from bidict import bidict from pydantic import BaseModel @@ -30,11 +30,11 @@ import trio from trio_typing import TaskStatus import tractor -from .. import data from ..log import get_logger from ..data._normalize import iterticks from ..data.feed import Feed from .._daemon import maybe_spawn_brokerd +from .._cacheables import maybe_open_feed from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -132,7 +132,6 @@ async def clear_dark_triggers( brokerd_orders_stream: tractor.MsgStream, ems_client_order_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa - broker: str, symbol: str, @@ -902,73 +901,6 @@ async def process_client_order_cmds( ) -class cache: - '''Globally (processs wide) cached, task access to a - kept-alive-while-in-use data feed. - - ''' - lock = trio.Lock() - users: int = 0 - feeds: dict[tuple[str, str], Feed] = {} - no_more_users: Optional[trio.Event] = None - - -@asynccontextmanager -async def maybe_open_clearing_feed( - - broker: str, - symbol: str, - loglevel: str, - -) -> Feed: - try: - log.info(f'Reusing existing feed for {(broker, symbol)}') - yield cache.feeds[(broker, symbol)] - except KeyError: - # lock feed acquisition around task racing / ``trio``'s scheduler protocol - await cache.lock.acquire() - try: - cache.users += 1 - cached_feed = cache.feeds[(broker, symbol)] - cache.lock.release() - try: - yield cached_feed - finally: - cache.users -= 1 - if cache.users == 0: - # signal to original allocator task feed use is complete - cache.no_more_users.set() - return - - except KeyError: - # **critical section** that should prevent other tasks from - # checking the cache until complete otherwise the scheduler - # may switch and by accident we create more then one feed. - - cache.no_more_users = trio.Event() - - log.warning(f'Creating new feed for {(broker, symbol)}') - # TODO: eventually support N-brokers - async with ( - data.open_feed( - broker, - [symbol], - loglevel=loglevel, - ) as feed, - ): - cache.feeds[(broker, symbol)] = feed - cache.lock.release() - try: - yield feed - finally: - # don't tear down the feed until there are zero - # users of it left. - if cache.users > 0: - await cache.no_more_users.wait() - - cache.feeds.pop((broker, symbol)) - - @tractor.context async def _emsd_main( @@ -1027,7 +959,7 @@ async def _emsd_main( # spawn one task per broker feed async with ( - maybe_open_clearing_feed( + maybe_open_feed( broker, symbol, loglevel=loglevel, @@ -1073,11 +1005,9 @@ async def _emsd_main( n.start_soon( clear_dark_triggers, - # relay.brokerd_dialogue, brokerd_stream, ems_client_order_stream, feed.stream, - broker, symbol, book From 68ce5b3550eed6b7be38c73578945325039b99dc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 9 Aug 2021 14:34:26 -0400 Subject: [PATCH 04/28] Add lifo cache to new module; drop "utils", bleh --- piker/_async_utils.py | 66 -------------------------------------- piker/_cacheables.py | 30 +++++++++++++++++ piker/brokers/questrade.py | 3 +- 3 files changed, 31 insertions(+), 68 deletions(-) delete mode 100644 piker/_async_utils.py diff --git a/piker/_async_utils.py b/piker/_async_utils.py deleted file mode 100644 index fb221215..00000000 --- a/piker/_async_utils.py +++ /dev/null @@ -1,66 +0,0 @@ -# piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -""" -Async utils no one seems to have built into a core lib (yet). -""" -from typing import AsyncContextManager -from collections import OrderedDict -from contextlib import asynccontextmanager - - -def async_lifo_cache(maxsize=128): - """Async ``cache`` with a LIFO policy. - - Implemented my own since no one else seems to have - a standard. I'll wait for the smarter people to come - up with one, but until then... - """ - cache = OrderedDict() - - def decorator(fn): - - async def wrapper(*args): - key = args - try: - return cache[key] - except KeyError: - if len(cache) >= maxsize: - # discard last added new entry - cache.popitem() - - # do it - cache[key] = await fn(*args) - return cache[key] - - return wrapper - - return decorator - - -@asynccontextmanager -async def _just_none(): - # noop -> skip entering context - yield None - - -@asynccontextmanager -async def maybe_with_if( - predicate: bool, - context: AsyncContextManager, -) -> AsyncContextManager: - async with context if predicate else _just_none() as output: - yield output diff --git a/piker/_cacheables.py b/piker/_cacheables.py index 48fcdeef..1b756f8f 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -18,6 +18,7 @@ Cacheing apis and toolz. """ +from collections import OrderedDict from typing import Optional from contextlib import ( asynccontextmanager, @@ -36,6 +37,35 @@ from .data.feed import Feed log = get_logger(__name__) +def async_lifo_cache(maxsize=128): + """Async ``cache`` with a LIFO policy. + + Implemented my own since no one else seems to have + a standard. I'll wait for the smarter people to come + up with one, but until then... + """ + cache = OrderedDict() + + def decorator(fn): + + async def wrapper(*args): + key = args + try: + return cache[key] + except KeyError: + if len(cache) >= maxsize: + # discard last added new entry + cache.popitem() + + # do it + cache[key] = await fn(*args) + return cache[key] + + return wrapper + + return decorator + + _cache: dict[str, 'Client'] = {} # noqa diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 7109f15e..3cd57411 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -42,11 +42,10 @@ import wrapt import asks from ..calc import humanize, percent_change -from .._cacheables import open_cached_client +from .._cacheables import open_cached_client, async_lifo_cache from . import config from ._util import resproc, BrokerError, SymbolNotFound from ..log import get_logger, colorize_json, get_console_log -from .._async_utils import async_lifo_cache from . import get_brokermod from . import api From 66f1d915410a079dbb462f08f85edd2c342a8b8f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 9 Aug 2021 19:27:42 -0400 Subject: [PATCH 05/28] Let's abstractify: -> --- piker/_cacheables.py | 46 +++++++++++++++++++----------------------- piker/clearing/_ems.py | 14 ++++++++----- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index 1b756f8f..be1934ca 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -19,7 +19,7 @@ Cacheing apis and toolz. """ from collections import OrderedDict -from typing import Optional +from typing import Optional, Hashable, TypeVar, AsyncContextManager from contextlib import ( asynccontextmanager, AsyncExitStack, @@ -30,10 +30,10 @@ import trio from .brokers import get_brokermod from .log import get_logger -from . import data from .data.feed import Feed +T = TypeVar('T') log = get_logger(__name__) @@ -69,16 +69,17 @@ def async_lifo_cache(maxsize=128): _cache: dict[str, 'Client'] = {} # noqa +# XXX: this mis mostly an alt-implementation of +# maybe_open_ctx() below except it uses an async exit statck. +# ideally wer pick one or the other. @asynccontextmanager async def open_cached_client( brokername: str, - *args, - **kwargs, ) -> 'Client': # noqa - """Get a cached broker client from the current actor's local vars. + '''Get a cached broker client from the current actor's local vars. If one has not been setup do it and cache it. - """ + ''' global _cache clients = _cache.setdefault('clients', {'_lock': trio.Lock()}) @@ -128,25 +129,27 @@ class cache: ''' lock = trio.Lock() users: int = 0 - feeds: dict[tuple[str, str], Feed] = {} + ctxs: dict[tuple[str, str], Feed] = {} no_more_users: Optional[trio.Event] = None @asynccontextmanager -async def maybe_open_feed( +async def maybe_open_ctx( - broker: str, - symbol: str, + key: Hashable, + mngr: AsyncContextManager[T], loglevel: str, -) -> Feed: - - key = (broker, symbol) +) -> T: + '''Maybe open a context manager if there is not already a cached + version for the provided ``key``. Return the cached instance on + a cache hit. + ''' @contextmanager def get_and_use() -> Feed: # key error must bubble here - feed = cache.feeds[key] + feed = cache.ctxs[key] log.info(f'Reusing cached feed for {key}') try: cache.users += 1 @@ -174,22 +177,15 @@ async def maybe_open_feed( # **critical section** that should prevent other tasks from # checking the cache until complete otherwise the scheduler # may switch and by accident we create more then one feed. - cache.no_more_users = trio.Event() log.info(f'Allocating new feed for {key}') # TODO: eventually support N-brokers - async with ( - data.open_feed( - broker, - [symbol], - loglevel=loglevel, - ) as feed, - ): - cache.feeds[key] = feed + async with mngr as value: + cache.ctxs[key] = value cache.lock.release() try: - yield feed + yield value finally: # don't tear down the feed until there are zero # users of it left. @@ -197,4 +193,4 @@ async def maybe_open_feed( await cache.no_more_users.wait() log.warning('De-allocating feed for {key}') - cache.feeds.pop(key) + cache.ctxs.pop(key) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index ad5c6cf4..06536859 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -32,9 +32,9 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks -from ..data.feed import Feed +from ..data.feed import Feed, open_feed from .._daemon import maybe_spawn_brokerd -from .._cacheables import maybe_open_feed +from .._cacheables import maybe_open_ctx from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -959,9 +959,13 @@ async def _emsd_main( # spawn one task per broker feed async with ( - maybe_open_feed( - broker, - symbol, + maybe_open_ctx( + key=(broker, symbol), + mngr=open_feed( + broker, + [symbol], + loglevel=loglevel, + ), loglevel=loglevel, ) as feed, ): From 7d5add1c3a94c12766e9dc463767dd4b455a4607 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Aug 2021 08:51:03 -0400 Subject: [PATCH 06/28] Add an njs cache gist link --- piker/_cacheables.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index be1934ca..a1425513 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -18,6 +18,9 @@ Cacheing apis and toolz. """ +# further examples of interest: +# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8 + from collections import OrderedDict from typing import Optional, Hashable, TypeVar, AsyncContextManager from contextlib import ( From 224dbbc4e3ec69be17c11ed855e84d1a24be7468 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Aug 2021 09:32:59 -0400 Subject: [PATCH 07/28] Drop feed refs --- piker/_cacheables.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index a1425513..f9d42f0a 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -22,7 +22,13 @@ Cacheing apis and toolz. # https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8 from collections import OrderedDict -from typing import Optional, Hashable, TypeVar, AsyncContextManager +from typing import ( + Optional, + Hashable, + TypeVar, + AsyncContextManager, + AsyncIterable, +) from contextlib import ( asynccontextmanager, AsyncExitStack, @@ -33,7 +39,6 @@ import trio from .brokers import get_brokermod from .log import get_logger -from .data.feed import Feed T = TypeVar('T') @@ -132,7 +137,7 @@ class cache: ''' lock = trio.Lock() users: int = 0 - ctxs: dict[tuple[str, str], Feed] = {} + ctxs: dict[tuple[str, str], AsyncIterable] = {} no_more_users: Optional[trio.Event] = None @@ -150,7 +155,7 @@ async def maybe_open_ctx( ''' @contextmanager - def get_and_use() -> Feed: + def get_and_use() -> AsyncIterable[T]: # key error must bubble here feed = cache.ctxs[key] log.info(f'Reusing cached feed for {key}') From a7d3afc9b14d8b631a06e9d282d41583223024eb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Aug 2021 16:00:14 -0400 Subject: [PATCH 08/28] Add a `maybe_open_feed()` which uses new broadcast chans Try out he new broadcast channels from `tractor` for data feeds we already have cached. Any time there's a cache hit we load the cached feed and just slap a broadcast receiver on it for the local consumer task. --- piker/_cacheables.py | 9 +++--- piker/data/feed.py | 76 +++++++++++++++++++++++++++++++------------- 2 files changed, 58 insertions(+), 27 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index f9d42f0a..995efbd3 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -146,9 +146,8 @@ async def maybe_open_ctx( key: Hashable, mngr: AsyncContextManager[T], - loglevel: str, -) -> T: +) -> (bool, T): '''Maybe open a context manager if there is not already a cached version for the provided ``key``. Return the cached instance on a cache hit. @@ -161,7 +160,7 @@ async def maybe_open_ctx( log.info(f'Reusing cached feed for {key}') try: cache.users += 1 - yield feed + yield True, feed finally: cache.users -= 1 if cache.users == 0: @@ -170,7 +169,7 @@ async def maybe_open_ctx( try: with get_and_use() as feed: - yield feed + yield True, feed except KeyError: # lock feed acquisition around task racing / ``trio``'s # scheduler protocol @@ -193,7 +192,7 @@ async def maybe_open_ctx( cache.ctxs[key] = value cache.lock.release() try: - yield value + yield True, value finally: # don't tear down the feed until there are zero # users of it left. diff --git a/piker/data/feed.py b/piker/data/feed.py index ed24a095..c56c0720 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -31,11 +31,14 @@ from typing import ( ) import trio +from trio.abc import ReceiveChannel from trio_typing import TaskStatus import tractor +from tractor import _broadcast from pydantic import BaseModel from ..brokers import get_brokermod +from .._cacheables import maybe_open_ctx from ..log import get_logger, get_console_log from .._daemon import ( maybe_spawn_brokerd, @@ -345,10 +348,10 @@ class Feed: memory buffer orchestration. """ name: str - stream: AsyncIterator[dict[str, Any]] shm: ShmArray mod: ModuleType first_quote: dict + stream: trio.abc.ReceiveChannel[dict[str, Any]] _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[int]] = None @@ -362,7 +365,7 @@ class Feed: symbols: dict[str, Symbol] = field(default_factory=dict) async def receive(self) -> dict: - return await self.stream.__anext__() + return await self.stream.receive() @asynccontextmanager async def index_stream( @@ -376,8 +379,10 @@ class Feed: # a lone broker-daemon per provider should be # created for all practical purposes async with self._brokerd_portal.open_stream_from( + iter_ohlc_periods, delay_s=delay_s or self._max_sample_rate, + ) as self._index_stream: yield self._index_stream @@ -395,7 +400,7 @@ def sym_to_shm_key( @asynccontextmanager async def install_brokerd_search( - portal: tractor._portal.Portal, + portal: tractor.Portal, brokermod: ModuleType, ) -> None: @@ -434,34 +439,21 @@ async def open_feed( loglevel: Optional[str] = None, tick_throttle: Optional[float] = None, # Hz + shielded_stream: bool = False, -) -> AsyncIterator[dict[str, Any]]: +) -> ReceiveChannel[dict[str, Any]]: ''' Open a "data feed" which provides streamed real-time quotes. ''' sym = symbols[0].lower() - # TODO: feed cache locking, right now this is causing - # issues when reconnecting to a long running emsd? - # global _searcher_cache - - # async with _cache_lock: - # feed = _searcher_cache.get((brokername, sym)) - - # # if feed is not None and sym in feed.symbols: - # if feed is not None: - # yield feed - # # short circuit - # return - try: mod = get_brokermod(brokername) except ImportError: mod = get_ingestormod(brokername) # no feed for broker exists so maybe spawn a data brokerd - async with ( maybe_spawn_brokerd( @@ -480,21 +472,25 @@ async def open_feed( ) as (ctx, (init_msg, first_quote)), - ctx.open_stream() as stream, - ): + ctx.open_stream(shield=shielded_stream) as stream, + ): # we can only read from shm shm = attach_shm_array( token=init_msg[sym]['shm_token'], readonly=True, ) + bstream = _broadcast.broadcast_receiver( + stream, + 2**10, + ) feed = Feed( name=brokername, - stream=stream, shm=shm, mod=mod, first_quote=first_quote, + stream=bstream, #brx_stream, _brokerd_portal=portal, ) ohlc_sample_rates = [] @@ -526,7 +522,43 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) try: - yield feed + yield feed, bstream finally: # drop the infinite stream connection await ctx.cancel() + + +@asynccontextmanager +async def maybe_open_feed( + + brokername: str, + symbols: Sequence[str], + loglevel: Optional[str] = None, + + tick_throttle: Optional[float] = None, # Hz + shielded_stream: bool = False, + +) -> ReceiveChannel[dict[str, Any]]: + '''Maybe open a data to a ``brokerd`` daemon only if there is no + local one for the broker-symbol pair, if one is cached use it wrapped + in a tractor broadcast receiver. + + ''' + sym = symbols[0].lower() + + async with maybe_open_ctx( + key=(brokername, sym), + mngr=open_feed( + brokername, + [sym], + loglevel=loglevel, + ), + ) as (cache_hit, (feed, stream)): + + if cache_hit: + # add a new broadcast subscription for the quote stream + # if this feed is likely already in use + async with stream.subscribe() as bstream: + yield feed, bstream + else: + yield feed, stream From 7d0f47364c341ece7f6fb0d3499e569dd9f20869 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Aug 2021 16:50:40 -0400 Subject: [PATCH 09/28] Use `maybe_open_feed()` in ems and fsp daemons --- piker/clearing/_client.py | 4 ++-- piker/clearing/_ems.py | 17 ++++++----------- piker/data/feed.py | 10 +++++----- piker/fsp/__init__.py | 23 +++++++++++++++-------- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 97869bb9..513dded8 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -38,7 +38,7 @@ log = get_logger(__name__) @dataclass class OrderBook: - """Buy-side (client-side ?) order book ctl and tracking. + '''EMS-client-side order book ctl and tracking. A style similar to "model-view" is used here where this api is provided as a supervised control for an EMS actor which does all the @@ -48,7 +48,7 @@ class OrderBook: Currently, this is mostly for keeping local state to match the EMS and use received events to trigger graphics updates. - """ + ''' # mem channels used to relay order requests to the EMS daemon _to_ems: trio.abc.SendChannel _from_order_book: trio.abc.ReceiveChannel diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 06536859..91197d60 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -32,9 +32,8 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks -from ..data.feed import Feed, open_feed +from ..data.feed import Feed, maybe_open_feed from .._daemon import maybe_spawn_brokerd -from .._cacheables import maybe_open_ctx from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -959,15 +958,11 @@ async def _emsd_main( # spawn one task per broker feed async with ( - maybe_open_ctx( - key=(broker, symbol), - mngr=open_feed( - broker, - [symbol], - loglevel=loglevel, - ), + maybe_open_feed( + broker, + [symbol], loglevel=loglevel, - ) as feed, + ) as (feed, stream), ): # XXX: this should be initial price quote from target provider @@ -1011,7 +1006,7 @@ async def _emsd_main( brokerd_stream, ems_client_order_stream, - feed.stream, + stream, broker, symbol, book diff --git a/piker/data/feed.py b/piker/data/feed.py index c56c0720..07df13a1 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -441,7 +441,7 @@ async def open_feed( tick_throttle: Optional[float] = None, # Hz shielded_stream: bool = False, -) -> ReceiveChannel[dict[str, Any]]: +) -> Feed: ''' Open a "data feed" which provides streamed real-time quotes. @@ -522,7 +522,7 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) try: - yield feed, bstream + yield feed finally: # drop the infinite stream connection await ctx.cancel() @@ -538,7 +538,7 @@ async def maybe_open_feed( tick_throttle: Optional[float] = None, # Hz shielded_stream: bool = False, -) -> ReceiveChannel[dict[str, Any]]: +) -> (Feed, ReceiveChannel[dict[str, Any]]): '''Maybe open a data to a ``brokerd`` daemon only if there is no local one for the broker-symbol pair, if one is cached use it wrapped in a tractor broadcast receiver. @@ -553,12 +553,12 @@ async def maybe_open_feed( [sym], loglevel=loglevel, ), - ) as (cache_hit, (feed, stream)): + ) as (cache_hit, feed): if cache_hit: # add a new broadcast subscription for the quote stream # if this feed is likely already in use - async with stream.subscribe() as bstream: + async with feed.stream.subscribe() as bstream: yield feed, bstream else: yield feed, stream diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 312a0cef..3b5c359a 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -69,6 +69,7 @@ async def fsp_compute( ctx: tractor.Context, symbol: str, feed: Feed, + stream: trio.abc.ReceiveChannel, src: ShmArray, dst: ShmArray, @@ -93,14 +94,14 @@ async def fsp_compute( yield {} # task cancellation won't kill the channel - with stream.shield(): - async for quotes in stream: - for symbol, quotes in quotes.items(): - if symbol == sym: - yield quotes + # since we shielded at the `open_feed()` call + async for quotes in stream: + for symbol, quotes in quotes.items(): + if symbol == sym: + yield quotes out_stream = func( - filter_by_sym(symbol, feed.stream), + filter_by_sym(symbol, stream), feed.shm, ) @@ -164,7 +165,8 @@ async def cascade( dst_shm_token: Tuple[str, np.dtype], symbol: str, fsp_func_name: str, -) -> AsyncIterator[dict]: + +) -> None: """Chain streaming signal processors and deliver output to destination mem buf. @@ -175,7 +177,11 @@ async def cascade( func: Callable = _fsps[fsp_func_name] # open a data feed stream with requested broker - async with data.open_feed(brokername, [symbol]) as feed: + async with data.feed.maybe_open_feed( + brokername, + [symbol], + shielded_stream=True, + ) as (feed, stream): assert src.token == feed.shm.token @@ -186,6 +192,7 @@ async def cascade( ctx=ctx, symbol=symbol, feed=feed, + stream=stream, src=src, dst=dst, From 2202abc9fb0b3a96b42855fe7a966f3265283e10 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Aug 2021 17:02:52 -0400 Subject: [PATCH 10/28] Add (lack of proper) ring buffer note --- piker/data/_sharedmem.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index aa2a5320..f7f9f904 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -15,7 +15,7 @@ # along with this program. If not, see . """ -NumPy compatible shared memory buffers for real-time FSP. +NumPy compatible shared memory buffers for real-time IPC streaming. """ from dataclasses import dataclass, asdict @@ -207,11 +207,16 @@ class ShmArray: def push( self, data: np.ndarray, + prepend: bool = False, + ) -> int: - """Ring buffer like "push" to append data + '''Ring buffer like "push" to append data into the buffer and return updated "last" index. - """ + + NB: no actual ring logic yet to give a "loop around" on overflow + condition, lel. + ''' length = len(data) if prepend: From 310d8f485e8c3b4807d4bce38255ede4f93bfced Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Aug 2021 16:58:10 -0400 Subject: [PATCH 11/28] Add disclaimer to old data mod --- piker/brokers/data.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 9369a73b..48b20d80 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -14,9 +14,14 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" -Real-time data feed machinery -""" +''' +NB: this is the old original implementation that was used way way back +when the project started with ``kivy``. + +This code is left for reference but will likely be merged in +appropriately and removed. + +''' import time from functools import partial from dataclasses import dataclass, field From 954dc6a8b094df2a8f25a288df9548a3cc40d1aa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 12 Aug 2021 14:49:06 -0400 Subject: [PATCH 12/28] Fix missing cache hit bool element of return --- piker/_cacheables.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index 995efbd3..979a7ce6 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -156,11 +156,11 @@ async def maybe_open_ctx( @contextmanager def get_and_use() -> AsyncIterable[T]: # key error must bubble here - feed = cache.ctxs[key] + value = cache.ctxs[key] log.info(f'Reusing cached feed for {key}') try: cache.users += 1 - yield True, feed + yield True, value finally: cache.users -= 1 if cache.users == 0: @@ -168,16 +168,16 @@ async def maybe_open_ctx( cache.no_more_users.set() try: - with get_and_use() as feed: - yield True, feed + with get_and_use() as value: + yield True, value except KeyError: # lock feed acquisition around task racing / ``trio``'s # scheduler protocol await cache.lock.acquire() try: - with get_and_use() as feed: + with get_and_use() as value: cache.lock.release() - yield feed + yield True, value return except KeyError: From 71b50fdae804b20aac1b2fb83fa28e2ca695413e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 12 Aug 2021 14:50:18 -0400 Subject: [PATCH 13/28] Use broadcast chan for order client and avoid chan repacking --- piker/clearing/_client.py | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 513dded8..89630722 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -25,6 +25,7 @@ from dataclasses import dataclass, field import trio import tractor +from tractor._broadcast import broadcast_receiver from ..data._source import Symbol from ..log import get_logger @@ -123,10 +124,15 @@ def get_orders( global _orders if _orders is None: + size = 100 + tx, rx = trio.open_memory_channel(size) + brx = broadcast_receiver(rx, size) + # setup local ui event streaming channels for request/resp # streamging with EMS daemon _orders = OrderBook( - *trio.open_memory_channel(100), + _to_ems=tx, + _from_order_book=brx, ) return _orders @@ -157,23 +163,12 @@ async def relay_order_cmds_from_sync_code( """ book = get_orders() - orders_stream = book._from_order_book - - async for cmd in orders_stream: - - print(cmd) - if cmd['symbol'] == symbol_key: - - # send msg over IPC / wire - log.info(f'Send order cmd:\n{pformat(cmd)}') - await to_ems_stream.send(cmd) - - else: - # XXX BRUTAL HACKZORZES !!! - # re-insert for another consumer - # we need broadcast channelz...asap - # https://github.com/goodboy/tractor/issues/204 - book._to_ems.send_nowait(cmd) + async with book._from_order_book.subscribe() as orders_stream: + async for cmd in orders_stream: + if cmd['symbol'] == symbol_key: + log.info(f'Send order cmd:\n{pformat(cmd)}') + # send msg over IPC / wire + await to_ems_stream.send(cmd) @asynccontextmanager From 0c9516051b7b725985430ade6a78bf0735487eef Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 12 Aug 2021 20:16:45 -0400 Subject: [PATCH 14/28] TO SQUASH cached ctx. --- piker/_cacheables.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index 979a7ce6..d4e6ccfa 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -160,7 +160,7 @@ async def maybe_open_ctx( log.info(f'Reusing cached feed for {key}') try: cache.users += 1 - yield True, value + yield value finally: cache.users -= 1 if cache.users == 0: From c8e320849a3b07af2fc59df36a9368bc9ca5c29b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 13 Aug 2021 12:15:08 -0400 Subject: [PATCH 15/28] Add super basic support for data feed "pausing" --- piker/data/feed.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 07df13a1..715cf9d1 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -325,11 +325,23 @@ async def attach_feed_bus( else: sub = (stream, tick_throttle) - bus._subscribers[symbol].append(sub) + subs = bus._subscribers[symbol] + subs.append(sub) try: - await trio.sleep_forever() + async for msg in stream: + if msg == 'pause': + log.info( + f'Pausing {symbol}.{brokername} feed for {ctx.chan.uid}') + subs.remove(sub) + + elif msg == 'resume': + log.info( + f'Resuming {symbol}.{brokername} feed for {ctx.chan.uid}') + subs.append(sub) + else: + raise ValueError(msg) finally: log.info( f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') From 2f5abaa47abfb9f42a26babe59965c5f26538a3f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 13 Aug 2021 12:34:40 -0400 Subject: [PATCH 16/28] Add njs token bucket gist url --- piker/data/_sampling.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 490ae4b0..654404ef 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -235,6 +235,8 @@ async def sample_and_broadcast( try: if tick_throttle: + # this is a send mem chan that likely + # pushes to the ``uniform_rate_send()`` below. await stream.send(quote) else: @@ -255,6 +257,10 @@ async def sample_and_broadcast( subs.remove((stream, tick_throttle)) +# TODO: a less naive throttler, here's some snippets: +# token bucket by njs: +# https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 + async def uniform_rate_send( rate: float, @@ -292,7 +298,7 @@ async def uniform_rate_send( rate = 1 / (now - last_send) last_send = now - # print(f'{rate} Hz sending quotes\n{first_quote}') + # print(f'{rate} Hz sending quotes') # \n{first_quote}') # TODO: now if only we could sync this to the display # rate timing exactly lul From 1e42f584781c9f44edcfa501d2b591ac3c1a1972 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 15 Aug 2021 18:14:09 -0400 Subject: [PATCH 17/28] Add pause/resume feed api, delegate to msg stream for broadcast api --- piker/data/feed.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 715cf9d1..cc9ea883 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -34,7 +34,7 @@ import trio from trio.abc import ReceiveChannel from trio_typing import TaskStatus import tractor -from tractor import _broadcast +# from tractor import _broadcast from pydantic import BaseModel from ..brokers import get_brokermod @@ -329,17 +329,22 @@ async def attach_feed_bus( subs.append(sub) try: + uid = ctx.chan.uid + fqsn = f'{symbol}.{brokername}' + async for msg in stream: if msg == 'pause': - log.info( - f'Pausing {symbol}.{brokername} feed for {ctx.chan.uid}') - subs.remove(sub) + if sub in subs: + log.info( + f'Pausing {fqsn} feed for {uid}') + subs.remove(sub) elif msg == 'resume': - log.info( - f'Resuming {symbol}.{brokername} feed for {ctx.chan.uid}') - subs.append(sub) + if sub not in subs: + log.info( + f'Resuming {fqsn} feed for {uid}') + subs.append(sub) else: raise ValueError(msg) finally: @@ -401,6 +406,12 @@ class Feed: else: yield self._index_stream + async def pause(self) -> None: + await self.stream.send('pause') + + async def resume(self) -> None: + await self.stream.send('resume') + def sym_to_shm_key( broker: str, @@ -493,16 +504,12 @@ async def open_feed( readonly=True, ) - bstream = _broadcast.broadcast_receiver( - stream, - 2**10, - ) feed = Feed( name=brokername, shm=shm, mod=mod, first_quote=first_quote, - stream=bstream, #brx_stream, + stream=stream, _brokerd_portal=portal, ) ohlc_sample_rates = [] @@ -573,4 +580,4 @@ async def maybe_open_feed( async with feed.stream.subscribe() as bstream: yield feed, bstream else: - yield feed, stream + yield feed, feed.stream From fe0d66e8479618b59aab9a89ef5c0358b5f370ef Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Aug 2021 08:18:20 -0400 Subject: [PATCH 18/28] Drop removed module import --- piker/brokers/questrade.py | 1 - 1 file changed, 1 deletion(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 3cd57411..7a06ce76 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -47,7 +47,6 @@ from . import config from ._util import resproc, BrokerError, SymbolNotFound from ..log import get_logger, colorize_json, get_console_log from . import get_brokermod -from . import api log = get_logger(__name__) From 2a9d24ccac0ee8146886b010a740d002db94ed5a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 2 Aug 2021 22:08:59 -0400 Subject: [PATCH 19/28] Remove dead OHLC index consumers from subs list on error --- piker/data/_sampling.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 654404ef..bf9ecbba 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -118,8 +118,9 @@ async def increment_ohlc_buffer( shm.push(last) # broadcast the buffer index step - # yield {'index': shm._last.value} - for ctx in _subscribers.get(delay_s, ()): + subs = _subscribers.get(delay_s, ()) + + for ctx in subs: try: await ctx.send_yield({'index': shm._last.value}) except ( @@ -127,6 +128,7 @@ async def increment_ohlc_buffer( trio.ClosedResourceError ): log.error(f'{ctx.chan.uid} dropped connection') + subs.remove(ctx) @tractor.stream From 2f1455d423df37d397048113d9bff735789492f7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 18 Aug 2021 14:46:13 -0400 Subject: [PATCH 20/28] Lol, don't use `maybe_open_feed()` for now, it's totally borked... --- piker/clearing/_ems.py | 10 +++++++--- piker/fsp/__init__.py | 8 ++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 91197d60..f556dd3a 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -32,7 +32,7 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks -from ..data.feed import Feed, maybe_open_feed +from ..data.feed import Feed, maybe_open_feed, open_feed from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( @@ -958,13 +958,17 @@ async def _emsd_main( # spawn one task per broker feed async with ( - maybe_open_feed( + open_feed( + # maybe_open_feed( broker, [symbol], loglevel=loglevel, - ) as (feed, stream), + # ) as (feed, stream), + ) as feed, ): + stream = feed.stream + # XXX: this should be initial price quote from target provider first_quote = feed.first_quote diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 3b5c359a..60bc1ecb 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -177,11 +177,15 @@ async def cascade( func: Callable = _fsps[fsp_func_name] # open a data feed stream with requested broker - async with data.feed.maybe_open_feed( + # async with data.feed.maybe_open_feed( + async with data.feed.open_feed( brokername, [symbol], shielded_stream=True, - ) as (feed, stream): + # ) as (feed, stream): + ) as feed: + + stream = feed.stream assert src.token == feed.shm.token From ff322ae7be35fe6fc224e999a472c93f76f6bc79 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 30 Aug 2021 17:39:53 -0400 Subject: [PATCH 21/28] Re-impl ctx-mng caching using `trio.Nursery.start()` Maybe i've finally learned my lesson that exit stacks and per task ctx manager caching is just not trionic.. Use the approach we've taken for the daemon service manager as well: create a process global nursery for each unique ctx manager we wish to cache and simply tear it down when the number of consumers goes to zero. This seems to resolve all prior issues and gets us error-free cached feeds! --- piker/_cacheables.py | 105 +++++++++++++++++++++++++------------------ 1 file changed, 62 insertions(+), 43 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index d4e6ccfa..d86b477d 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -23,19 +23,20 @@ Cacheing apis and toolz. from collections import OrderedDict from typing import ( - Optional, + Any, Hashable, + Optional, TypeVar, AsyncContextManager, - AsyncIterable, ) from contextlib import ( asynccontextmanager, AsyncExitStack, - contextmanager, ) import trio +from trio_typing import TaskStatus +from tractor._portal import maybe_open_nursery from .brokers import get_brokermod from .log import get_logger @@ -132,14 +133,35 @@ async def open_cached_client( class cache: '''Globally (processs wide) cached, task access to a - kept-alive-while-in-use data feed. + kept-alive-while-in-use async resource. ''' lock = trio.Lock() users: int = 0 - ctxs: dict[tuple[str, str], AsyncIterable] = {} + values: dict[tuple[str, str], tuple[AsyncExitStack, Any]] = {} + nurseries: dict[int, Optional[trio.Nursery]] = {} no_more_users: Optional[trio.Event] = None + @classmethod + async def run_ctx( + cls, + mng, + key, + task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, + + ) -> None: + async with mng as value: + + cls.no_more_users = trio.Event() + cls.values[key] = value + task_status.started(value) + try: + await cls.no_more_users.wait() + finally: + value = cls.values.pop(key) + # discard nursery ref so it won't be re-used (an error) + cls.nurseries.pop(id(mng)) + @asynccontextmanager async def maybe_open_ctx( @@ -153,51 +175,48 @@ async def maybe_open_ctx( a cache hit. ''' - @contextmanager - def get_and_use() -> AsyncIterable[T]: - # key error must bubble here - value = cache.ctxs[key] - log.info(f'Reusing cached feed for {key}') - try: - cache.users += 1 - yield value - finally: - cache.users -= 1 - if cache.users == 0: - # signal to original allocator task feed use is complete - cache.no_more_users.set() - try: - with get_and_use() as value: - yield True, value - except KeyError: - # lock feed acquisition around task racing / ``trio``'s - # scheduler protocol - await cache.lock.acquire() + await cache.lock.acquire() + + ctx_key = id(mngr) + + # TODO: does this need to be a tractor "root nursery"? + async with maybe_open_nursery(cache.nurseries.get(ctx_key)) as n: + cache.nurseries[ctx_key] = n + + value = None try: - with get_and_use() as value: - cache.lock.release() - yield True, value - return + # lock feed acquisition around task racing / ``trio``'s + # scheduler protocol + value = cache.values[key] + log.info(f'Reusing cached feed for {key}') + cache.users += 1 + cache.lock.release() + yield True, value except KeyError: + log.info(f'Allocating new feed for {key}') + # **critical section** that should prevent other tasks from # checking the cache until complete otherwise the scheduler # may switch and by accident we create more then one feed. - cache.no_more_users = trio.Event() - log.info(f'Allocating new feed for {key}') - # TODO: eventually support N-brokers - async with mngr as value: - cache.ctxs[key] = value + value = await n.start(cache.run_ctx, mngr, key) + cache.users += 1 + cache.lock.release() + + yield False, value + + finally: + cache.users -= 1 + + if cache.lock.locked(): cache.lock.release() - try: - yield True, value - finally: - # don't tear down the feed until there are zero - # users of it left. - if cache.users > 0: - await cache.no_more_users.wait() - log.warning('De-allocating feed for {key}') - cache.ctxs.pop(key) + if value is not None: + # if no more consumers, teardown the client + if cache.users <= 0: + log.warning(f'De-allocating feed for {key}') + + # terminate mngr nursery + cache.no_more_users.set() From cae7f486e44135d6a66684d5dbad140e074e7121 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 30 Aug 2021 17:55:10 -0400 Subject: [PATCH 22/28] Revert "Lol, don't use `maybe_open_feed()` for now, it's totally borked..." Think this was fixed by passing through `**kwargs` in `maybe_open_feed()`, the shielding for fsp respawns wasn't being properly passed through.. This reverts commit 2f1455d423df37d397048113d9bff735789492f7. --- piker/clearing/_ems.py | 10 +++------- piker/fsp/__init__.py | 8 ++------ 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index f556dd3a..91197d60 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -32,7 +32,7 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks -from ..data.feed import Feed, maybe_open_feed, open_feed +from ..data.feed import Feed, maybe_open_feed from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( @@ -958,17 +958,13 @@ async def _emsd_main( # spawn one task per broker feed async with ( - open_feed( - # maybe_open_feed( + maybe_open_feed( broker, [symbol], loglevel=loglevel, - # ) as (feed, stream), - ) as feed, + ) as (feed, stream), ): - stream = feed.stream - # XXX: this should be initial price quote from target provider first_quote = feed.first_quote diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 60bc1ecb..3b5c359a 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -177,15 +177,11 @@ async def cascade( func: Callable = _fsps[fsp_func_name] # open a data feed stream with requested broker - # async with data.feed.maybe_open_feed( - async with data.feed.open_feed( + async with data.feed.maybe_open_feed( brokername, [symbol], shielded_stream=True, - # ) as (feed, stream): - ) as feed: - - stream = feed.stream + ) as (feed, stream): assert src.token == feed.shm.token From bbcce0cab66680c4240fbae533c4b9b12fa5b2cd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 30 Aug 2021 18:04:19 -0400 Subject: [PATCH 23/28] Facepalm^2: pass through kwargs --- piker/data/feed.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index cc9ea883..44f93a72 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -554,8 +554,7 @@ async def maybe_open_feed( symbols: Sequence[str], loglevel: Optional[str] = None, - tick_throttle: Optional[float] = None, # Hz - shielded_stream: bool = False, + **kwargs, ) -> (Feed, ReceiveChannel[dict[str, Any]]): '''Maybe open a data to a ``brokerd`` daemon only if there is no @@ -571,6 +570,7 @@ async def maybe_open_feed( brokername, [sym], loglevel=loglevel, + **kwargs, ), ) as (cache_hit, feed): From 1184a4d88e30094d99bd2ed536f72649b47510bf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 09:28:22 -0400 Subject: [PATCH 24/28] Cache sample step streams per actor --- piker/data/feed.py | 45 +++++++++++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 44f93a72..a519b395 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -355,6 +355,31 @@ async def attach_feed_bus( bus._subscribers[symbol].remove(sub) +@asynccontextmanager +async def open_sample_step_stream( + portal: tractor.Portal, + delay_s: int, + +) -> tractor.ReceiveMsgStream: + # XXX: this should be singleton on a host, + # a lone broker-daemon per provider should be + # created for all practical purposes + async with maybe_open_ctx( + key=delay_s, + mngr=portal.open_stream_from( + iter_ohlc_periods, + delay_s=delay_s, # must be kwarg + ), + ) as (cache_hit, istream): + if cache_hit: + # add a new broadcast subscription for the quote stream + # if this feed is likely already in use + async with istream.subscribe() as bistream: + yield bistream + else: + yield istream + + @dataclass class Feed: """A data feed for client-side interaction with far-process# }}} @@ -371,7 +396,6 @@ class Feed: stream: trio.abc.ReceiveChannel[dict[str, Any]] _brokerd_portal: tractor._portal.Portal - _index_stream: Optional[AsyncIterator[int]] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None _max_sample_rate: int = 0 @@ -391,20 +415,13 @@ class Feed: ) -> AsyncIterator[int]: - if not self._index_stream: - # XXX: this should be singleton on a host, - # a lone broker-daemon per provider should be - # created for all practical purposes - async with self._brokerd_portal.open_stream_from( + delay_s = delay_s or self._max_sample_rate - iter_ohlc_periods, - delay_s=delay_s or self._max_sample_rate, - - ) as self._index_stream: - - yield self._index_stream - else: - yield self._index_stream + async with open_sample_step_stream( + self._brokerd_portal, + delay_s, + ) as istream: + yield istream async def pause(self) -> None: await self.stream.send('pause') From c3682348fe7ca3953391ddd0578be00d5bccb9d6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 12:46:47 -0400 Subject: [PATCH 25/28] Use the actor's service nursery instead In order to ensure the lifetime of the feed can in fact be kept open until the last consumer task has completed we need to maintain a lifetime which is hierarchically greater then all consumer tasks. This solution is somewhat hacky but seems to work well: we just use the `tractor` actor's "service nursery" (the one normally used to invoke rpc tasks) to launch the task which will start and keep open the target cached async context manager. To make this more "proper" we may want to offer a "root nursery" in all piker actors that is exposed through some singleton api or even introduce a public api for it into `tractor` directly. --- piker/_cacheables.py | 73 ++++++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index d86b477d..47ffa6a4 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -36,6 +36,7 @@ from contextlib import ( import trio from trio_typing import TaskStatus +import tractor from tractor._portal import maybe_open_nursery from .brokers import get_brokermod @@ -180,43 +181,49 @@ async def maybe_open_ctx( ctx_key = id(mngr) - # TODO: does this need to be a tractor "root nursery"? - async with maybe_open_nursery(cache.nurseries.get(ctx_key)) as n: - cache.nurseries[ctx_key] = n - value = None - try: - # lock feed acquisition around task racing / ``trio``'s - # scheduler protocol - value = cache.values[key] - log.info(f'Reusing cached feed for {key}') - cache.users += 1 - cache.lock.release() - yield True, value + value = None + try: + # lock feed acquisition around task racing / ``trio``'s + # scheduler protocol + value = cache.values[key] + log.info(f'Reusing cached feed for {key}') + cache.users += 1 + cache.lock.release() + yield True, value - except KeyError: - log.info(f'Allocating new feed for {key}') + except KeyError: + log.info(f'Allocating new feed for {key}') - # **critical section** that should prevent other tasks from - # checking the cache until complete otherwise the scheduler - # may switch and by accident we create more then one feed. + # **critical section** that should prevent other tasks from + # checking the cache until complete otherwise the scheduler + # may switch and by accident we create more then one feed. - value = await n.start(cache.run_ctx, mngr, key) - cache.users += 1 + # TODO: vaoid pulling from ``tractor`` internals and + # instead offer a "root nursery" in + service_n = tractor.current_actor()._service_n + + # TODO: does this need to be a tractor "root nursery"? + ln = cache.nurseries.get(ctx_key) + assert not ln + ln = cache.nurseries[ctx_key] = service_n + + value = await ln.start(cache.run_ctx, mngr, key) + cache.users += 1 + cache.lock.release() + + yield False, value + + finally: + cache.users -= 1 + + if cache.lock.locked(): cache.lock.release() - yield False, value + if value is not None: + # if no more consumers, teardown the client + if cache.users <= 0: + log.warning(f'De-allocating feed for {key}') - finally: - cache.users -= 1 - - if cache.lock.locked(): - cache.lock.release() - - if value is not None: - # if no more consumers, teardown the client - if cache.users <= 0: - log.warning(f'De-allocating feed for {key}') - - # terminate mngr nursery - cache.no_more_users.set() + # terminate mngr nursery + cache.no_more_users.set() From 2df16e11edd3031a5378eec2ded29dabb3e135eb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Sep 2021 09:01:25 -0400 Subject: [PATCH 26/28] Re-implement client caching using `maybe_open_ctx` --- piker/_cacheables.py | 70 +++++++++++--------------------------------- 1 file changed, 17 insertions(+), 53 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index 47ffa6a4..d15e7c45 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -79,59 +79,6 @@ def async_lifo_cache(maxsize=128): _cache: dict[str, 'Client'] = {} # noqa -# XXX: this mis mostly an alt-implementation of -# maybe_open_ctx() below except it uses an async exit statck. -# ideally wer pick one or the other. -@asynccontextmanager -async def open_cached_client( - brokername: str, -) -> 'Client': # noqa - '''Get a cached broker client from the current actor's local vars. - - If one has not been setup do it and cache it. - ''' - global _cache - - clients = _cache.setdefault('clients', {'_lock': trio.Lock()}) - - # global cache task lock - lock = clients['_lock'] - - client = None - - try: - log.info(f"Loading existing `{brokername}` client") - - async with lock: - client = clients[brokername] - client._consumers += 1 - - yield client - - except KeyError: - log.info(f"Creating new client for broker {brokername}") - - async with lock: - brokermod = get_brokermod(brokername) - exit_stack = AsyncExitStack() - - client = await exit_stack.enter_async_context( - brokermod.get_client() - ) - client._consumers = 0 - client._exit_stack = exit_stack - clients[brokername] = client - - yield client - - finally: - if client is not None: - # if no more consumers, teardown the client - client._consumers -= 1 - if client._consumers <= 0: - await client._exit_stack.aclose() - - class cache: '''Globally (processs wide) cached, task access to a kept-alive-while-in-use async resource. @@ -227,3 +174,20 @@ async def maybe_open_ctx( # terminate mngr nursery cache.no_more_users.set() + + +@asynccontextmanager +async def open_cached_client( + brokername: str, +) -> 'Client': # noqa + '''Get a cached broker client from the current actor's local vars. + + If one has not been setup do it and cache it. + + ''' + brokermod = get_brokermod(brokername) + async with maybe_open_ctx( + key=brokername, + mngr=brokermod.get_client(), + ) as (cache_hit, client): + yield client From 26cb7aa660e62588fa6c29ef38d22523fa4ef091 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Sep 2021 08:26:26 -0400 Subject: [PATCH 27/28] Drop tractor stream shielding use --- piker/data/feed.py | 3 +-- piker/fsp/__init__.py | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index a519b395..72d3c50d 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -479,7 +479,6 @@ async def open_feed( loglevel: Optional[str] = None, tick_throttle: Optional[float] = None, # Hz - shielded_stream: bool = False, ) -> Feed: ''' @@ -512,7 +511,7 @@ async def open_feed( ) as (ctx, (init_msg, first_quote)), - ctx.open_stream(shield=shielded_stream) as stream, + ctx.open_stream() as stream, ): # we can only read from shm diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 3b5c359a..a6324bb6 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -180,7 +180,6 @@ async def cascade( async with data.feed.maybe_open_feed( brokername, [symbol], - shielded_stream=True, ) as (feed, stream): assert src.token == feed.shm.token From 4527d4a67779edfb1c718331868fb7aef8fa5ae1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Sep 2021 09:45:14 -0400 Subject: [PATCH 28/28] Allocate an event per context --- piker/_cacheables.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index d15e7c45..87b700f5 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -31,13 +31,11 @@ from typing import ( ) from contextlib import ( asynccontextmanager, - AsyncExitStack, ) import trio from trio_typing import TaskStatus import tractor -from tractor._portal import maybe_open_nursery from .brokers import get_brokermod from .log import get_logger @@ -86,8 +84,11 @@ class cache: ''' lock = trio.Lock() users: int = 0 - values: dict[tuple[str, str], tuple[AsyncExitStack, Any]] = {} - nurseries: dict[int, Optional[trio.Nursery]] = {} + values: dict[Any, Any] = {} + resources: dict[ + int, + Optional[tuple[trio.Nursery, trio.Event]] + ] = {} no_more_users: Optional[trio.Event] = None @classmethod @@ -100,15 +101,15 @@ class cache: ) -> None: async with mng as value: - cls.no_more_users = trio.Event() + _, no_more_users = cls.resources[id(mng)] cls.values[key] = value task_status.started(value) try: - await cls.no_more_users.wait() + await no_more_users.wait() finally: value = cls.values.pop(key) # discard nursery ref so it won't be re-used (an error) - cls.nurseries.pop(id(mng)) + cls.resources.pop(id(mng)) @asynccontextmanager @@ -128,13 +129,12 @@ async def maybe_open_ctx( ctx_key = id(mngr) - value = None try: # lock feed acquisition around task racing / ``trio``'s # scheduler protocol value = cache.values[key] - log.info(f'Reusing cached feed for {key}') + log.info(f'Reusing cached resource for {key}') cache.users += 1 cache.lock.release() yield True, value @@ -146,14 +146,15 @@ async def maybe_open_ctx( # checking the cache until complete otherwise the scheduler # may switch and by accident we create more then one feed. - # TODO: vaoid pulling from ``tractor`` internals and - # instead offer a "root nursery" in + # TODO: avoid pulling from ``tractor`` internals and + # instead offer a "root nursery" in piker actors? service_n = tractor.current_actor()._service_n # TODO: does this need to be a tractor "root nursery"? - ln = cache.nurseries.get(ctx_key) + ln = cache.resources.get(ctx_key) assert not ln - ln = cache.nurseries[ctx_key] = service_n + + ln, _ = cache.resources[ctx_key] = (service_n, trio.Event()) value = await ln.start(cache.run_ctx, mngr, key) cache.users += 1 @@ -170,10 +171,11 @@ async def maybe_open_ctx( if value is not None: # if no more consumers, teardown the client if cache.users <= 0: - log.warning(f'De-allocating feed for {key}') + log.warning(f'De-allocating resource for {key}') # terminate mngr nursery - cache.no_more_users.set() + _, no_more_users = cache.resources[ctx_key] + no_more_users.set() @asynccontextmanager