Move feed caching to cache mod; put entry retrieval into ctx mngr

pause_feeds_on_sym_switch
Tyler Goodlet 2021-08-09 13:20:57 -04:00
parent a0660e553f
commit 0ce8057823
2 changed files with 91 additions and 76 deletions
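
For context, the refactor below serializes feed allocation because ``trio`` may switch tasks at any ``await`` checkpoint: two tasks that both miss the cache can otherwise allocate the same feed twice. A minimal runnable sketch of that hazard (all names here are illustrative, not from this commit):

import trio

feeds: dict[str, str] = {}
allocations = 0

async def get_feed_unlocked(key: str) -> str:
    # racy lookup: no lock around the miss-then-allocate sequence
    global allocations
    if key not in feeds:
        await trio.sleep(0)  # checkpoint: another task may run here
        allocations += 1
        feeds[key] = f'feed-{key}'
    return feeds[key]

async def main() -> None:
    async with trio.open_nursery() as n:
        for _ in range(2):
            n.start_soon(get_feed_unlocked, 'xbtusd')

trio.run(main)
print(f'{allocations} allocations for one key')  # prints 2, not 1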

View File

@@ -18,19 +18,25 @@
Caching apis and toolz.
"""
from typing import Dict
from contextlib import asynccontextmanager, AsyncExitStack
from typing import Optional
from contextlib import (
    asynccontextmanager,
    AsyncExitStack,
    contextmanager,
)
import trio
from .brokers import get_brokermod
from .log import get_logger
from . import data
from .data.feed import Feed
log = get_logger(__name__)
_cache: Dict[str, 'Client'] = {} # noqa
_cache: dict[str, 'Client'] = {} # noqa
@asynccontextmanager
@@ -83,3 +89,82 @@ async def open_cached_client(
        client._consumers -= 1
        if client._consumers <= 0:
            await client._exit_stack.aclose()
class cache:
    '''Globally (process wide) cached, task access to a
    kept-alive-while-in-use data feed.
    '''
    lock = trio.Lock()
    users: int = 0
    feeds: dict[tuple[str, str], Feed] = {}
    no_more_users: Optional[trio.Event] = None
@asynccontextmanager
async def maybe_open_feed(
    broker: str,
    symbol: str,
    loglevel: str,
) -> Feed:

    key = (broker, symbol)
    @contextmanager
    def get_and_use() -> Feed:
        # key error must bubble here
        feed = cache.feeds[key]

        log.info(f'Reusing cached feed for {key}')
        try:
            cache.users += 1
            yield feed
        finally:
            cache.users -= 1
            if cache.users == 0:
                # signal to original allocator task feed use is complete
                cache.no_more_users.set()
    try:
        with get_and_use() as feed:
            yield feed

    except KeyError:
        # lock feed acquisition around task racing / ``trio``'s
        # scheduler protocol
        await cache.lock.acquire()
        try:
            with get_and_use() as feed:
                cache.lock.release()
                yield feed
            return

        except KeyError:
            # **critical section** that should prevent other tasks from
            # checking the cache until complete, otherwise the scheduler
            # may switch and by accident we create more than one feed.
            cache.no_more_users = trio.Event()

            log.info(f'Allocating new feed for {key}')
            # TODO: eventually support N-brokers
            async with (
                data.open_feed(
                    broker,
                    [symbol],
                    loglevel=loglevel,
                ) as feed,
            ):
                cache.feeds[key] = feed
                cache.lock.release()
                try:
                    yield feed
                finally:
                    # don't tear down the feed until there are zero
                    # users of it left.
                    if cache.users > 0:
                        await cache.no_more_users.wait()

                    log.warning(f'De-allocating feed for {key}')
                    cache.feeds.pop(key)
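
A hypothetical call-site sketch for the new context manager (the broker/symbol values and the package import path are assumptions, not from this commit): any number of tasks asking for the same (broker, symbol) key share one live feed, and the allocator task only tears it down after the last user exits its ``async with`` block.

import trio
from piker._cacheables import maybe_open_feed  # assumed module path

async def consumer(name: str) -> None:
    async with maybe_open_feed('kraken', 'xbtusd', loglevel='info') as feed:
        # pull one quote then exit; the feed stays alive for other users
        async for quote in feed.stream:
            print(name, quote)
            break

async def main() -> None:
    async with trio.open_nursery() as n:
        n.start_soon(consumer, 'chart')
        n.start_soon(consumer, 'ems')  # reuses the cached feed

trio.run(main)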

View File

@@ -22,7 +22,7 @@ from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from pprint import pformat
import time
from typing import AsyncIterator, Callable, Optional
from typing import AsyncIterator, Callable
from bidict import bidict
from pydantic import BaseModel
@@ -30,11 +30,11 @@ import trio
from trio_typing import TaskStatus
import tractor
from .. import data
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed
from .._daemon import maybe_spawn_brokerd
from .._cacheables import maybe_open_feed
from . import _paper_engine as paper
from ._messages import (
    Status, Order,
@@ -132,7 +132,6 @@ async def clear_dark_triggers(
    brokerd_orders_stream: tractor.MsgStream,
    ems_client_order_stream: tractor.MsgStream,
    quote_stream: tractor.ReceiveMsgStream,  # noqa

    broker: str,
    symbol: str,
@@ -902,73 +901,6 @@ async def process_client_order_cmds(
)
class cache:
    '''Globally (process wide) cached, task access to a
    kept-alive-while-in-use data feed.
    '''
    lock = trio.Lock()
    users: int = 0
    feeds: dict[tuple[str, str], Feed] = {}
    no_more_users: Optional[trio.Event] = None
@asynccontextmanager
async def maybe_open_clearing_feed(
    broker: str,
    symbol: str,
    loglevel: str,
) -> Feed:
    try:
        log.info(f'Reusing existing feed for {(broker, symbol)}')
        yield cache.feeds[(broker, symbol)]

    except KeyError:
        # lock feed acquisition around task racing / ``trio``'s scheduler protocol
        await cache.lock.acquire()
        try:
            cache.users += 1
            cached_feed = cache.feeds[(broker, symbol)]
            cache.lock.release()
            try:
                yield cached_feed
            finally:
                cache.users -= 1
                if cache.users == 0:
                    # signal to original allocator task feed use is complete
                    cache.no_more_users.set()
            return

        except KeyError:
            # **critical section** that should prevent other tasks from
            # checking the cache until complete, otherwise the scheduler
            # may switch and by accident we create more than one feed.
            cache.no_more_users = trio.Event()

            log.warning(f'Creating new feed for {(broker, symbol)}')
            # TODO: eventually support N-brokers
            async with (
                data.open_feed(
                    broker,
                    [symbol],
                    loglevel=loglevel,
                ) as feed,
            ):
                cache.feeds[(broker, symbol)] = feed
                cache.lock.release()
                try:
                    yield feed
                finally:
                    # don't tear down the feed until there are zero
                    # users of it left.
                    if cache.users > 0:
                        await cache.no_more_users.wait()

                    cache.feeds.pop((broker, symbol))
@tractor.context
async def _emsd_main(
@@ -1027,7 +959,7 @@ async def _emsd_main(
    # spawn one task per broker feed
    async with (

        maybe_open_clearing_feed(
        maybe_open_feed(
            broker,
            symbol,
            loglevel=loglevel,
@@ -1073,11 +1005,9 @@ async def _emsd_main(
        n.start_soon(
            clear_dark_triggers,

            # relay.brokerd_dialogue,
            brokerd_stream,
            ems_client_order_stream,
            feed.stream,

            broker,
            symbol,
            book
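
The teardown handshake in the cached-feed code above generalizes: the allocating task owns the resource and parks on an ``Event`` that the last reusing task sets, so teardown never races active users. A self-contained sketch of just that pattern (illustrative names, ``trio`` only):

import trio

class shared:
    users: int = 0
    no_more_users: trio.Event

async def user(idx: int) -> None:
    shared.users += 1
    try:
        await trio.sleep(0.05 * (idx + 1))  # simulate consuming the feed
    finally:
        shared.users -= 1
        if shared.users == 0:
            # last user out: wake the allocator for teardown
            shared.no_more_users.set()

async def allocator(nursery: trio.Nursery) -> None:
    shared.no_more_users = trio.Event()
    print('feed allocated')
    for i in range(3):
        nursery.start_soon(user, i)
    await trio.sleep(0.01)  # the allocator's own use ends first
    if shared.users > 0:
        await shared.no_more_users.wait()
    print('feed torn down after last user')

async def main() -> None:
    async with trio.open_nursery() as n:
        n.start_soon(allocator, n)

trio.run(main)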