Let's abstractify: ->

pause_feeds_on_sym_switch
Tyler Goodlet 2021-08-09 19:27:42 -04:00
parent 68ce5b3550
commit 66f1d91541
2 changed files with 30 additions and 30 deletions

View File

@ -19,7 +19,7 @@ Cacheing apis and toolz.
""" """
from collections import OrderedDict from collections import OrderedDict
from typing import Optional from typing import Optional, Hashable, TypeVar, AsyncContextManager
from contextlib import ( from contextlib import (
asynccontextmanager, asynccontextmanager,
AsyncExitStack, AsyncExitStack,
@ -30,10 +30,10 @@ import trio
from .brokers import get_brokermod from .brokers import get_brokermod
from .log import get_logger from .log import get_logger
from . import data
from .data.feed import Feed from .data.feed import Feed
T = TypeVar('T')
log = get_logger(__name__) log = get_logger(__name__)
@ -69,16 +69,17 @@ def async_lifo_cache(maxsize=128):
_cache: dict[str, 'Client'] = {} # noqa _cache: dict[str, 'Client'] = {} # noqa
# XXX: this mis mostly an alt-implementation of
# maybe_open_ctx() below except it uses an async exit statck.
# ideally wer pick one or the other.
@asynccontextmanager @asynccontextmanager
async def open_cached_client( async def open_cached_client(
brokername: str, brokername: str,
*args,
**kwargs,
) -> 'Client': # noqa ) -> 'Client': # noqa
"""Get a cached broker client from the current actor's local vars. '''Get a cached broker client from the current actor's local vars.
If one has not been setup do it and cache it. If one has not been setup do it and cache it.
""" '''
global _cache global _cache
clients = _cache.setdefault('clients', {'_lock': trio.Lock()}) clients = _cache.setdefault('clients', {'_lock': trio.Lock()})
@ -128,25 +129,27 @@ class cache:
''' '''
lock = trio.Lock() lock = trio.Lock()
users: int = 0 users: int = 0
feeds: dict[tuple[str, str], Feed] = {} ctxs: dict[tuple[str, str], Feed] = {}
no_more_users: Optional[trio.Event] = None no_more_users: Optional[trio.Event] = None
@asynccontextmanager @asynccontextmanager
async def maybe_open_feed( async def maybe_open_ctx(
broker: str, key: Hashable,
symbol: str, mngr: AsyncContextManager[T],
loglevel: str, loglevel: str,
) -> Feed: ) -> T:
'''Maybe open a context manager if there is not already a cached
key = (broker, symbol) version for the provided ``key``. Return the cached instance on
a cache hit.
'''
@contextmanager @contextmanager
def get_and_use() -> Feed: def get_and_use() -> Feed:
# key error must bubble here # key error must bubble here
feed = cache.feeds[key] feed = cache.ctxs[key]
log.info(f'Reusing cached feed for {key}') log.info(f'Reusing cached feed for {key}')
try: try:
cache.users += 1 cache.users += 1
@ -174,22 +177,15 @@ async def maybe_open_feed(
# **critical section** that should prevent other tasks from # **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler # checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed. # may switch and by accident we create more then one feed.
cache.no_more_users = trio.Event() cache.no_more_users = trio.Event()
log.info(f'Allocating new feed for {key}') log.info(f'Allocating new feed for {key}')
# TODO: eventually support N-brokers # TODO: eventually support N-brokers
async with ( async with mngr as value:
data.open_feed( cache.ctxs[key] = value
broker,
[symbol],
loglevel=loglevel,
) as feed,
):
cache.feeds[key] = feed
cache.lock.release() cache.lock.release()
try: try:
yield feed yield value
finally: finally:
# don't tear down the feed until there are zero # don't tear down the feed until there are zero
# users of it left. # users of it left.
@ -197,4 +193,4 @@ async def maybe_open_feed(
await cache.no_more_users.wait() await cache.no_more_users.wait()
log.warning('De-allocating feed for {key}') log.warning('De-allocating feed for {key}')
cache.feeds.pop(key) cache.ctxs.pop(key)

View File

@ -32,9 +32,9 @@ import tractor
from ..log import get_logger from ..log import get_logger
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data.feed import Feed from ..data.feed import Feed, open_feed
from .._daemon import maybe_spawn_brokerd from .._daemon import maybe_spawn_brokerd
from .._cacheables import maybe_open_feed from .._cacheables import maybe_open_ctx
from . import _paper_engine as paper from . import _paper_engine as paper
from ._messages import ( from ._messages import (
Status, Order, Status, Order,
@ -959,9 +959,13 @@ async def _emsd_main(
# spawn one task per broker feed # spawn one task per broker feed
async with ( async with (
maybe_open_feed( maybe_open_ctx(
key=(broker, symbol),
mngr=open_feed(
broker, broker,
symbol, [symbol],
loglevel=loglevel,
),
loglevel=loglevel, loglevel=loglevel,
) as feed, ) as feed,
): ):