Move feed cacheing to cache mod; put entry retreival into ctx mng
parent
79000b93cb
commit
b535effc52
|
@ -18,19 +18,25 @@
|
|||
Cacheing apis and toolz.
|
||||
|
||||
"""
|
||||
from typing import Dict
|
||||
from contextlib import asynccontextmanager, AsyncExitStack
|
||||
from typing import Optional
|
||||
from contextlib import (
|
||||
asynccontextmanager,
|
||||
AsyncExitStack,
|
||||
contextmanager,
|
||||
)
|
||||
|
||||
import trio
|
||||
|
||||
from .brokers import get_brokermod
|
||||
from .log import get_logger
|
||||
from . import data
|
||||
from .data.feed import Feed
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
_cache: Dict[str, 'Client'] = {} # noqa
|
||||
_cache: dict[str, 'Client'] = {} # noqa
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
|
@ -83,3 +89,82 @@ async def open_cached_client(
|
|||
client._consumers -= 1
|
||||
if client._consumers <= 0:
|
||||
await client._exit_stack.aclose()
|
||||
|
||||
|
||||
class cache:
|
||||
'''Globally (processs wide) cached, task access to a
|
||||
kept-alive-while-in-use data feed.
|
||||
|
||||
'''
|
||||
lock = trio.Lock()
|
||||
users: int = 0
|
||||
feeds: dict[tuple[str, str], Feed] = {}
|
||||
no_more_users: Optional[trio.Event] = None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def maybe_open_feed(
|
||||
|
||||
broker: str,
|
||||
symbol: str,
|
||||
loglevel: str,
|
||||
|
||||
) -> Feed:
|
||||
|
||||
key = (broker, symbol)
|
||||
|
||||
@contextmanager
|
||||
def get_and_use() -> Feed:
|
||||
# key error must bubble here
|
||||
feed = cache.feeds[key]
|
||||
log.info(f'Reusing cached feed for {key}')
|
||||
try:
|
||||
cache.users += 1
|
||||
yield feed
|
||||
finally:
|
||||
cache.users -= 1
|
||||
if cache.users == 0:
|
||||
# signal to original allocator task feed use is complete
|
||||
cache.no_more_users.set()
|
||||
|
||||
try:
|
||||
with get_and_use() as feed:
|
||||
yield feed
|
||||
except KeyError:
|
||||
# lock feed acquisition around task racing / ``trio``'s
|
||||
# scheduler protocol
|
||||
await cache.lock.acquire()
|
||||
try:
|
||||
with get_and_use() as feed:
|
||||
cache.lock.release()
|
||||
yield feed
|
||||
return
|
||||
|
||||
except KeyError:
|
||||
# **critical section** that should prevent other tasks from
|
||||
# checking the cache until complete otherwise the scheduler
|
||||
# may switch and by accident we create more then one feed.
|
||||
|
||||
cache.no_more_users = trio.Event()
|
||||
|
||||
log.info(f'Allocating new feed for {key}')
|
||||
# TODO: eventually support N-brokers
|
||||
async with (
|
||||
data.open_feed(
|
||||
broker,
|
||||
[symbol],
|
||||
loglevel=loglevel,
|
||||
) as feed,
|
||||
):
|
||||
cache.feeds[key] = feed
|
||||
cache.lock.release()
|
||||
try:
|
||||
yield feed
|
||||
finally:
|
||||
# don't tear down the feed until there are zero
|
||||
# users of it left.
|
||||
if cache.users > 0:
|
||||
await cache.no_more_users.wait()
|
||||
|
||||
log.warning('De-allocating feed for {key}')
|
||||
cache.feeds.pop(key)
|
||||
|
|
|
@ -22,7 +22,7 @@ from contextlib import asynccontextmanager
|
|||
from dataclasses import dataclass, field
|
||||
from pprint import pformat
|
||||
import time
|
||||
from typing import AsyncIterator, Callable, Optional
|
||||
from typing import AsyncIterator, Callable
|
||||
|
||||
from bidict import bidict
|
||||
from pydantic import BaseModel
|
||||
|
@ -30,11 +30,11 @@ import trio
|
|||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
|
||||
from .. import data
|
||||
from ..log import get_logger
|
||||
from ..data._normalize import iterticks
|
||||
from ..data.feed import Feed
|
||||
from .._daemon import maybe_spawn_brokerd
|
||||
from .._cacheables import maybe_open_feed
|
||||
from . import _paper_engine as paper
|
||||
from ._messages import (
|
||||
Status, Order,
|
||||
|
@ -132,7 +132,6 @@ async def clear_dark_triggers(
|
|||
brokerd_orders_stream: tractor.MsgStream,
|
||||
ems_client_order_stream: tractor.MsgStream,
|
||||
quote_stream: tractor.ReceiveMsgStream, # noqa
|
||||
|
||||
broker: str,
|
||||
symbol: str,
|
||||
|
||||
|
@ -902,73 +901,6 @@ async def process_client_order_cmds(
|
|||
)
|
||||
|
||||
|
||||
class cache:
|
||||
'''Globally (processs wide) cached, task access to a
|
||||
kept-alive-while-in-use data feed.
|
||||
|
||||
'''
|
||||
lock = trio.Lock()
|
||||
users: int = 0
|
||||
feeds: dict[tuple[str, str], Feed] = {}
|
||||
no_more_users: Optional[trio.Event] = None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def maybe_open_clearing_feed(
|
||||
|
||||
broker: str,
|
||||
symbol: str,
|
||||
loglevel: str,
|
||||
|
||||
) -> Feed:
|
||||
try:
|
||||
log.info(f'Reusing existing feed for {(broker, symbol)}')
|
||||
yield cache.feeds[(broker, symbol)]
|
||||
except KeyError:
|
||||
# lock feed acquisition around task racing / ``trio``'s scheduler protocol
|
||||
await cache.lock.acquire()
|
||||
try:
|
||||
cache.users += 1
|
||||
cached_feed = cache.feeds[(broker, symbol)]
|
||||
cache.lock.release()
|
||||
try:
|
||||
yield cached_feed
|
||||
finally:
|
||||
cache.users -= 1
|
||||
if cache.users == 0:
|
||||
# signal to original allocator task feed use is complete
|
||||
cache.no_more_users.set()
|
||||
return
|
||||
|
||||
except KeyError:
|
||||
# **critical section** that should prevent other tasks from
|
||||
# checking the cache until complete otherwise the scheduler
|
||||
# may switch and by accident we create more then one feed.
|
||||
|
||||
cache.no_more_users = trio.Event()
|
||||
|
||||
log.warning(f'Creating new feed for {(broker, symbol)}')
|
||||
# TODO: eventually support N-brokers
|
||||
async with (
|
||||
data.open_feed(
|
||||
broker,
|
||||
[symbol],
|
||||
loglevel=loglevel,
|
||||
) as feed,
|
||||
):
|
||||
cache.feeds[(broker, symbol)] = feed
|
||||
cache.lock.release()
|
||||
try:
|
||||
yield feed
|
||||
finally:
|
||||
# don't tear down the feed until there are zero
|
||||
# users of it left.
|
||||
if cache.users > 0:
|
||||
await cache.no_more_users.wait()
|
||||
|
||||
cache.feeds.pop((broker, symbol))
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def _emsd_main(
|
||||
|
||||
|
@ -1027,7 +959,7 @@ async def _emsd_main(
|
|||
|
||||
# spawn one task per broker feed
|
||||
async with (
|
||||
maybe_open_clearing_feed(
|
||||
maybe_open_feed(
|
||||
broker,
|
||||
symbol,
|
||||
loglevel=loglevel,
|
||||
|
@ -1073,11 +1005,9 @@ async def _emsd_main(
|
|||
n.start_soon(
|
||||
clear_dark_triggers,
|
||||
|
||||
# relay.brokerd_dialogue,
|
||||
brokerd_stream,
|
||||
ems_client_order_stream,
|
||||
feed.stream,
|
||||
|
||||
broker,
|
||||
symbol,
|
||||
book
|
||||
|
|
Loading…
Reference in New Issue