Cache `brokerd` feeds for reuse in clearing loop

pause_feeds_on_sym_switch
Tyler Goodlet 2021-08-09 11:31:38 -04:00
parent f03f051e7f
commit 146c684f21
1 changed files with 80 additions and 22 deletions

View File

@ -22,7 +22,7 @@ from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from pprint import pformat
import time
from typing import AsyncIterator, Callable, Any
from typing import AsyncIterator, Callable, Optional
from bidict import bidict
from pydantic import BaseModel
@ -123,7 +123,7 @@ class _DarkBook:
# XXX: this is in place to prevent accidental positions that are too
# big. Now obviously this won't make sense for crypto like BTC, but
# for most traditional brokers it should be fine unless you start
# slinging NQ futes or something.
# slinging NQ futes or something; check ur margin.
_DEFAULT_SIZE: float = 1.0
@ -266,7 +266,7 @@ class TradesRelay:
consumers: int = 0
class _Router(BaseModel):
class Router(BaseModel):
'''Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management.
@ -276,8 +276,6 @@ class _Router(BaseModel):
# setup at actor spawn time
nursery: trio.Nursery
feeds: dict[tuple[str, str], Any] = {}
# broker to book map
books: dict[str, _DarkBook] = {}
@ -343,12 +341,12 @@ class _Router(BaseModel):
relay.consumers -= 1
_router: _Router = None
_router: Router = None
async def open_brokerd_trades_dialogue(
router: _Router,
router: Router,
feed: Feed,
symbol: str,
_exec_mode: str,
@ -466,7 +464,7 @@ async def _setup_persistent_emsd(
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = _Router(nursery=service_nursery)
_router = Router(nursery=service_nursery)
# TODO: send back the full set of persistent
# orders/execs?
@ -480,7 +478,7 @@ async def translate_and_relay_brokerd_events(
broker: str,
brokerd_trades_stream: tractor.MsgStream,
router: _Router,
router: Router,
) -> AsyncIterator[dict]:
'''Trades update loop - receive updates from ``brokerd`` trades
@ -704,7 +702,7 @@ async def process_client_order_cmds(
symbol: str,
feed: Feed, # noqa
dark_book: _DarkBook,
router: _Router,
router: Router,
) -> None:
@ -904,6 +902,73 @@ async def process_client_order_cmds(
)
class cache:
'''Globally (processs wide) cached, task access to a
kept-alive-while-in-use data feed.
'''
lock = trio.Lock()
users: int = 0
feeds: dict[tuple[str, str], Feed] = {}
no_more_users: Optional[trio.Event] = None
@asynccontextmanager
async def maybe_open_clearing_feed(
broker: str,
symbol: str,
loglevel: str,
) -> Feed:
try:
log.info(f'Reusing existing feed for {(broker, symbol)}')
yield cache.feeds[(broker, symbol)]
except KeyError:
# lock feed acquisition around task racing / ``trio``'s scheduler protocol
await cache.lock.acquire()
try:
cache.users += 1
cached_feed = cache.feeds[(broker, symbol)]
cache.lock.release()
try:
yield cached_feed
finally:
cache.users -= 1
if cache.users == 0:
# signal to original allocator task feed use is complete
cache.no_more_users.set()
return
except KeyError:
# **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed.
cache.no_more_users = trio.Event()
log.warning(f'Creating new feed for {(broker, symbol)}')
# TODO: eventually support N-brokers
async with (
data.open_feed(
broker,
[symbol],
loglevel=loglevel,
) as feed,
):
cache.feeds[(broker, symbol)] = feed
cache.lock.release()
try:
yield feed
finally:
# don't tear down the feed until there are zero
# users of it left.
if cache.users > 0:
await cache.no_more_users.wait()
cache.feeds.pop((broker, symbol))
@tractor.context
async def _emsd_main(
@ -958,32 +1023,25 @@ async def _emsd_main(
# tractor.Context instead of strictly requiring a ctx arg.
ems_ctx = ctx
cached_feed = _router.feeds.get((broker, symbol))
if cached_feed:
# TODO: use cached feeds per calling-actor
log.warning(f'Opening duplicate feed for {(broker, symbol)}')
feed: Feed
# spawn one task per broker feed
async with (
# TODO: eventually support N-brokers
data.open_feed(
maybe_open_clearing_feed(
broker,
[symbol],
symbol,
loglevel=loglevel,
) as feed,
):
if not cached_feed:
_router.feeds[(broker, symbol)] = feed
# XXX: this should be initial price quote from target provider
first_quote = feed.first_quote
# open a stream with the brokerd backend for order
# flow dialogue
book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
# open a stream with the brokerd backend for order
# flow dialogue
async with (
# only open if one isn't already up: we try to keep