From 9b2b40598d82d37e45a5600821353ee44b199bbb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 9 Aug 2021 11:31:38 -0400 Subject: [PATCH] Cache `brokerd` feeds for reuse in clearing loop --- piker/clearing/_ems.py | 102 ++++++++++++++++++++++++++++++++--------- 1 file changed, 80 insertions(+), 22 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 7dbbfbad..cd6985f3 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -22,7 +22,7 @@ from contextlib import asynccontextmanager from dataclasses import dataclass, field from pprint import pformat import time -from typing import AsyncIterator, Callable, Any +from typing import AsyncIterator, Callable, Optional from bidict import bidict from pydantic import BaseModel @@ -123,7 +123,7 @@ class _DarkBook: # XXX: this is in place to prevent accidental positions that are too # big. Now obviously this won't make sense for crypto like BTC, but # for most traditional brokers it should be fine unless you start -# slinging NQ futes or something. +# slinging NQ futes or something; check ur margin. _DEFAULT_SIZE: float = 1.0 @@ -266,7 +266,7 @@ class TradesRelay: consumers: int = 0 -class _Router(BaseModel): +class Router(BaseModel): '''Order router which manages and tracks per-broker dark book, alerts, clearing and related data feed management. @@ -276,8 +276,6 @@ class _Router(BaseModel): # setup at actor spawn time nursery: trio.Nursery - feeds: dict[tuple[str, str], Any] = {} - # broker to book map books: dict[str, _DarkBook] = {} @@ -343,12 +341,12 @@ class _Router(BaseModel): relay.consumers -= 1 -_router: _Router = None +_router: Router = None async def open_brokerd_trades_dialogue( - router: _Router, + router: Router, feed: Feed, symbol: str, _exec_mode: str, @@ -466,7 +464,7 @@ async def _setup_persistent_emsd( # open a root "service nursery" for the ``emsd`` actor async with trio.open_nursery() as service_nursery: - _router = _Router(nursery=service_nursery) + _router = Router(nursery=service_nursery) # TODO: send back the full set of persistent # orders/execs? @@ -480,7 +478,7 @@ async def translate_and_relay_brokerd_events( broker: str, brokerd_trades_stream: tractor.MsgStream, - router: _Router, + router: Router, ) -> AsyncIterator[dict]: '''Trades update loop - receive updates from ``brokerd`` trades @@ -704,7 +702,7 @@ async def process_client_order_cmds( symbol: str, feed: Feed, # noqa dark_book: _DarkBook, - router: _Router, + router: Router, ) -> None: @@ -904,6 +902,73 @@ async def process_client_order_cmds( ) +class cache: + '''Globally (processs wide) cached, task access to a + kept-alive-while-in-use data feed. + + ''' + lock = trio.Lock() + users: int = 0 + feeds: dict[tuple[str, str], Feed] = {} + no_more_users: Optional[trio.Event] = None + + +@asynccontextmanager +async def maybe_open_clearing_feed( + + broker: str, + symbol: str, + loglevel: str, + +) -> Feed: + try: + log.info(f'Reusing existing feed for {(broker, symbol)}') + yield cache.feeds[(broker, symbol)] + except KeyError: + # lock feed acquisition around task racing / ``trio``'s scheduler protocol + await cache.lock.acquire() + try: + cache.users += 1 + cached_feed = cache.feeds[(broker, symbol)] + cache.lock.release() + try: + yield cached_feed + finally: + cache.users -= 1 + if cache.users == 0: + # signal to original allocator task feed use is complete + cache.no_more_users.set() + return + + except KeyError: + # **critical section** that should prevent other tasks from + # checking the cache until complete otherwise the scheduler + # may switch and by accident we create more then one feed. + + cache.no_more_users = trio.Event() + + log.warning(f'Creating new feed for {(broker, symbol)}') + # TODO: eventually support N-brokers + async with ( + data.open_feed( + broker, + [symbol], + loglevel=loglevel, + ) as feed, + ): + cache.feeds[(broker, symbol)] = feed + cache.lock.release() + try: + yield feed + finally: + # don't tear down the feed until there are zero + # users of it left. + if cache.users > 0: + await cache.no_more_users.wait() + + cache.feeds.pop((broker, symbol)) + + @tractor.context async def _emsd_main( @@ -958,32 +1023,25 @@ async def _emsd_main( # tractor.Context instead of strictly requiring a ctx arg. ems_ctx = ctx - cached_feed = _router.feeds.get((broker, symbol)) - if cached_feed: - # TODO: use cached feeds per calling-actor - log.warning(f'Opening duplicate feed for {(broker, symbol)}') + feed: Feed # spawn one task per broker feed async with ( - # TODO: eventually support N-brokers - data.open_feed( + maybe_open_clearing_feed( broker, - [symbol], + symbol, loglevel=loglevel, ) as feed, ): - if not cached_feed: - _router.feeds[(broker, symbol)] = feed # XXX: this should be initial price quote from target provider first_quote = feed.first_quote - # open a stream with the brokerd backend for order - # flow dialogue - book = _router.get_dark_book(broker) book.lasts[(broker, symbol)] = first_quote[symbol]['last'] + # open a stream with the brokerd backend for order + # flow dialogue async with ( # only open if one isn't already up: we try to keep