Draft a feed cacheing sub-system

cached_feeds
Tyler Goodlet 2021-03-25 10:26:02 -04:00
parent 0d4073dbd2
commit 65e7680cdd
1 changed files with 90 additions and 40 deletions

View File

@ -26,14 +26,22 @@ from contextlib import asynccontextmanager
from importlib import import_module
from types import ModuleType
from typing import (
Dict, Any, Sequence, AsyncIterator, Optional
Dict, Any, Sequence,
AsyncIterator, Optional,
Callable, Awaitable
)
import trio
import tractor
from pydantic import BaseModel
from ..brokers import get_brokermod
from ..log import get_logger, get_console_log
from .._daemon import spawn_brokerd, maybe_open_pikerd
from .._daemon import (
spawn_brokerd,
maybe_open_pikerd,
maybe_spawn_brokerd,
)
from ._normalize import iterticks
from ._sharedmem import (
maybe_open_shm_array,
@ -74,57 +82,97 @@ def get_ingestormod(name: str) -> ModuleType:
return module
@asynccontextmanager
async def maybe_spawn_brokerd(
brokername: str,
loglevel: Optional[str] = None,
# @dataclass
class _FeedsCache(BaseModel):
"""Data feeds manager.
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = True,
) -> tractor._portal.Portal:
"""If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
This is a brokerd side api used to manager persistent real-time
streams that can be allocated and left alive indefinitely.
"""
if loglevel:
get_console_log(loglevel)
brokername: str
nursery: trio.Nursery
tasks: Dict[str, trio.CancelScope] = {}
dname = f'brokerd.{brokername}'
async with tractor.find_actor(dname) as portal:
class Config:
arbitrary_types_allowed = True
# WTF: why doesn't this work?
if portal is not None:
yield portal
# tasks: Dict[str, trio.CancelScope] = field(default_factory=dict)
else:
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel
) as pikerd_portal:
async def start_feed(
symbol: str,
func: Callable[[int], Awaitable[None]],
) -> None:
"""Start a bg feed task and register a surrouding cancel scope
for it.
if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_brokerd(brokername, loglevel=loglevel)
"""
with trio.CancelCscope() as cs:
pass
else:
await pikerd_portal.run(
spawn_brokerd,
async def cancel_all(self) -> None:
for name, cs in self.tasks.item():
log.debug(f'Cancelling cached feed for {name}')
cs.cancel()
_feeds: _FeedsCache = None
def get_feeds_manager(
brokername: str,
nursery: Optional[trio.Nursery] = None,
) -> _FeedsCache:
"""
Retreive data feeds manager from process global scope.
"""
global _feeds
if nursery is not None:
assert _feeds is None, "Feeds manager is already setup?"
# this is initial setup by parent actor
_feeds = _FeedsCache(
brokername=brokername,
loglevel=loglevel,
debug_mode=debug_mode,
nursery=nursery,
)
assert not _feeds.tasks
assert _feeds.brokername == brokername, "Uhhh wtf"
return _feeds
async def _setup_persistent_feeds(brokername: str) -> None:
"""Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
"""
async with trio.open_nursery() as service_nursery:
_feeds = get_feeds_manager(brokername, service_nursery)
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
@tractor.stream
async def allocate_cached_feed(
ctx: tractor.Context,
symbol: str
):
_feeds = get_feeds_manager(brokername, service_nursery)
# setup shared mem buffer
pass
async with tractor.wait_for_actor(dname) as portal:
yield portal
@dataclass
class Feed:
"""A data feed for client-side interaction with far-process
"""A data feed for client-side interaction with far-process# }}}
real-time data sources.
This is an thin abstraction on top of ``tractor``'s portals for
@ -279,6 +327,8 @@ async def open_feed(
if opened:
assert data['is_shm_writer']
log.info("Started shared mem bar writer")
else:
s = attach_shm_array(shm_token)
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity