Cache `brokerd` feeds for reuse in clearing loop
parent
68d2000909
commit
9b2b40598d
|
@ -22,7 +22,7 @@ from contextlib import asynccontextmanager
|
|||
from dataclasses import dataclass, field
|
||||
from pprint import pformat
|
||||
import time
|
||||
from typing import AsyncIterator, Callable, Any
|
||||
from typing import AsyncIterator, Callable, Optional
|
||||
|
||||
from bidict import bidict
|
||||
from pydantic import BaseModel
|
||||
|
@ -123,7 +123,7 @@ class _DarkBook:
|
|||
# XXX: this is in place to prevent accidental positions that are too
|
||||
# big. Now obviously this won't make sense for crypto like BTC, but
|
||||
# for most traditional brokers it should be fine unless you start
|
||||
# slinging NQ futes or something.
|
||||
# slinging NQ futes or something; check ur margin.
|
||||
_DEFAULT_SIZE: float = 1.0
|
||||
|
||||
|
||||
|
@ -266,7 +266,7 @@ class TradesRelay:
|
|||
consumers: int = 0
|
||||
|
||||
|
||||
class _Router(BaseModel):
|
||||
class Router(BaseModel):
|
||||
'''Order router which manages and tracks per-broker dark book,
|
||||
alerts, clearing and related data feed management.
|
||||
|
||||
|
@ -276,8 +276,6 @@ class _Router(BaseModel):
|
|||
# setup at actor spawn time
|
||||
nursery: trio.Nursery
|
||||
|
||||
feeds: dict[tuple[str, str], Any] = {}
|
||||
|
||||
# broker to book map
|
||||
books: dict[str, _DarkBook] = {}
|
||||
|
||||
|
@ -343,12 +341,12 @@ class _Router(BaseModel):
|
|||
relay.consumers -= 1
|
||||
|
||||
|
||||
_router: _Router = None
|
||||
_router: Router = None
|
||||
|
||||
|
||||
async def open_brokerd_trades_dialogue(
|
||||
|
||||
router: _Router,
|
||||
router: Router,
|
||||
feed: Feed,
|
||||
symbol: str,
|
||||
_exec_mode: str,
|
||||
|
@ -466,7 +464,7 @@ async def _setup_persistent_emsd(
|
|||
# open a root "service nursery" for the ``emsd`` actor
|
||||
async with trio.open_nursery() as service_nursery:
|
||||
|
||||
_router = _Router(nursery=service_nursery)
|
||||
_router = Router(nursery=service_nursery)
|
||||
|
||||
# TODO: send back the full set of persistent
|
||||
# orders/execs?
|
||||
|
@ -480,7 +478,7 @@ async def translate_and_relay_brokerd_events(
|
|||
|
||||
broker: str,
|
||||
brokerd_trades_stream: tractor.MsgStream,
|
||||
router: _Router,
|
||||
router: Router,
|
||||
|
||||
) -> AsyncIterator[dict]:
|
||||
'''Trades update loop - receive updates from ``brokerd`` trades
|
||||
|
@ -704,7 +702,7 @@ async def process_client_order_cmds(
|
|||
symbol: str,
|
||||
feed: Feed, # noqa
|
||||
dark_book: _DarkBook,
|
||||
router: _Router,
|
||||
router: Router,
|
||||
|
||||
) -> None:
|
||||
|
||||
|
@ -904,6 +902,73 @@ async def process_client_order_cmds(
|
|||
)
|
||||
|
||||
|
||||
class cache:
|
||||
'''Globally (processs wide) cached, task access to a
|
||||
kept-alive-while-in-use data feed.
|
||||
|
||||
'''
|
||||
lock = trio.Lock()
|
||||
users: int = 0
|
||||
feeds: dict[tuple[str, str], Feed] = {}
|
||||
no_more_users: Optional[trio.Event] = None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def maybe_open_clearing_feed(
|
||||
|
||||
broker: str,
|
||||
symbol: str,
|
||||
loglevel: str,
|
||||
|
||||
) -> Feed:
|
||||
try:
|
||||
log.info(f'Reusing existing feed for {(broker, symbol)}')
|
||||
yield cache.feeds[(broker, symbol)]
|
||||
except KeyError:
|
||||
# lock feed acquisition around task racing / ``trio``'s scheduler protocol
|
||||
await cache.lock.acquire()
|
||||
try:
|
||||
cache.users += 1
|
||||
cached_feed = cache.feeds[(broker, symbol)]
|
||||
cache.lock.release()
|
||||
try:
|
||||
yield cached_feed
|
||||
finally:
|
||||
cache.users -= 1
|
||||
if cache.users == 0:
|
||||
# signal to original allocator task feed use is complete
|
||||
cache.no_more_users.set()
|
||||
return
|
||||
|
||||
except KeyError:
|
||||
# **critical section** that should prevent other tasks from
|
||||
# checking the cache until complete otherwise the scheduler
|
||||
# may switch and by accident we create more then one feed.
|
||||
|
||||
cache.no_more_users = trio.Event()
|
||||
|
||||
log.warning(f'Creating new feed for {(broker, symbol)}')
|
||||
# TODO: eventually support N-brokers
|
||||
async with (
|
||||
data.open_feed(
|
||||
broker,
|
||||
[symbol],
|
||||
loglevel=loglevel,
|
||||
) as feed,
|
||||
):
|
||||
cache.feeds[(broker, symbol)] = feed
|
||||
cache.lock.release()
|
||||
try:
|
||||
yield feed
|
||||
finally:
|
||||
# don't tear down the feed until there are zero
|
||||
# users of it left.
|
||||
if cache.users > 0:
|
||||
await cache.no_more_users.wait()
|
||||
|
||||
cache.feeds.pop((broker, symbol))
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def _emsd_main(
|
||||
|
||||
|
@ -958,32 +1023,25 @@ async def _emsd_main(
|
|||
# tractor.Context instead of strictly requiring a ctx arg.
|
||||
ems_ctx = ctx
|
||||
|
||||
cached_feed = _router.feeds.get((broker, symbol))
|
||||
if cached_feed:
|
||||
# TODO: use cached feeds per calling-actor
|
||||
log.warning(f'Opening duplicate feed for {(broker, symbol)}')
|
||||
feed: Feed
|
||||
|
||||
# spawn one task per broker feed
|
||||
async with (
|
||||
# TODO: eventually support N-brokers
|
||||
data.open_feed(
|
||||
maybe_open_clearing_feed(
|
||||
broker,
|
||||
[symbol],
|
||||
symbol,
|
||||
loglevel=loglevel,
|
||||
) as feed,
|
||||
):
|
||||
if not cached_feed:
|
||||
_router.feeds[(broker, symbol)] = feed
|
||||
|
||||
# XXX: this should be initial price quote from target provider
|
||||
first_quote = feed.first_quote
|
||||
|
||||
# open a stream with the brokerd backend for order
|
||||
# flow dialogue
|
||||
|
||||
book = _router.get_dark_book(broker)
|
||||
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
|
||||
|
||||
# open a stream with the brokerd backend for order
|
||||
# flow dialogue
|
||||
async with (
|
||||
|
||||
# only open if one isn't already up: we try to keep
|
||||
|
|
Loading…
Reference in New Issue