Merge pull request from pikers/samplerd_service

`samplerd` service
epoch_index
goodboy 2023-01-30 11:48:07 -05:00 committed by GitHub
commit 61218f30f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1680 additions and 703 deletions

View File

@ -18,16 +18,27 @@
Structured, daemon tree service management.
"""
from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm
from __future__ import annotations
import os
from typing import (
Optional,
Callable,
Any,
ClassVar,
)
from contextlib import (
asynccontextmanager as acm,
)
from collections import defaultdict
from msgspec import Struct
import tractor
import trio
from trio_typing import TaskStatus
from .log import get_logger, get_console_log
from .log import (
get_logger,
get_console_log,
)
from .brokers import get_brokermod
@ -42,28 +53,111 @@ _default_reg_addr: tuple[str, int] = (
_default_registry_port,
)
# NOTE: this value is set as an actor-global once the first endpoint
# who is capable, spawns a `pikerd` service tree.
_registry_addr: tuple[str, int] | None = None
_registry: Registry | None = None
class Registry:
addr: None | tuple[str, int] = None
# TODO: table of uids to sockaddrs
peers: dict[
tuple[str, str],
tuple[str, int],
] = {}
_tractor_kwargs: dict[str, Any] = {}
@acm
async def open_registry(
addr: None | tuple[str, int] = None,
ensure_exists: bool = True,
) -> tuple[str, int]:
global _tractor_kwargs
actor = tractor.current_actor()
uid = actor.uid
if (
Registry.addr is not None
and addr
):
raise RuntimeError(
f'`{uid}` registry addr already bound @ {_registry.sockaddr}'
)
was_set: bool = False
if (
not tractor.is_root_process()
and Registry.addr is None
):
Registry.addr = actor._arb_addr
if (
ensure_exists
and Registry.addr is None
):
raise RuntimeError(
f"`{uid}` registry should already exist bug doesn't?"
)
if (
Registry.addr is None
):
was_set = True
Registry.addr = addr or _default_reg_addr
_tractor_kwargs['arbiter_addr'] = Registry.addr
try:
yield Registry.addr
finally:
# XXX: always clear the global addr if we set it so that the
# next (set of) calls will apply whatever new one is passed
# in.
if was_set:
Registry.addr = None
def get_tractor_runtime_kwargs() -> dict[str, Any]:
'''
Deliver ``tractor`` related runtime variables in a `dict`.
'''
return _tractor_kwargs
_tractor_kwargs: dict[str, Any] = {
# use a different registry addr then tractor's default
'arbiter_addr': _registry_addr
}
_root_modules = [
__name__,
'piker.clearing._ems',
'piker.clearing._client',
'piker.data._sampling',
]
class Services(Struct):
# TODO: factor this into a ``tractor.highlevel`` extension
# pack for the library.
class Services:
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
service_tasks: dict[
str,
tuple[
trio.CancelScope,
tractor.Portal,
trio.Event,
]
] = {}
locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task(
self,
name: str,
@ -82,7 +176,12 @@ class Services(Struct):
'''
async def open_context_in_task(
task_status: TaskStatus[
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
tuple[
trio.CancelScope,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
@ -94,156 +193,173 @@ class Services(Struct):
) as (ctx, first):
# unblock once the remote context has started
task_status.started((cs, first))
complete = trio.Event()
task_status.started((cs, complete, first))
log.info(
f'`pikerd` service {name} started with value {first}'
)
try:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res = await ctx.result()
except tractor.ContextCancelled:
return await self.cancel_service(name)
else:
# wait on any error from the sub-actor
# NOTE: this will block indefinitely until
# cancelled either by error from the target
# context function or by being cancelled here by
# the surrounding cancel scope
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (await portal.result(), ctx_res)
cs, first = await self.service_n.start(open_context_in_task)
finally:
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
cs, complete, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal)
self.service_tasks[name] = (cs, portal, complete)
return cs, first
# TODO: per service cancellation by scope, we aren't using this
# anywhere right?
@classmethod
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, portal = self.service_tasks[name]
# XXX: not entirely sure why this is required,
# and should probably be better fine tuned in
# ``tractor``?
cs, portal, complete = self.service_tasks[name]
cs.cancel()
return await portal.cancel_actor()
_services: Optional[Services] = None
@acm
async def open_pikerd(
start_method: str = 'trio',
loglevel: Optional[str] = None,
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
) -> Optional[tractor._portal.Portal]:
'''
Start a root piker daemon who's lifetime extends indefinitely
until cancelled.
A root actor nursery is created which can be used to create and keep
alive underling services (see below).
'''
global _services
global _registry_addr
if (
_registry_addr is None
or registry_addr
):
_registry_addr = registry_addr or _default_reg_addr
# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=_root_dname,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
tractor.open_nursery() as actor_nursery,
):
async with trio.open_nursery() as service_nursery:
# # setup service mngr singleton instance
# async with AsyncExitStack() as stack:
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
)
yield _services
await complete.wait()
assert name not in self.service_tasks, \
f'Serice task for {name} not terminated?'
@acm
async def open_piker_runtime(
name: str,
enable_modules: list[str] = [],
start_method: str = 'trio',
loglevel: Optional[str] = None,
# XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
# TODO: once we have `rsyscall` support we will read a config
# and spawn the service tree distributed per that.
start_method: str = 'trio',
tractor_kwargs: dict = {},
) -> tuple[
tractor.Actor,
tuple[str, int],
]:
'''
Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration.
Can be called from a subactor or any program that needs to start
a root actor.
'''
try:
# check for existing runtime
actor = tractor.current_actor().uid
except tractor._exceptions.NoRuntime:
registry_addr = registry_addr or _default_reg_addr
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=enable_modules,
**tractor_kwargs,
) as _,
open_registry(registry_addr, ensure_exists=False) as addr,
):
yield (
tractor.current_actor(),
addr,
)
else:
async with open_registry(registry_addr) as addr:
yield (
actor,
addr,
)
@acm
async def open_pikerd(
loglevel: str | None = None,
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
) -> tractor.Actor:
) -> Services:
'''
Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration.
Start a root piker daemon who's lifetime extends indefinitely until
cancelled.
A root actor nursery is created which can be used to create and keep
alive underling services (see below).
'''
global _services
global _registry_addr
if (
_registry_addr is None
or registry_addr
):
_registry_addr = registry_addr or _default_reg_addr
# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
open_piker_runtime(
name=_root_dname,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules + enable_modules,
) as _,
enable_modules=_root_modules,
loglevel=loglevel,
debug_mode=debug_mode,
registry_addr=registry_addr,
) as (root_actor, reg_addr),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
):
yield tractor.current_actor()
assert root_actor.accept_addr == reg_addr
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
@acm
@ -252,51 +368,67 @@ async def maybe_open_runtime(
**kwargs,
) -> None:
"""
'''
Start the ``tractor`` runtime (a root actor) if none exists.
"""
settings = _tractor_kwargs
settings.update(kwargs)
'''
name = kwargs.pop('name')
if not tractor.current_actor(err_on_no_runtime=False):
async with tractor.open_root_actor(
async with open_piker_runtime(
name,
loglevel=loglevel,
**settings,
):
yield
**kwargs,
) as (_, addr):
yield addr,
else:
yield
async with open_registry() as addr:
yield addr
@acm
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
**kwargs,
) -> Union[tractor._portal.Portal, Services]:
"""If no ``pikerd`` daemon-root-actor can be found start it and
) -> tractor._portal.Portal | ClassVar[Services]:
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
though).
"""
'''
if loglevel:
get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail
query_name = kwargs.pop('name', f'piker_query_{os.getpid()}')
# TODO: if we need to make the query part faster we could not init
# an actor runtime and instead just hit the socket?
# from tractor._ipc import _connect_chan, Channel
# async with _connect_chan(host, port) as chan:
# async with open_portal(chan) as arb_portal:
# yield arb_portal
async with (
maybe_open_runtime(loglevel, **kwargs),
tractor.find_actor(_root_dname) as portal
open_piker_runtime(
name=query_name,
registry_addr=registry_addr,
loglevel=loglevel,
**kwargs,
) as _,
tractor.find_actor(
_root_dname,
arbiter_sockaddr=registry_addr,
) as portal
):
# connect to any existing daemon presuming
# its registry socket was selected.
if (
portal is not None
and (
registry_addr is None
or portal.channel.raddr == registry_addr
)
):
yield portal
return
@ -304,19 +436,21 @@ async def maybe_open_pikerd(
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr,
) as _:
) as service_manager:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
# we return no portal to self.
yield None
assert service_manager
yield service_manager
# brokerd enabled modules
# `brokerd` enabled modules
# NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care!
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
@ -326,37 +460,35 @@ _data_mods = [
]
class Brokerd:
locks = defaultdict(trio.Lock)
@acm
async def find_service(
service_name: str,
) -> Optional[tractor.Portal]:
) -> tractor.Portal | None:
log.info(f'Scanning for service `{service_name}`')
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,
arbiter_sockaddr=_registry_addr,
) as maybe_portal:
yield maybe_portal
async with open_registry() as reg_addr:
log.info(f'Scanning for service `{service_name}`')
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,
arbiter_sockaddr=reg_addr,
) as maybe_portal:
yield maybe_portal
async def check_for_service(
service_name: str,
) -> bool:
) -> None | tuple[str, int]:
'''
Service daemon "liveness" predicate.
'''
async with tractor.query_actor(
service_name,
arbiter_sockaddr=_registry_addr,
) as sockaddr:
return sockaddr
async with open_registry(ensure_exists=False) as reg_addr:
async with tractor.query_actor(
service_name,
arbiter_sockaddr=reg_addr,
) as sockaddr:
return sockaddr
@acm
@ -366,6 +498,8 @@ async def maybe_spawn_daemon(
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
singleton: bool = False,
**kwargs,
) -> tractor.Portal:
@ -386,7 +520,7 @@ async def maybe_spawn_daemon(
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[service_name]
lock = Services.locks[service_name]
await lock.acquire()
async with find_service(service_name) as portal:
@ -397,6 +531,9 @@ async def maybe_spawn_daemon(
log.warning(f"Couldn't find any existing {service_name}")
# TODO: really shouldn't the actor spawning be part of the service
# starting method `Services.start_service()` ?
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
@ -407,15 +544,16 @@ async def maybe_spawn_daemon(
) as pikerd_portal:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
started: bool
if pikerd_portal is None:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
await service_task_target(**spawn_args)
started = await service_task_target(**spawn_args)
else:
# tell the remote `pikerd` to start the target,
@ -424,11 +562,14 @@ async def maybe_spawn_daemon(
# non-blocking and the target task will persist running
# on `pikerd` after the client requesting it's start
# disconnects.
await pikerd_portal.run(
started = await pikerd_portal.run(
service_task_target,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
@ -451,9 +592,6 @@ async def spawn_brokerd(
extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
tractor_kwargs.update(extra_tractor_kwargs)
global _services
assert _services
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
modpath = brokermod.__name__
@ -466,18 +604,18 @@ async def spawn_brokerd(
subpath = f'{modpath}.{submodname}'
broker_enable.append(subpath)
portal = await _services.actor_n.start_actor(
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + broker_enable,
loglevel=loglevel,
debug_mode=_services.debug_mode,
debug_mode=Services.debug_mode,
**tractor_kwargs
)
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
await _services.start_service_task(
await Services.start_service_task(
dname,
portal,
_setup_persistent_brokerd,
@ -523,24 +661,21 @@ async def spawn_emsd(
"""
log.info('Spawning emsd')
global _services
assert _services
portal = await _services.actor_n.start_actor(
portal = await Services.actor_n.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
'piker.clearing._client',
],
loglevel=loglevel,
debug_mode=_services.debug_mode, # set by pikerd flag
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd
await _services.start_service_task(
await Services.start_service_task(
'emsd',
portal,
_setup_persistent_emsd,
@ -567,25 +702,3 @@ async def maybe_open_emsd(
) as portal:
yield portal
# TODO: ideally we can start the tsdb "on demand" but it's
# probably going to require "rootless" docker, at least if we don't
# want to expect the user to start ``pikerd`` with root perms all the
# time.
# async def maybe_open_marketstored(
# loglevel: Optional[str] = None,
# **kwargs,
# ) -> tractor._portal.Portal: # noqa
# async with maybe_spawn_daemon(
# 'marketstored',
# service_task_target=spawn_emsd,
# spawn_args={'loglevel': loglevel},
# loglevel=loglevel,
# **kwargs,
# ) as portal:
# yield portal

View File

@ -18,3 +18,9 @@
Market machinery for order executions, book, management.
"""
from ._client import open_ems
__all__ = [
'open_ems',
]

View File

@ -18,8 +18,10 @@
Orders and execution client API.
"""
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import TYPE_CHECKING
import trio
import tractor
@ -27,11 +29,16 @@ from tractor.trionics import broadcast_receiver
from ..log import get_logger
from ..data.types import Struct
from ._ems import _emsd_main
from .._daemon import maybe_open_emsd
from ._messages import Order, Cancel
from ..brokers import get_brokermod
if TYPE_CHECKING:
from ._messages import (
BrokerdPosition,
Status,
)
log = get_logger(__name__)
@ -167,12 +174,19 @@ async def relay_order_cmds_from_sync_code(
@acm
async def open_ems(
fqsn: str,
mode: str = 'live',
) -> (
) -> tuple[
OrderBook,
tractor.MsgStream,
dict,
):
dict[
# brokername, acctid
tuple[str, str],
list[BrokerdPosition],
],
list[str],
dict[str, Status],
]:
'''
Spawn an EMS daemon and begin sending orders and receiving
alerts.
@ -213,14 +227,16 @@ async def open_ems(
from ..data._source import unpack_fqsn
broker, symbol, suffix = unpack_fqsn(fqsn)
mode: str = 'live'
async with maybe_open_emsd(broker) as portal:
mod = get_brokermod(broker)
if not getattr(mod, 'trades_dialogue', None):
if (
not getattr(mod, 'trades_dialogue', None)
or mode == 'paper'
):
mode = 'paper'
from ._ems import _emsd_main
async with (
# connect to emsd
portal.open_context(

View File

@ -172,6 +172,7 @@ async def clear_dark_triggers(
# TODO:
# - numba all this!
# - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False
async for quotes in quote_stream:
# start = time.time()
for sym, quote in quotes.items():
@ -417,7 +418,7 @@ class Router(Struct):
# load the paper trading engine
exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
log.info(f'{broker}: Entering `paper` trading mode')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
@ -866,7 +867,7 @@ async def translate_and_relay_brokerd_events(
elif status == 'canceled':
log.cancel(f'Cancellation for {oid} is complete!')
status_msg = book._active.pop(oid)
status_msg = book._active.pop(oid, None)
else: # open
# relayed from backend but probably not handled so
@ -1366,7 +1367,15 @@ async def _emsd_main(
exec_mode: str, # ('paper', 'live')
loglevel: str = 'info',
) -> None:
) -> tuple[
dict[
# brokername, acctid
tuple[str, str],
list[BrokerdPosition],
],
list[str],
dict[str, Status],
]:
'''
EMS (sub)actor entrypoint providing the execution management
(micro)service which conducts broker order clearing control on

View File

@ -28,7 +28,6 @@ import tractor
from ..log import get_console_log, get_logger, colorize_json
from ..brokers import get_brokermod
from .._daemon import (
_tractor_kwargs,
_default_registry_host,
_default_registry_port,
)
@ -176,20 +175,30 @@ def cli(
@cli.command()
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.argument('names', nargs=-1, required=False)
@click.argument('ports', nargs=-1, required=False)
@click.pass_obj
def services(config, tl, names):
def services(config, tl, ports):
from .._daemon import open_piker_runtime
from .._daemon import (
open_piker_runtime,
_default_registry_port,
_default_registry_host,
)
host = _default_registry_host
if not ports:
ports = [_default_registry_port]
async def list_services():
nonlocal host
async with (
open_piker_runtime(
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_arbiter(
*_tractor_kwargs['arbiter_addr']
host=host,
port=ports[0]
) as portal
):
registry = await portal.run_from_ns('self', 'get_registry')

View File

@ -22,6 +22,12 @@ and storing data from your brokers as well as
sharing live streams over a network.
"""
import tractor
import trio
from ..log import (
get_console_log,
)
from ._normalize import iterticks
from ._sharedmem import (
maybe_open_shm_array,
@ -32,7 +38,6 @@ from ._sharedmem import (
)
from .feed import (
open_feed,
_setup_persistent_brokerd,
)
@ -44,5 +49,40 @@ __all__ = [
'attach_shm_array',
'open_shm_array',
'get_shm_token',
'_setup_persistent_brokerd',
]
@tractor.context
async def _setup_persistent_brokerd(
ctx: tractor.Context,
brokername: str,
) -> None:
'''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
'''
get_console_log(tractor.current_actor().loglevel)
from .feed import (
_bus,
get_feed_bus,
)
global _bus
assert not _bus
async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
get_feed_bus(brokername, service_nursery)
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()

View File

@ -20,174 +20,255 @@ financial data flows.
"""
from __future__ import annotations
from collections import Counter
from collections import (
Counter,
defaultdict,
)
from contextlib import asynccontextmanager as acm
import time
from typing import (
AsyncIterator,
TYPE_CHECKING,
)
import tractor
from tractor.trionics import (
maybe_open_nursery,
)
import trio
from trio_typing import TaskStatus
from ..log import get_logger
from ..log import (
get_logger,
get_console_log,
)
from .._daemon import maybe_spawn_daemon
if TYPE_CHECKING:
from ._sharedmem import ShmArray
from ._sharedmem import (
ShmArray,
)
from .feed import _FeedsBus
log = get_logger(__name__)
# highest frequency sample step is 1 second by default, though in
# the future we may want to support shorter periods or a dynamic style
# tick-event stream.
_default_delay_s: float = 1.0
class sampler:
class Sampler:
'''
Global sampling engine registry.
Manages state for sampling events, shm incrementing and
sample period logic.
This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to
time-step sample real-time quote feeds, see
``._daemon.maybe_open_samplerd()`` and the below
``register_with_sampler()``.
'''
service_nursery: None | trio.Nursery = None
# TODO: we could stick these in a composed type to avoid
# angering the "i hate module scoped variables crowd" (yawn).
ohlcv_shms: dict[int, list[ShmArray]] = {}
ohlcv_shms: dict[float, list[ShmArray]] = {}
# holds one-task-per-sample-period tasks which are spawned as-needed by
# data feed requests with a given detected time step usually from
# history loading.
incrementers: dict[int, trio.CancelScope] = {}
incr_task_cs: trio.CancelScope | None = None
# holds all the ``tractor.Context`` remote subscriptions for
# a particular sample period increment event: all subscribers are
# notified on a step.
subscribers: dict[int, tractor.Context] = {}
# subscribers: dict[int, list[tractor.MsgStream]] = {}
subscribers: defaultdict[
float,
list[
float,
set[tractor.MsgStream]
],
] = defaultdict(
lambda: [
round(time.time()),
set(),
]
)
@classmethod
async def increment_ohlc_buffer(
self,
period_s: float,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
):
'''
Task which inserts new bars into the provide shared memory array
every ``period_s`` seconds.
async def increment_ohlc_buffer(
delay_s: int,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
):
'''
Task which inserts new bars into the provide shared memory array
every ``delay_s`` seconds.
This task fulfills 2 purposes:
- it takes the subscribed set of shm arrays and increments them
on a common time period
- broadcast of this increment "signal" message to other actor
subscribers
This task fulfills 2 purposes:
- it takes the subscribed set of shm arrays and increments them
on a common time period
- broadcast of this increment "signal" message to other actor
subscribers
Note that if **no** actor has initiated this task then **none** of
the underlying buffers will actually be incremented.
Note that if **no** actor has initiated this task then **none** of
the underlying buffers will actually be incremented.
'''
# TODO: right now we'll spin printing bars if the last time stamp is
# before a large period of no market activity. Likely the best way
# to solve this is to make this task aware of the instrument's
# tradable hours?
'''
# # wait for brokerd to signal we should start sampling
# await shm_incrementing(shm_token['shm_name']).wait()
total_s: float = 0 # total seconds counted
ad = period_s - 0.001 # compensate for trio processing time
# TODO: right now we'll spin printing bars if the last time stamp is
# before a large period of no market activity. Likely the best way
# to solve this is to make this task aware of the instrument's
# tradable hours?
with trio.CancelScope() as cs:
# register this time period step as active
task_status.started(cs)
# adjust delay to compensate for trio processing time
ad = min(sampler.ohlcv_shms.keys()) - 0.001
# sample step loop:
# includes broadcasting to all connected consumers on every
# new sample step as well incrementing any registered
# buffers by registered sample period.
while True:
await trio.sleep(ad)
total_s += period_s
total_s = 0 # total seconds counted
lowest = min(sampler.ohlcv_shms.keys())
lowest_shm = sampler.ohlcv_shms[lowest][0]
ad = lowest - 0.001
# increment all subscribed shm arrays
# TODO:
# - this in ``numba``
# - just lookup shms for this step instead of iterating?
with trio.CancelScope() as cs:
i_epoch = round(time.time())
broadcasted: set[float] = set()
# register this time period step as active
sampler.incrementers[delay_s] = cs
task_status.started(cs)
# print(f'epoch: {i_epoch} -> REGISTRY {self.ohlcv_shms}')
for shm_period_s, shms in self.ohlcv_shms.items():
while True:
# TODO: do we want to support dynamically
# adding a "lower" lowest increment period?
await trio.sleep(ad)
total_s += delay_s
# short-circuit on any not-ready because slower sample
# rate consuming shm buffers.
if total_s % shm_period_s != 0:
# print(f'skipping `{shm_period_s}s` sample update')
continue
# increment all subscribed shm arrays
# TODO:
# - this in ``numba``
# - just lookup shms for this step instead of iterating?
for this_delay_s, shms in sampler.ohlcv_shms.items():
# update last epoch stamp for this period group
if shm_period_s not in broadcasted:
sub_pair = self.subscribers[shm_period_s]
sub_pair[0] = i_epoch
broadcasted.add(shm_period_s)
# short-circuit on any not-ready because slower sample
# rate consuming shm buffers.
if total_s % this_delay_s != 0:
# print(f'skipping `{this_delay_s}s` sample update')
continue
# TODO: ``numba`` this!
for shm in shms:
# print(f'UPDATE {shm_period_s}s STEP for {shm.token}')
# TODO: ``numba`` this!
for shm in shms:
# TODO: in theory we could make this faster by copying the
# "last" readable value into the underlying larger buffer's
# next value and then incrementing the counter instead of
# using ``.push()``?
# append new entry to buffer thus "incrementing"
# the bar
array = shm.array
last = array[-1:][shm._write_fields].copy()
# append new entry to buffer thus "incrementing" the bar
array = shm.array
last = array[-1:][shm._write_fields].copy()
# (index, t, close) = last[0][['index', 'time', 'close']]
(t, close) = last[0][['time', 'close']]
# guard against startup backfilling races where
# the buffer has not yet been filled.
if not last.size:
continue
# this copies non-std fields (eg. vwap) from the last datum
last[
['time', 'volume', 'open', 'high', 'low', 'close']
][0] = (t + this_delay_s, 0, close, close, close, close)
(t, close) = last[0][[
'time',
'close',
]]
# write to the buffer
shm.push(last)
next_t = t + shm_period_s
await broadcast(delay_s, shm=lowest_shm)
if shm_period_s <= 1:
next_t = i_epoch
# this copies non-std fields (eg. vwap) from the
# last datum
last[[
'time',
async def broadcast(
delay_s: int,
shm: ShmArray | None = None,
'open',
'high',
'low',
'close',
) -> None:
'''
Broadcast the given ``shm: ShmArray``'s buffer index step to any
subscribers for a given sample period.
'volume',
]][0] = (
# epoch timestamp
next_t,
The sent msg will include the first and last index which slice into
the buffer's non-empty data.
# OHLC
close,
close,
close,
close,
'''
subs = sampler.subscribers.get(delay_s, ())
first = last = -1
0, # vlm
)
if shm is None:
periods = sampler.ohlcv_shms.keys()
# if this is an update triggered by a history update there
# might not actually be any sampling bus setup since there's
# no "live feed" active yet.
if periods:
lowest = min(periods)
shm = sampler.ohlcv_shms[lowest][0]
first = shm._first.value
last = shm._last.value
# TODO: in theory we could make this faster by
# copying the "last" readable value into the
# underlying larger buffer's next value and then
# incrementing the counter instead of using
# ``.push()``?
for stream in subs:
try:
await stream.send({
'first': first,
'last': last,
'index': last,
})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
)
# write to the buffer
shm.push(last)
# broadcast increment msg to all updated subs per period
for shm_period_s in broadcasted:
await self.broadcast(
period_s=shm_period_s,
time_stamp=i_epoch,
)
@classmethod
async def broadcast(
self,
period_s: float,
time_stamp: float | None = None,
) -> None:
'''
Broadcast the period size and last index step value to all
subscribers for a given sample period.
'''
pair = self.subscribers[period_s]
last_ts, subs = pair
task = trio.lowlevel.current_task()
log.debug(
f'SUBS {self.subscribers}\n'
f'PAIR {pair}\n'
f'TASK: {task}: {id(task)}\n'
f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}'
)
borked: set[tractor.MsgStream] = set()
for stream in subs:
try:
await stream.send({
'index': time_stamp or last_ts,
'period': period_s,
})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
)
borked.add(stream)
for stream in borked:
try:
subs.remove(stream)
except ValueError:
@ -195,35 +276,227 @@ async def broadcast(
f'{stream._ctx.chan.uid} sub already removed!?'
)
@classmethod
async def broadcast_all(self) -> None:
for period_s in self.subscribers:
await self.broadcast(period_s)
@tractor.context
async def iter_ohlc_periods(
async def register_with_sampler(
ctx: tractor.Context,
delay_s: int,
period_s: float,
shms_by_period: dict[float, dict] | None = None,
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates?
) -> None:
'''
Subscribe to OHLC sampling "step" events: when the time
aggregation period increments, this event stream emits an index
event.
'''
# add our subscription
subs = sampler.subscribers.setdefault(delay_s, [])
await ctx.started()
async with ctx.open_stream() as stream:
subs.append(stream)
get_console_log(tractor.current_actor().loglevel)
incr_was_started: bool = False
try:
# stream and block until cancelled
await trio.sleep_forever()
finally:
try:
subs.remove(stream)
except ValueError:
log.error(
f'iOHLC step stream was already dropped {ctx.chan.uid}?'
try:
async with maybe_open_nursery(
Sampler.service_nursery
) as service_nursery:
# init startup, create (actor-)local service nursery and start
# increment task
Sampler.service_nursery = service_nursery
# always ensure a period subs entry exists
last_ts, subs = Sampler.subscribers[float(period_s)]
async with trio.Lock():
if Sampler.incr_task_cs is None:
Sampler.incr_task_cs = await service_nursery.start(
Sampler.increment_ohlc_buffer,
1.,
)
incr_was_started = True
# insert the base 1s period (for OHLC style sampling) into
# the increment buffer set to update and shift every second.
if shms_by_period is not None:
from ._sharedmem import (
attach_shm_array,
_Token,
)
for period in shms_by_period:
# load and register shm handles
shm_token_msg = shms_by_period[period]
shm = attach_shm_array(
_Token.from_msg(shm_token_msg),
readonly=False,
)
shms_by_period[period] = shm
Sampler.ohlcv_shms.setdefault(period, []).append(shm)
assert Sampler.ohlcv_shms
# unblock caller
await ctx.started(set(Sampler.ohlcv_shms.keys()))
if open_index_stream:
try:
async with ctx.open_stream() as stream:
if sub_for_broadcasts:
subs.add(stream)
# except broadcast requests from the subscriber
async for msg in stream:
if msg == 'broadcast_all':
await Sampler.broadcast_all()
finally:
if sub_for_broadcasts:
subs.remove(stream)
else:
# if no shms are passed in we just wait until cancelled
# by caller.
await trio.sleep_forever()
finally:
# TODO: why tf isn't this working?
if shms_by_period is not None:
for period, shm in shms_by_period.items():
Sampler.ohlcv_shms[period].remove(shm)
if incr_was_started:
Sampler.incr_task_cs.cancel()
Sampler.incr_task_cs = None
async def spawn_samplerd(
loglevel: str | None = None,
**extra_tractor_kwargs
) -> bool:
'''
Daemon-side service task: start a sampling daemon for common step
update and increment count write and stream broadcasting.
'''
from piker._daemon import Services
dname = 'samplerd'
log.info(f'Spawning `{dname}`')
# singleton lock creation of ``samplerd`` since we only ever want
# one daemon per ``pikerd`` proc tree.
# TODO: make this built-into the service api?
async with Services.locks[dname + '_singleton']:
if dname not in Services.service_tasks:
portal = await Services.actor_n.start_actor(
dname,
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
await Services.start_service_task(
dname,
portal,
register_with_sampler,
period_s=1,
sub_for_broadcasts=False,
)
return True
return False
@acm
async def maybe_open_samplerd(
loglevel: str | None = None,
**kwargs,
) -> tractor._portal.Portal: # noqa
'''
Client-side helper to maybe startup the ``samplerd`` service
under the ``pikerd`` tree.
'''
dname = 'samplerd'
async with maybe_spawn_daemon(
dname,
service_task_target=spawn_samplerd,
spawn_args={'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
) as portal:
yield portal
@acm
async def open_sample_stream(
period_s: float,
shms_by_period: dict[float, dict] | None = None,
open_index_stream: bool = True,
sub_for_broadcasts: bool = True,
cache_key: str | None = None,
allow_new_sampler: bool = True,
) -> AsyncIterator[dict[str, float]]:
'''
Subscribe to OHLC sampling "step" events: when the time aggregation
period increments, this event stream emits an index event.
This is a client-side endpoint that does all the work of ensuring
the `samplerd` actor is up and that mult-consumer-tasks are given
a broadcast stream when possible.
'''
# TODO: wrap this manager with the following to make it cached
# per client-multitasks entry.
# maybe_open_context(
# acm_func=partial(
# portal.open_context,
# register_with_sampler,
# ),
# key=cache_key or period_s,
# )
# if cache_hit:
# # add a new broadcast subscription for the quote stream
# # if this feed is likely already in use
# async with istream.subscribe() as bistream:
# yield bistream
# else:
async with (
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
maybe_open_samplerd() as portal,
portal.open_context(
register_with_sampler,
**{
'period_s': period_s,
'shms_by_period': shms_by_period,
'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts,
},
) as (ctx, first)
):
async with (
ctx.open_stream() as istream,
# TODO: we don't need this task-bcasting right?
# istream.subscribe() as istream,
):
yield istream
async def sample_and_broadcast(
@ -236,7 +509,14 @@ async def sample_and_broadcast(
sum_tick_vlm: bool = True,
) -> None:
'''
`brokerd`-side task which writes latest datum sampled data.
This task is meant to run in the same actor (mem space) as the
`brokerd` real-time quote feed which is being sampled to
a ``ShmArray`` buffer.
'''
log.info("Started shared mem bar writer")
overruns = Counter()
@ -273,7 +553,6 @@ async def sample_and_broadcast(
for shm in [rt_shm, hist_shm]:
# update last entry
# benchmarked in the 4-5 us range
# for shm in [rt_shm, hist_shm]:
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
@ -383,6 +662,7 @@ async def sample_and_broadcast(
trio.ClosedResourceError,
trio.EndOfChannel,
):
ctx = stream._ctx
chan = ctx.chan
if ctx:
log.warning(
@ -404,10 +684,63 @@ async def sample_and_broadcast(
)
# a working tick-type-classes template
_tick_groups = {
'clears': {'trade', 'dark_trade', 'last'},
'bids': {'bid', 'bsize'},
'asks': {'ask', 'asize'},
}
def frame_ticks(
first_quote: dict,
last_quote: dict,
ticks_by_type: dict,
) -> None:
# append quotes since last iteration into the last quote's
# tick array/buffer.
ticks = last_quote.get('ticks')
# TODO: once we decide to get fancy really we should
# have a shared mem tick buffer that is just
# continually filled and the UI just ready from it
# at it's display rate.
if ticks:
# TODO: do we need this any more or can we just
# expect the receiver to unwind the below
# `ticks_by_type: dict`?
# => undwinding would potentially require a
# `dict[str, set | list]` instead with an
# included `'types' field which is an (ordered)
# set of tick type fields in the order which
# types arrived?
first_quote['ticks'].extend(ticks)
# XXX: build a tick-by-type table of lists
# of tick messages. This allows for less
# iteration on the receiver side by allowing for
# a single "latest tick event" look up by
# indexing the last entry in each sub-list.
# tbt = {
# 'types': ['bid', 'asize', 'last', .. '<type_n>'],
# 'bid': [tick0, tick1, tick2, .., tickn],
# 'asize': [tick0, tick1, tick2, .., tickn],
# 'last': [tick0, tick1, tick2, .., tickn],
# ...
# '<type_n>': [tick0, tick1, tick2, .., tickn],
# }
# append in reverse FIFO order for in-order iteration on
# receiver side.
for tick in ticks:
ttype = tick['type']
ticks_by_type[ttype].append(tick)
# TODO: a less naive throttler, here's some snippets:
# token bucket by njs:
# https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
async def uniform_rate_send(
rate: float,
@ -418,6 +751,9 @@ async def uniform_rate_send(
) -> None:
# try not to error-out on overruns of the subscribed (chart) client
stream._ctx._backpressure = True
# TODO: compute the approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616
@ -427,6 +763,12 @@ async def uniform_rate_send(
diff = 0
task_status.started()
ticks_by_type: defaultdict[
str,
list[dict],
] = defaultdict(list)
clear_types = _tick_groups['clears']
while True:
@ -445,34 +787,17 @@ async def uniform_rate_send(
if not first_quote:
first_quote = last_quote
# first_quote['tbt'] = ticks_by_type
if (throttle_period - diff) > 0:
# received a quote but the send cycle period hasn't yet
# expired we aren't supposed to send yet so append
# to the tick frame.
# append quotes since last iteration into the last quote's
# tick array/buffer.
ticks = last_quote.get('ticks')
# XXX: idea for frame type data structure we could
# use on the wire instead of a simple list?
# frames = {
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
# 'type_a': [tick0, tick1, tick2, .., tickn],
# 'type_b': [tick0, tick1, tick2, .., tickn],
# 'type_c': [tick0, tick1, tick2, .., tickn],
# ...
# 'type_n': [tick0, tick1, tick2, .., tickn],
# }
# TODO: once we decide to get fancy really we should
# have a shared mem tick buffer that is just
# continually filled and the UI just ready from it
# at it's display rate.
if ticks:
first_quote['ticks'].extend(ticks)
frame_ticks(
first_quote,
last_quote,
ticks_by_type,
)
# send cycle isn't due yet so continue waiting
continue
@ -489,12 +814,35 @@ async def uniform_rate_send(
# received quote ASAP.
sym, first_quote = await quote_stream.receive()
frame_ticks(
first_quote,
first_quote,
ticks_by_type,
)
# we have a quote already so send it now.
with trio.move_on_after(throttle_period) as cs:
while (
not set(ticks_by_type).intersection(clear_types)
):
try:
sym, last_quote = await quote_stream.receive()
except trio.EndOfChannel:
log.exception(f"feed for {stream} ended?")
break
frame_ticks(
first_quote,
last_quote,
ticks_by_type,
)
# measured_rate = 1 / (time.time() - last_send)
# log.info(
# f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}'
# )
first_quote['tbt'] = ticks_by_type
# TODO: now if only we could sync this to the display
# rate timing exactly lul
@ -520,3 +868,4 @@ async def uniform_rate_send(
first_quote = last_quote = None
diff = 0
last_send = time.time()
ticks_by_type.clear()

View File

@ -92,7 +92,7 @@ class NoBsWs:
while True:
try:
await self._stack.aclose()
except (DisconnectionTimeout, RuntimeError):
except self.recon_errors:
await trio.sleep(0.5)
else:
break

View File

@ -21,14 +21,17 @@ This module is enabled for ``brokerd`` daemons.
"""
from __future__ import annotations
from collections import defaultdict
from collections import (
defaultdict,
Counter,
)
from contextlib import asynccontextmanager as acm
from datetime import datetime
from functools import partial
import time
from types import ModuleType
from typing import (
Any,
AsyncIterator,
AsyncContextManager,
Callable,
Optional,
@ -51,16 +54,18 @@ import numpy as np
from ..brokers import get_brokermod
from ..calc import humanize
from ..log import get_logger, get_console_log
from ..log import (
get_logger,
get_console_log,
)
from .._daemon import (
maybe_spawn_brokerd,
check_for_service,
)
from .flows import Flume
from ._sharedmem import (
maybe_open_shm_array,
attach_shm_array,
ShmArray,
_Token,
_secs_in_day,
)
from .ingest import get_ingestormod
@ -72,13 +77,9 @@ from ._source import (
)
from ..ui import _search
from ._sampling import (
sampler,
broadcast,
increment_ohlc_buffer,
iter_ohlc_periods,
open_sample_stream,
sample_and_broadcast,
uniform_rate_send,
_default_delay_s,
)
from ..brokers._util import (
DataUnavailable,
@ -128,7 +129,7 @@ class _FeedsBus(Struct):
target: Awaitable,
*args,
) -> None:
) -> trio.CancelScope:
async def start_with_cs(
task_status: TaskStatus[
@ -226,36 +227,6 @@ def get_feed_bus(
return _bus
@tractor.context
async def _setup_persistent_brokerd(
ctx: tractor.Context,
brokername: str,
) -> None:
'''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
'''
get_console_log(tractor.current_actor().loglevel)
global _bus
assert not _bus
async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
get_feed_bus(brokername, service_nursery)
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
def diff_history(
array: np.ndarray,
timeframe: int,
@ -278,6 +249,7 @@ async def start_backfill(
bfqsn: str,
shm: ShmArray,
timeframe: float,
sampler_stream: tractor.MsgStream,
last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None,
@ -309,6 +281,25 @@ async def start_backfill(
- pendulum.from_timestamp(times[-2])
).seconds
if step_size_s == 60:
inow = round(time.time())
diff = inow - times[-1]
if abs(diff) > 60:
surr = array[-6:]
diff_in_mins = round(diff/60., ndigits=2)
log.warning(
f'STEP ERROR `{bfqsn}` for period {step_size_s}s:\n'
f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n'
'Surrounding 6 time stamps:\n'
f'{list(surr["time"])}\n'
'Here is surrounding 6 samples:\n'
f'{surr}\nn'
)
# uncomment this for a hacker who wants to investigate
# this case manually..
# await tractor.breakpoint()
# frame's worth of sample-period-steps, in seconds
frame_size_s = len(array) * step_size_s
@ -326,8 +317,7 @@ async def start_backfill(
# TODO: *** THIS IS A BUG ***
# we need to only broadcast to subscribers for this fqsn..
# otherwise all fsps get reset on every chart..
for delay_s in sampler.subscribers:
await broadcast(delay_s)
await sampler_stream.send('broadcast_all')
# signal that backfilling to tsdb's end datum is complete
bf_done = trio.Event()
@ -376,8 +366,9 @@ async def start_backfill(
# erlangs = config.get('erlangs', 1)
# avoid duplicate history frames with a set of datetime frame
# starts.
starts: set[datetime] = set()
# starts and associated counts of how many duplicates we see
# per time stamp.
starts: Counter[datetime] = Counter()
# inline sequential loop where we simply pass the
# last retrieved start dt to the next request as
@ -405,14 +396,26 @@ async def start_backfill(
# request loop until the condition is resolved?
return
if next_start_dt in starts:
if (
next_start_dt in starts
and starts[next_start_dt] <= 6
):
start_dt = min(starts)
print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
log.warning(
f"{bfqsn}: skipping duplicate frame @ {next_start_dt}"
)
starts[start_dt] += 1
continue
elif starts[next_start_dt] > 6:
log.warning(
f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
)
return
# only update new start point if not-yet-seen
start_dt = next_start_dt
starts.add(start_dt)
starts[start_dt] += 1
assert array['time'][0] == start_dt.timestamp()
@ -484,8 +487,7 @@ async def start_backfill(
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
for delay_s in sampler.subscribers:
await broadcast(delay_s)
await sampler_stream.send('broadcast_all')
# short-circuit (for now)
bf_done.set()
@ -496,6 +498,7 @@ async def basic_backfill(
mod: ModuleType,
bfqsn: str,
shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream,
) -> None:
@ -513,7 +516,8 @@ async def basic_backfill(
mod,
bfqsn,
shm,
timeframe=timeframe,
timeframe,
sampler_stream,
)
)
except DataUnavailable:
@ -529,6 +533,7 @@ async def tsdb_backfill(
fqsn: str,
bfqsn: str,
shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream,
task_status: TaskStatus[
tuple[ShmArray, ShmArray]
@ -561,7 +566,8 @@ async def tsdb_backfill(
mod,
bfqsn,
shm,
timeframe=timeframe,
timeframe,
sampler_stream,
last_tsdb_dt=last_tsdb_dt,
tsdb_is_up=True,
storage=storage,
@ -599,10 +605,7 @@ async def tsdb_backfill(
# unblock the feed bus management task
# assert len(shms[1].array)
task_status.started((
shms[60],
shms[1],
))
task_status.started()
async def back_load_from_tsdb(
timeframe: int,
@ -658,10 +661,10 @@ async def tsdb_backfill(
# Load TSDB history into shm buffer (for display) if there is
# remaining buffer space.
if (
len(tsdb_history)
):
# load the first (smaller) bit of history originally loaded
# above from ``Storage.load()``.
to_push = tsdb_history[-prepend_start:]
@ -678,26 +681,27 @@ async def tsdb_backfill(
tsdb_last_frame_start = tsdb_history['Epoch'][0]
if timeframe == 1:
times = shm.array['time']
assert (times[1] - times[0]) == 1
# load as much from storage into shm possible (depends on
# user's shm size settings).
while (
shm._first.value > 0
):
while shm._first.value > 0:
tsdb_history = await storage.read_ohlcv(
fqsn,
end=tsdb_last_frame_start,
timeframe=timeframe,
end=tsdb_last_frame_start,
)
# empty query
if not len(tsdb_history):
break
next_start = tsdb_history['Epoch'][0]
if (
not len(tsdb_history) # empty query
if next_start >= tsdb_last_frame_start:
# no earlier data detected
or next_start >= tsdb_last_frame_start
):
break
else:
tsdb_last_frame_start = next_start
@ -725,8 +729,7 @@ async def tsdb_backfill(
# (usually a chart showing graphics for said fsp)
# which tells the chart to conduct a manual full
# graphics loop cycle.
for delay_s in sampler.subscribers:
await broadcast(delay_s)
await sampler_stream.send('broadcast_all')
# TODO: write new data to tsdb to be ready to for next read.
@ -770,11 +773,14 @@ async def manage_history(
# from tractor._state import _runtime_vars
# port = _runtime_vars['_root_mailbox'][1]
uid = tractor.current_actor().uid
suffix = '.'.join(uid)
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
hist_shm, opened = maybe_open_shm_array(
# key=f'{fqsn}_hist_p{port}',
key=f'{fqsn}_hist',
key=f'{fqsn}_hist.{suffix}',
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@ -792,7 +798,7 @@ async def manage_history(
rt_shm, opened = maybe_open_shm_array(
# key=f'{fqsn}_rt_p{port}',
key=f'{fqsn}_rt',
key=f'{fqsn}_rt.{suffix}',
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@ -815,224 +821,99 @@ async def manage_history(
"Persistent shm for sym was already open?!"
)
log.info('Scanning for existing `marketstored`')
tsdb_is_up = await check_for_service('marketstored')
# register 1s and 1m buffers with the global incrementer task
async with open_sample_stream(
period_s=1.,
shms_by_period={
1.: rt_shm.token,
60.: hist_shm.token,
},
bfqsn = fqsn.replace('.' + mod.name, '')
open_history_client = getattr(mod, 'open_history_client', None)
assert open_history_client
# NOTE: we want to only open a stream for doing broadcasts on
# backfill operations, not receive the sample index-stream
# (since there's no code in this data feed layer that needs to
# consume it).
open_index_stream=True,
sub_for_broadcasts=False,
if (
tsdb_is_up
and opened
and open_history_client
):
log.info('Found existing `marketstored`')
) as sample_stream:
from . import marketstore
async with (
marketstore.open_storage_client(fqsn)as storage,
log.info('Scanning for existing `marketstored`')
tsdb_is_up = await check_for_service('marketstored')
bfqsn = fqsn.replace('.' + mod.name, '')
open_history_client = getattr(mod, 'open_history_client', None)
assert open_history_client
if (
tsdb_is_up
and opened
and open_history_client
):
hist_shm, rt_shm = await bus.nursery.start(
tsdb_backfill,
mod,
marketstore,
log.info('Found existing `marketstored`')
from . import marketstore
async with (
marketstore.open_storage_client(fqsn)as storage,
):
# TODO: drop returning the output that we pass in?
await bus.nursery.start(
tsdb_backfill,
mod,
marketstore,
bus,
storage,
fqsn,
bfqsn,
{
1: rt_shm,
60: hist_shm,
},
sample_stream,
)
# yield back after client connect with filled shm
task_status.started((
hist_zero_index,
hist_shm,
rt_zero_index,
rt_shm,
))
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# history retreival loop depending on user interaction
# and thus a small RPC-prot for remotely controllinlg
# what data is loaded for viewing.
await trio.sleep_forever()
# load less history if no tsdb can be found
elif (
not tsdb_is_up
and opened
):
await basic_backfill(
bus,
storage,
fqsn,
mod,
bfqsn,
{
1: rt_shm,
60: hist_shm,
},
sample_stream,
)
# yield back after client connect with filled shm
task_status.started((
hist_zero_index,
hist_shm,
rt_zero_index,
rt_shm,
))
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# history retreival loop depending on user interaction and thus
# a small RPC-prot for remotely controllinlg what data is loaded
# for viewing.
await trio.sleep_forever()
# load less history if no tsdb can be found
elif (
not tsdb_is_up
and opened
):
await basic_backfill(
bus,
mod,
bfqsn,
shms={
1: rt_shm,
60: hist_shm,
},
)
task_status.started((
hist_zero_index,
hist_shm,
rt_zero_index,
rt_shm,
))
some_data_ready.set()
await trio.sleep_forever()
class Flume(Struct):
'''
Composite reference type which points to all the addressing handles
and other meta-data necessary for the read, measure and management
of a set of real-time updated data flows.
Can be thought of as a "flow descriptor" or "flow frame" which
describes the high level properties of a set of data flows that can
be used seamlessly across process-memory boundaries.
Each instance's sub-components normally includes:
- a msg oriented quote stream provided via an IPC transport
- history and real-time shm buffers which are both real-time
updated and backfilled.
- associated startup indexing information related to both buffer
real-time-append and historical prepend addresses.
- low level APIs to read and measure the updated data and manage
queuing properties.
'''
symbol: Symbol
first_quote: dict
_hist_shm_token: _Token
_rt_shm_token: _Token
# private shm refs loaded dynamically from tokens
_hist_shm: ShmArray | None = None
_rt_shm: ShmArray | None = None
stream: tractor.MsgStream | None = None
izero_hist: int = 0
izero_rt: int = 0
throttle_rate: int | None = None
# TODO: do we need this really if we can pull the `Portal` from
# ``tractor``'s internals?
feed: Feed | None = None
@property
def rt_shm(self) -> ShmArray:
if self._rt_shm is None:
self._rt_shm = attach_shm_array(
token=self._rt_shm_token,
readonly=True,
)
return self._rt_shm
@property
def hist_shm(self) -> ShmArray:
if self._hist_shm is None:
self._hist_shm = attach_shm_array(
token=self._hist_shm_token,
readonly=True,
)
return self._hist_shm
async def receive(self) -> dict:
return await self.stream.receive()
@acm
async def index_stream(
self,
delay_s: int = 1,
) -> AsyncIterator[int]:
if not self.feed:
raise RuntimeError('This flume is not part of any ``Feed``?')
# TODO: maybe a public (property) API for this in ``tractor``?
portal = self.stream._ctx._portal
assert portal
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_context(
acm_func=partial(
portal.open_context,
iter_ohlc_periods,
),
kwargs={'delay_s': delay_s},
) as (cache_hit, (ctx, first)):
async with ctx.open_stream() as istream:
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with istream.subscribe() as bistream:
yield bistream
else:
yield istream
def get_ds_info(
self,
) -> tuple[float, float, float]:
'''
Compute the "downsampling" ratio info between the historical shm
buffer and the real-time (HFT) one.
Return a tuple of the fast sample period, historical sample
period and ratio between them.
'''
times = self.hist_shm.array['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
hist_step_size_s = (end - start).seconds
times = self.rt_shm.array['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
rt_step_size_s = (end - start).seconds
ratio = hist_step_size_s / rt_step_size_s
return (
rt_step_size_s,
hist_step_size_s,
ratio,
)
# TODO: get native msgspec decoding for these workinn
def to_msg(self) -> dict:
msg = self.to_dict()
msg['symbol'] = msg['symbol'].to_dict()
# can't serialize the stream or feed objects, it's expected
# you'll have a ref to it since this msg should be rxed on
# a stream on whatever far end IPC..
msg.pop('stream')
msg.pop('feed')
return msg
@classmethod
def from_msg(cls, msg: dict) -> dict:
symbol = Symbol(**msg.pop('symbol'))
return cls(
symbol=symbol,
**msg,
)
async def allocate_persistent_feed(
bus: _FeedsBus,
@ -1074,6 +955,8 @@ async def allocate_persistent_feed(
some_data_ready = trio.Event()
feed_is_live = trio.Event()
symstr = symstr.lower()
# establish broker backend quote stream by calling
# ``stream_quotes()``, which is a required broker backend endpoint.
init_msg, first_quote = await bus.nursery.start(
@ -1132,6 +1015,7 @@ async def allocate_persistent_feed(
# https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon(
# await bus.start_task(
(
izero_hist,
hist_shm,
@ -1165,13 +1049,6 @@ async def allocate_persistent_feed(
# feed to that name (for now).
bus.feeds[symstr] = bus.feeds[bfqsn] = flume
# insert 1s ohlc into the increment buffer set
# to update and shift every second
sampler.ohlcv_shms.setdefault(
1,
[]
).append(rt_shm)
task_status.started()
if not start_stream:
@ -1181,18 +1058,6 @@ async def allocate_persistent_feed(
# the backend will indicate when real-time quotes have begun.
await feed_is_live.wait()
# insert 1m ohlc into the increment buffer set
# to shift every 60s.
sampler.ohlcv_shms.setdefault(60, []).append(hist_shm)
# create buffer a single incrementer task broker backend
# (aka `brokerd`) using the lowest sampler period.
if sampler.incrementers.get(_default_delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
_default_delay_s,
)
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
@ -1205,7 +1070,12 @@ async def allocate_persistent_feed(
rt_shm.push(hist_shm.array[-3:-1])
ohlckeys = ['open', 'high', 'low', 'close']
rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1]
rt_shm.array['volume'][-2] = 0
rt_shm.array['volume'][-2:] = 0
# set fast buffer time step to 1s
ts = round(time.time())
rt_shm.array['time'][0] = ts
rt_shm.array['time'][1] = ts + 1
# wait the spawning parent task to register its subscriber
# send-stream entry before we start the sample loop.
@ -1248,6 +1118,10 @@ async def open_feed_bus(
symbol.
'''
# ensure that a quote feed stream which is pushing too fast doesn't
# cause and overrun in the client.
ctx._backpressure = True
if loglevel is None:
loglevel = tractor.current_actor().loglevel
@ -1261,10 +1135,6 @@ async def open_feed_bus(
servicename = tractor.current_actor().name
assert 'brokerd' in servicename
# XXX: figure this not crashing into debug!
# await tractor.breakpoint()
# assert 0
assert brokername in servicename
bus = get_feed_bus(brokername)
@ -1273,6 +1143,10 @@ async def open_feed_bus(
flumes: dict[str, Flume] = {}
for symbol in symbols:
# we always use lower case keys internally
symbol = symbol.lower()
# if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in
# service nursery
@ -1359,6 +1233,7 @@ async def open_feed_bus(
# a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10)
ctx._backpressure = False
cs = await bus.start_task(
uniform_rate_send,
tick_throttle,

321
piker/data/flows.py 100644
View File

@ -0,0 +1,321 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
abstractions for organizing, managing and generally operating-on
real-time data processing data-structures.
"Streams, flumes, cascades and flows.."
"""
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from functools import partial
from typing import (
AsyncIterator,
TYPE_CHECKING,
)
import tractor
from tractor.trionics import (
maybe_open_context,
)
import pendulum
import numpy as np
from .types import Struct
from ._source import (
Symbol,
)
from ._sharedmem import (
attach_shm_array,
ShmArray,
_Token,
)
from ._sampling import (
open_sample_stream,
)
if TYPE_CHECKING:
from pyqtgraph import PlotItem
from .feed import Feed
# TODO: ideas for further abstractions as per
# https://github.com/pikers/piker/issues/216 and
# https://github.com/pikers/piker/issues/270:
# - a ``Cascade`` would be the minimal "connection" of 2 ``Flumes``
# as per circuit parlance:
# https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
# - could cover the combination of our `FspAdmin` and the
# backend `.fsp._engine` related machinery to "connect" one flume
# to another?
# - a (financial signal) ``Flow`` would be the a "collection" of such
# minmial cascades. Some engineering based jargon concepts:
# - https://en.wikipedia.org/wiki/Signal_chain
# - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
# - https://en.wikipedia.org/wiki/Audio_signal_flow
# - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
# - https://en.wikipedia.org/wiki/Dataflow_programming
# - https://en.wikipedia.org/wiki/Signal_programming
# - https://en.wikipedia.org/wiki/Incremental_computing
class Flume(Struct):
'''
Composite reference type which points to all the addressing handles
and other meta-data necessary for the read, measure and management
of a set of real-time updated data flows.
Can be thought of as a "flow descriptor" or "flow frame" which
describes the high level properties of a set of data flows that can
be used seamlessly across process-memory boundaries.
Each instance's sub-components normally includes:
- a msg oriented quote stream provided via an IPC transport
- history and real-time shm buffers which are both real-time
updated and backfilled.
- associated startup indexing information related to both buffer
real-time-append and historical prepend addresses.
- low level APIs to read and measure the updated data and manage
queuing properties.
'''
symbol: Symbol
first_quote: dict
_rt_shm_token: _Token
# optional since some data flows won't have a "downsampled" history
# buffer/stream (eg. FSPs).
_hist_shm_token: _Token | None = None
# private shm refs loaded dynamically from tokens
_hist_shm: ShmArray | None = None
_rt_shm: ShmArray | None = None
stream: tractor.MsgStream | None = None
izero_hist: int = 0
izero_rt: int = 0
throttle_rate: int | None = None
# TODO: do we need this really if we can pull the `Portal` from
# ``tractor``'s internals?
feed: Feed | None = None
@property
def rt_shm(self) -> ShmArray:
if self._rt_shm is None:
self._rt_shm = attach_shm_array(
token=self._rt_shm_token,
readonly=True,
)
return self._rt_shm
@property
def hist_shm(self) -> ShmArray:
if self._hist_shm_token is None:
raise RuntimeError(
'No shm token has been set for the history buffer?'
)
if (
self._hist_shm is None
):
self._hist_shm = attach_shm_array(
token=self._hist_shm_token,
readonly=True,
)
return self._hist_shm
async def receive(self) -> dict:
return await self.stream.receive()
@acm
async def index_stream(
self,
delay_s: float = 1,
) -> AsyncIterator[int]:
if not self.feed:
raise RuntimeError('This flume is not part of any ``Feed``?')
# TODO: maybe a public (property) API for this in ``tractor``?
portal = self.stream._ctx._portal
assert portal
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with open_sample_stream(float(delay_s)) as stream:
yield stream
def get_ds_info(
self,
) -> tuple[float, float, float]:
'''
Compute the "downsampling" ratio info between the historical shm
buffer and the real-time (HFT) one.
Return a tuple of the fast sample period, historical sample
period and ratio between them.
'''
times = self.hist_shm.array['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
hist_step_size_s = (end - start).seconds
times = self.rt_shm.array['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
rt_step_size_s = (end - start).seconds
ratio = hist_step_size_s / rt_step_size_s
return (
rt_step_size_s,
hist_step_size_s,
ratio,
)
# TODO: get native msgspec decoding for these workinn
def to_msg(self) -> dict:
msg = self.to_dict()
msg['symbol'] = msg['symbol'].to_dict()
# can't serialize the stream or feed objects, it's expected
# you'll have a ref to it since this msg should be rxed on
# a stream on whatever far end IPC..
msg.pop('stream')
msg.pop('feed')
return msg
@classmethod
def from_msg(cls, msg: dict) -> dict:
symbol = Symbol(**msg.pop('symbol'))
return cls(
symbol=symbol,
**msg,
)
def get_index(
self,
time_s: float,
) -> int:
'''
Return array shm-buffer index for for epoch time.
'''
array = self.rt_shm.array
times = array['time']
mask = (times >= time_s)
if any(mask):
return array['index'][mask][0]
# just the latest index
array['index'][-1]
def slice_from_time(
self,
array: np.ndarray,
start_t: float,
stop_t: float,
timeframe_s: int = 1,
return_data: bool = False,
) -> np.ndarray:
'''
Slice an input struct array providing only datums
"in view" of this chart.
'''
arr = {
1: self.rt_shm.array,
60: self.hist_shm.arry,
}[timeframe_s]
times = arr['time']
index = array['index']
# use advanced indexing to map the
# time range to the index range.
mask = (
(times >= start_t)
&
(times < stop_t)
)
# TODO: if we can ensure each time field has a uniform
# step we can instead do some arithmetic to determine
# the equivalent index like we used to?
# return array[
# lbar - ifirst:
# (rbar - ifirst) + 1
# ]
i_by_t = index[mask]
i_0 = i_by_t[0]
abs_slc = slice(
i_0,
i_by_t[-1],
)
# slice data by offset from the first index
# available in the passed datum set.
read_slc = slice(
0,
i_by_t[-1] - i_0,
)
if not return_data:
return (
abs_slc,
read_slc,
)
# also return the readable data from the timerange
return (
abs_slc,
read_slc,
arr[mask],
)
def view_data(
self,
plot: PlotItem,
timeframe_s: int = 1,
) -> np.ndarray:
# get far-side x-indices plot view
vr = plot.viewRect()
(
abs_slc,
buf_slc,
iv_arr,
) = self.slice_from_time(
start_t=vr.left(),
stop_t=vr.right(),
timeframe_s=timeframe_s,
return_data=True,
)
return iv_arr

View File

@ -454,8 +454,12 @@ class Storage:
try:
result = await client.query(params)
except purerpc.grpclib.exceptions.UnknownError:
except purerpc.grpclib.exceptions.UnknownError as err:
# indicate there is no history for this timeframe
log.exception(
f'Unknown mkts QUERY error: {params}\n'
f'{err.args}'
)
return {}
# TODO: it turns out column access on recarrays is actually slower:

View File

@ -199,7 +199,10 @@ def maybe_mk_fsp_shm(
# TODO: load output types from `Fsp`
# - should `index` be a required internal field?
fsp_dtype = np.dtype(
[('index', int)] +
[('index', int)]
+
[('time', float)]
+
[(field_name, float) for field_name in target.outputs]
)

View File

@ -21,7 +21,9 @@ core task logic for processing chains
from dataclasses import dataclass
from functools import partial
from typing import (
AsyncIterator, Callable, Optional,
AsyncIterator,
Callable,
Optional,
Union,
)
@ -36,9 +38,13 @@ from .. import data
from ..data import attach_shm_array
from ..data.feed import (
Flume,
Feed,
)
from ..data._sharedmem import ShmArray
from ..data._sampling import _default_delay_s
from ..data._sampling import (
_default_delay_s,
open_sample_stream,
)
from ..data._source import Symbol
from ._api import (
Fsp,
@ -111,8 +117,9 @@ async def fsp_compute(
flume.rt_shm,
)
# Conduct a single iteration of fsp with historical bars input
# and get historical output
# HISTORY COMPUTE PHASE
# conduct a single iteration of fsp with historical bars input
# and get historical output.
history_output: Union[
dict[str, np.ndarray], # multi-output case
np.ndarray, # single output case
@ -129,9 +136,13 @@ async def fsp_compute(
# each respective field.
fields = getattr(dst.array.dtype, 'fields', None).copy()
fields.pop('index')
history: Optional[np.ndarray] = None # TODO: nptyping here!
history_by_field: Optional[np.ndarray] = None
src_time = src.array['time']
if fields and len(fields) > 1 and fields:
if (
fields and
len(fields) > 1
):
if not isinstance(history_output, dict):
raise ValueError(
f'`{func_name}` is a multi-output FSP and should yield a '
@ -142,7 +153,7 @@ async def fsp_compute(
if key in history_output:
output = history_output[key]
if history is None:
if history_by_field is None:
if output is None:
length = len(src.array)
@ -152,7 +163,7 @@ async def fsp_compute(
# using the first output, determine
# the length of the struct-array that
# will be pushed to shm.
history = np.zeros(
history_by_field = np.zeros(
length,
dtype=dst.array.dtype
)
@ -160,7 +171,7 @@ async def fsp_compute(
if output is None:
continue
history[key] = output
history_by_field[key] = output
# single-key output stream
else:
@ -169,11 +180,13 @@ async def fsp_compute(
f'`{func_name}` is a single output FSP and should yield an '
'`np.ndarray` for history'
)
history = np.zeros(
history_by_field = np.zeros(
len(history_output),
dtype=dst.array.dtype
)
history[func_name] = history_output
history_by_field[func_name] = history_output
history_by_field['time'] = src_time[-len(history_by_field):]
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
@ -190,7 +203,10 @@ async def fsp_compute(
# TODO: can we use this `start` flag instead of the manual
# setting above?
index = dst.push(history, start=first)
index = dst.push(
history_by_field,
start=first,
)
profiler(f'{func_name} pushed history')
profiler.finish()
@ -216,8 +232,14 @@ async def fsp_compute(
log.debug(f"{func_name}: {processed}")
key, output = processed
index = src.index
dst.array[-1][key] = output
# dst.array[-1][key] = output
dst.array[[key, 'time']][-1] = (
output,
# TODO: what about pushing ``time.time_ns()``
# in which case we'll need to round at the graphics
# processing / sampling layer?
src.array[-1]['time']
)
# NOTE: for now we aren't streaming this to the consumer
# stream latest array index entry which basically just acts
@ -228,6 +250,7 @@ async def fsp_compute(
# N-consumers who subscribe for the real-time output,
# which we'll likely want to implement using local-mem
# chans for the fan out?
# index = src.index
# if attach_stream:
# await client_stream.send(index)
@ -302,6 +325,7 @@ async def cascade(
raise ValueError(f'Unknown fsp target: {ns_path}')
# open a data feed stream with requested broker
feed: Feed
async with data.feed.maybe_open_feed(
[fqsn],
@ -317,7 +341,6 @@ async def cascade(
symbol = flume.symbol
assert src.token == flume.rt_shm.token
profiler(f'{func}: feed up')
# last_len = new_len = len(src.array)
func_name = func.__name__
async with (
@ -365,7 +388,7 @@ async def cascade(
) -> tuple[TaskTracker, int]:
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.debug(f're-syncing fsp {func_name} to source')
log.info(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
tracker, index = await n.start(fsp_target)
@ -386,7 +409,8 @@ async def cascade(
src: ShmArray,
dst: ShmArray
) -> tuple[bool, int, int]:
'''Predicate to dertmine if a destination FSP
'''
Predicate to dertmine if a destination FSP
output array is aligned to its source array.
'''
@ -395,16 +419,15 @@ async def cascade(
return not (
# the source is likely backfilling and we must
# sync history calculations
len_diff > 2 or
len_diff > 2
# we aren't step synced to the source and may be
# leading/lagging by a step
step_diff > 1 or
step_diff < 0
or step_diff > 1
or step_diff < 0
), step_diff, len_diff
async def poll_and_sync_to_step(
tracker: TaskTracker,
src: ShmArray,
dst: ShmArray,
@ -424,16 +447,16 @@ async def cascade(
# signal
times = src.array['time']
if len(times) > 1:
delay_s = times[-1] - times[times != times[-1]][-1]
last_ts = times[-1]
delay_s = float(last_ts - times[times != last_ts][-1])
else:
# our default "HFT" sample rate.
delay_s = _default_delay_s
# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
async with flume.index_stream(
int(delay_s)
) as istream:
# sub and increment the underlying shared memory buffer
# on every step msg received from the global `samplerd`
# service.
async with open_sample_stream(float(delay_s)) as istream:
profiler(f'{func_name}: sample stream up')
profiler.finish()
@ -468,3 +491,23 @@ async def cascade(
last = array[-1:].copy()
dst.push(last)
# sync with source buffer's time step
src_l2 = src.array[-2:]
src_li, src_lt = src_l2[-1][['index', 'time']]
src_2li, src_2lt = src_l2[-2][['index', 'time']]
dst._array['time'][src_li] = src_lt
dst._array['time'][src_2li] = src_2lt
# last2 = dst.array[-2:]
# if (
# last2[-1]['index'] != src_li
# or last2[-2]['index'] != src_2li
# ):
# dstl2 = list(last2)
# srcl2 = list(src_l2)
# print(
# # f'{dst.token}\n'
# f'src: {srcl2}\n'
# f'dst: {dstl2}\n'
# )

View File

@ -234,7 +234,7 @@ async def flow_rates(
# FSPs, user input, and possibly any general event stream in
# real-time. Hint: ideally implemented with caching until mutated
# ;)
period: 'Param[int]' = 6, # noqa
period: 'Param[int]' = 1, # noqa
# TODO: support other means by providing a map
# to weights `partial()`-ed with `wma()`?
@ -268,8 +268,7 @@ async def flow_rates(
'dark_dvlm_rate': None,
}
# TODO: 3.10 do ``anext()``
quote = await source.__anext__()
quote = await anext(source)
# ltr = 0
# lvr = 0

View File

@ -49,7 +49,10 @@ from qdarkstyle import DarkPalette
import trio
from outcome import Error
from .._daemon import maybe_open_pikerd, _tractor_kwargs
from .._daemon import (
maybe_open_pikerd,
get_tractor_runtime_kwargs,
)
from ..log import get_logger
from ._pg_overrides import _do_overrides
from . import _style
@ -170,7 +173,7 @@ def run_qtractor(
instance.window = window
# override tractor's defaults
tractor_kwargs.update(_tractor_kwargs)
tractor_kwargs.update(get_tractor_runtime_kwargs())
# define tractor entrypoint
async def main():

View File

@ -51,7 +51,10 @@ from ._forms import (
mk_form,
open_form_input_handling,
)
from ..fsp._api import maybe_mk_fsp_shm, Fsp
from ..fsp._api import (
maybe_mk_fsp_shm,
Fsp,
)
from ..fsp import cascade
from ..fsp._volume import (
# tina_vwap,

View File

@ -7,6 +7,9 @@ from piker import (
# log,
config,
)
from piker._daemon import (
Services,
)
def pytest_addoption(parser):
@ -137,12 +140,16 @@ async def _open_test_pikerd(
port = random.randint(6e3, 7e3)
reg_addr = ('127.0.0.1', port)
# try:
async with (
maybe_open_pikerd(
registry_addr=reg_addr,
**kwargs,
),
) as service_manager,
):
# this proc/actor is the pikerd
assert service_manager is Services
async with tractor.wait_for_actor(
'pikerd',
arbiter_sockaddr=reg_addr,
@ -153,6 +160,7 @@ async def _open_test_pikerd(
raddr[0],
raddr[1],
portal,
service_manager,
)

View File

@ -0,0 +1,176 @@
'''
Actor tree daemon sub-service verifications
'''
from typing import AsyncContextManager
from contextlib import asynccontextmanager as acm
import pytest
import trio
import tractor
from piker._daemon import (
find_service,
check_for_service,
Services,
)
from piker.data import (
open_feed,
)
from piker.clearing import (
open_ems,
)
from piker.clearing._messages import (
BrokerdPosition,
Status,
)
from piker.clearing._client import (
OrderBook,
)
def test_runtime_boot(
open_test_pikerd: AsyncContextManager
):
'''
Verify we can boot the `pikerd` service stack using the
`open_test_pikerd` fixture helper and that registry address details
match up.
'''
async def main():
port = 6666
daemon_addr = ('127.0.0.1', port)
services: Services
async with (
open_test_pikerd(
reg_addr=daemon_addr,
) as (_, _, pikerd_portal, services),
tractor.wait_for_actor(
'pikerd',
arbiter_sockaddr=daemon_addr,
) as portal,
):
assert pikerd_portal.channel.raddr == daemon_addr
assert pikerd_portal.channel.raddr == portal.channel.raddr
trio.run(main)
@acm
async def ensure_service(
name: str,
sockaddr: tuple[str, int] | None = None,
) -> None:
async with find_service(name) as portal:
remote_sockaddr = portal.channel.raddr
print(f'FOUND `{name}` @ {remote_sockaddr}')
if sockaddr:
assert remote_sockaddr == sockaddr
yield portal
def test_ensure_datafeed_actors(
open_test_pikerd: AsyncContextManager
) -> None:
'''
Verify that booting a data feed starts a `brokerd`
actor and a singleton global `samplerd` and opening
an order mode in paper opens the `paperboi` service.
'''
actor_name: str = 'brokerd'
backend: str = 'kraken'
brokerd_name: str = f'{actor_name}.{backend}'
async def main():
async with (
open_test_pikerd(),
open_feed(
['xbtusdt.kraken'],
loglevel='info',
) as feed
):
# halt rt quote streams since we aren't testing them
await feed.pause()
async with (
ensure_service(brokerd_name),
ensure_service('samplerd'),
):
pass
trio.run(main)
def test_ensure_ems_in_paper_actors(
open_test_pikerd: AsyncContextManager
) -> None:
actor_name: str = 'brokerd'
backend: str = 'kraken'
brokerd_name: str = f'{actor_name}.{backend}'
async def main():
# type declares
book: OrderBook
trades_stream: tractor.MsgStream
pps: dict[str, list[BrokerdPosition]]
accounts: list[str]
dialogs: dict[str, Status]
# ensure we timeout after is startup is too slow.
# TODO: something like this should be our start point for
# benchmarking end-to-end startup B)
with trio.fail_after(9):
async with (
open_test_pikerd() as (_, _, _, services),
open_ems(
'xbtusdt.kraken',
mode='paper',
) as (
book,
trades_stream,
pps,
accounts,
dialogs,
),
):
# there should be no on-going positions,
# TODO: though eventually we'll want to validate against
# local ledger and `pps.toml` state ;)
assert not pps
assert not dialogs
pikerd_subservices = ['emsd', 'samplerd']
async with (
ensure_service('emsd'),
ensure_service(brokerd_name),
ensure_service(f'paperboi.{backend}'),
):
for name in pikerd_subservices:
assert name in services.service_tasks
# brokerd.kraken actor should have been started
# implicitly by the ems.
assert brokerd_name in services.service_tasks
print('ALL SERVICES STARTED, terminating..')
await services.cancel_service('emsd')
with pytest.raises(
tractor._exceptions.ContextCancelled,
) as exc_info:
trio.run(main)
cancel_msg: str = '`_emsd_main()` was remotely cancelled by its caller'
assert cancel_msg in exc_info.value.args[0]