Compare commits
Comparing 90bdac5f72 ... 01cbd0a775 (21 commits)

Commits (SHA1):
01cbd0a775
8ab2feba3e
91d7db9db8
7f38e86b16
621a8b829c
903c739b06
badcc60d60
4c9915a4a5
636a580645
389c746223
a795108041
6e495511ff
e391c896f8
5633f5614d
76735189de
d49608f74e
bf0ac93aa3
d7179d47b0
c390e87536
5e4a6d61c7
3caaa30b03
@@ -23,6 +23,7 @@ from __future__ import annotations
from contextlib import (
    asynccontextmanager as acm,
)
from functools import partial
from types import ModuleType
from typing import (
    TYPE_CHECKING,

@@ -60,12 +61,13 @@ async def _setup_persistent_brokerd(
    ctx: tractor.Context,
    brokername: str,
    loglevel: str | None = None,
    debug_mode: bool = False,

) -> None:
    '''
    Allocate a actor-wide service nursery in ``brokerd``
    such that feeds can be run in the background persistently by
    the broker backend as needed.
    Allocate a actor-wide service nursery in `brokerd` such that
    feeds can be run in the background persistently by the broker
    backend as needed.

    '''
    # NOTE: we only need to setup logging once (and only) here

@@ -86,6 +88,18 @@ async def _setup_persistent_brokerd(
    from piker.data import feed
    assert not feed._bus

    if (
        debug_mode
        and
        tractor.current_actor().is_infected_aio()
    ):
        # NOTE, whenever running `asyncio` in provider's actor
        # runtime be sure we enabled `breakpoint()` support
        # for non-`trio.Task` usage.
        from tractor.devx._debug import maybe_init_greenback
        await maybe_init_greenback()
        # breakpoint() # XXX, SHOULD WORK from `trio.Task`!

    # allocate a nursery to the bus for spawning background
    # tasks to service client IPC requests, normally
    # `tractor.Context` connections to explicitly required

@@ -145,18 +159,21 @@ def broker_init(
    above.

    '''
    from ..brokers import get_brokermod
    brokermod = get_brokermod(brokername)
    brokermod: ModuleType = get_brokermod(brokername)
    modpath: str = brokermod.__name__

    start_actor_kwargs['name'] = f'brokerd.{brokername}'
    start_actor_kwargs.update(
        getattr(
            brokermod,
            '_spawn_kwargs',
            {},
        )
    spawn_kws: dict = getattr(
        brokermod,
        '_spawn_kwargs',
        {},
    )
    # ^^ NOTE, here we pull any runtime parameters specific
    # to spawning the sub-actor for the backend. For ex.
    # both `ib` and `deribit` rely on,
    # `'infect_asyncio': True,` since they both
    # use `tractor`'s "infected `asyncio` mode"
    # for their libs but you could also do something like
    # `'debug_mode: True` which would be like passing
    # `--pdb` for just that provider backend.

    # XXX TODO: make this not so hacky/monkeypatched..
    # -> we need a sane way to configure the logging level for all

@@ -166,8 +183,7 @@ def broker_init(

    # lookup actor-enabled modules declared by the backend offering the
    # `brokerd` endpoint(s).
    enabled: list[str]
    enabled = start_actor_kwargs['enable_modules'] = [
    enabled: list[str] = [
        __name__,  # so that eps from THIS mod can be invoked
        modpath,
    ]

@@ -179,9 +195,13 @@ def broker_init(
        subpath: str = f'{modpath}.{submodname}'
        enabled.append(subpath)

    datad_kwargs: dict = {
        'name': f'brokerd.{brokername}',
        'enable_modules': enabled,
    }
    return (
        brokermod,
        start_actor_kwargs,  # to `ActorNursery.start_actor()`
        start_actor_kwargs | datad_kwargs | spawn_kws,  # to `ActorNursery.start_actor()`

        # XXX see impl above; contains all (actor global)
        # setup/teardown expected in all `brokerd` actor instances.

@@ -190,14 +210,17 @@ def broker_init(


async def spawn_brokerd(

    brokername: str,
    loglevel: str | None = None,

    **tractor_kwargs,

) -> bool:
    '''
    Spawn a `brokerd.<backendname>` subactor service daemon
    using `pikerd`'s service mngr.

    '''
    from piker.service._util import log  # use service mngr log
    log.info(f'Spawning {brokername} broker daemon')


@@ -211,33 +234,41 @@ async def spawn_brokerd(
        **tractor_kwargs,
    )

    brokermod = get_brokermod(brokername)
    extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
    tractor_kwargs.update(extra_tractor_kwargs)

    # ask `pikerd` to spawn a new sub-actor and manage it under its
    # actor nursery
    from piker.service import Services

    from piker.service import (
        get_service_mngr,
        ServiceMngr,
    )
    dname: str = tractor_kwargs.pop('name')  # f'brokerd.{brokername}'
    portal = await Services.actor_n.start_actor(
        dname,
        enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
        debug_mode=Services.debug_mode,
    mngr: ServiceMngr = get_service_mngr()
    ctx: tractor.Context = await mngr.start_service(
        daemon_name=dname,
        ctx_ep=partial(
            # signature of target root-task endpoint
            daemon_fixture_ep,

            # passed to daemon_fixture_ep(**kwargs)
            brokername=brokername,
            loglevel=loglevel,
            debug_mode=mngr.debug_mode,
        ),
        debug_mode=mngr.debug_mode,
        # ^TODO, allow overriding this per-daemon from client side?
        # |_ it's already supported in `tractor` so..

        loglevel=loglevel,
        enable_modules=(
            _data_mods
            +
            tractor_kwargs.pop('enable_modules')
        ),
        **tractor_kwargs
    )

    # NOTE: the service mngr expects an already spawned actor + its
    # portal ref in order to do non-blocking setup of brokerd
    # service nursery.
    await Services.start_service_task(
        dname,
        portal,

        # signature of target root-task endpoint
        daemon_fixture_ep,
        brokername=brokername,
        loglevel=loglevel,
    assert (
        not ctx.cancel_called
        and ctx.portal  # parent side
        and dname in ctx.chan.uid  # subactor is named as desired
    )
    return True


@@ -262,8 +293,7 @@ async def maybe_spawn_brokerd(
    from piker.service import maybe_spawn_daemon

    async with maybe_spawn_daemon(

        f'brokerd.{brokername}',
        service_name=f'brokerd.{brokername}',
        service_task_target=spawn_brokerd,
        spawn_args={
            'brokername': brokername,
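Taken together, the hunks above swap the old class-level `Services` calls for the new `ServiceMngr` API. A minimal sketch of the new spawn flow, assuming the `get_service_mngr()`/`start_service()` signatures exactly as they appear in this diff (the backend name is made up):

```python
from functools import partial
from piker.service import get_service_mngr, ServiceMngr

async def start_backend_daemon() -> None:
    # resolve the per-tree singleton service manager..
    mngr: ServiceMngr = get_service_mngr()

    # ..then spawn + supervise the sub-actor and open its root-task
    # context in one call (vs. the old 2-step `.start_actor()` +
    # `.start_service_task()` dance).
    ctx = await mngr.start_service(
        daemon_name='brokerd.kraken',  # hypothetical backend name
        ctx_ep=partial(
            _setup_persistent_brokerd,  # ctx endpoint defined above
            brokername='kraken',
            loglevel='info',
            debug_mode=mngr.debug_mode,
        ),
        debug_mode=mngr.debug_mode,
        loglevel='info',
        enable_modules=['piker.brokers._daemon'],
    )
    # same post-condition checked by `spawn_brokerd()` above
    assert not ctx.cancel_called
```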
@@ -140,11 +140,10 @@ def pikerd(

    if pdb:
        log.warning((
            "\n"
            "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
            "When a `piker` daemon crashes it will block the "
            "task-thread until resumed from console!\n"
            "\n"
            '\n\n'
            '!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n'
            'When a `piker` daemon crashes it will block the '
            'task-thread until resumed from console!\n'
        ))

    # service-actor registry endpoint socket-address set

@@ -177,7 +176,7 @@ def pikerd(
    from .. import service

    async def main():
        service_mngr: service.Services
        service_mngr: service.ServiceMngr

        async with (
            service.open_pikerd(

@@ -335,7 +334,7 @@ def services(config, tl, ports):
            name='service_query',
            loglevel=config['loglevel'] if tl else None,
        ),
        tractor.get_arbiter(
        tractor.get_registry(
            host=host,
            port=ports[0]
        ) as portal
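The last hunk renames the registry lookup from `tractor.get_arbiter()` to `tractor.get_registry()`. A hedged sketch of that lookup as used by the `services` command (host/port values are illustrative only):

```python
import tractor

async def query_registry(
    host: str = '127.0.0.1',  # illustrative defaults only
    port: int = 6116,
) -> None:
    async with tractor.get_registry(
        host=host,
        port=port,
    ) as portal:
        # the portal can now be used to query the root registry
        # actor, eg. for the service-actor listing the cli prints.
        print(portal.channel.uid)
```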
@ -25,6 +25,7 @@ from collections import (
|
|||
defaultdict,
|
||||
)
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from functools import partial
|
||||
import time
|
||||
from typing import (
|
||||
Any,
|
||||
|
@ -42,7 +43,7 @@ from tractor.trionics import (
|
|||
maybe_open_nursery,
|
||||
)
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from trio import TaskStatus
|
||||
|
||||
from .ticktools import (
|
||||
frame_ticks,
|
||||
|
@ -70,6 +71,7 @@ if TYPE_CHECKING:
|
|||
_default_delay_s: float = 1.0
|
||||
|
||||
|
||||
# TODO: use new `tractor.singleton_acm` API for this!
|
||||
class Sampler:
|
||||
'''
|
||||
Global sampling engine registry.
|
||||
|
@ -79,9 +81,9 @@ class Sampler:
|
|||
|
||||
This non-instantiated type is meant to be a singleton within
|
||||
a `samplerd` actor-service spawned once by the user wishing to
|
||||
time-step-sample (real-time) quote feeds, see
|
||||
``.service.maybe_open_samplerd()`` and the below
|
||||
``register_with_sampler()``.
|
||||
time-step-sample a (real-time) quote feeds, see
|
||||
`.service.maybe_open_samplerd()` and the below
|
||||
`register_with_sampler()`.
|
||||
|
||||
'''
|
||||
service_nursery: None | trio.Nursery = None
|
||||
|
@ -95,6 +97,12 @@ class Sampler:
|
|||
# history loading.
|
||||
incr_task_cs: trio.CancelScope | None = None
|
||||
|
||||
bcast_errors: tuple[Exception] = (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError,
|
||||
trio.EndOfChannel,
|
||||
)
|
||||
|
||||
# holds all the ``tractor.Context`` remote subscriptions for
|
||||
# a particular sample period increment event: all subscribers are
|
||||
# notified on a step.
|
||||
|
@ -258,14 +266,15 @@ class Sampler:
|
|||
subs: set
|
||||
last_ts, subs = pair
|
||||
|
||||
task = trio.lowlevel.current_task()
|
||||
log.debug(
|
||||
f'SUBS {self.subscribers}\n'
|
||||
f'PAIR {pair}\n'
|
||||
f'TASK: {task}: {id(task)}\n'
|
||||
f'broadcasting {period_s} -> {last_ts}\n'
|
||||
# f'consumers: {subs}'
|
||||
)
|
||||
# NOTE, for debugging pub-sub issues
|
||||
# task = trio.lowlevel.current_task()
|
||||
# log.debug(
|
||||
# f'AlL-SUBS@{period_s!r}: {self.subscribers}\n'
|
||||
# f'PAIR: {pair}\n'
|
||||
# f'TASK: {task}: {id(task)}\n'
|
||||
# f'broadcasting {period_s} -> {last_ts}\n'
|
||||
# f'consumers: {subs}'
|
||||
# )
|
||||
borked: set[MsgStream] = set()
|
||||
sent: set[MsgStream] = set()
|
||||
while True:
|
||||
|
@ -282,12 +291,11 @@ class Sampler:
|
|||
await stream.send(msg)
|
||||
sent.add(stream)
|
||||
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
):
|
||||
except self.bcast_errors as err:
|
||||
log.error(
|
||||
f'{stream._ctx.chan.uid} dropped connection'
|
||||
f'Connection dropped for IPC ctx\n'
|
||||
f'{stream._ctx}\n\n'
|
||||
f'Due to {type(err)}'
|
||||
)
|
||||
borked.add(stream)
|
||||
else:
|
||||
|
@ -375,7 +383,10 @@ async def register_with_sampler(
|
|||
assert Sampler.ohlcv_shms
|
||||
|
||||
# unblock caller
|
||||
await ctx.started(set(Sampler.ohlcv_shms.keys()))
|
||||
await ctx.started(
|
||||
# XXX bc msgpack only allows one array type!
|
||||
list(Sampler.ohlcv_shms.keys())
|
||||
)
|
||||
|
||||
if open_index_stream:
|
||||
try:
|
||||
|
@ -394,7 +405,8 @@ async def register_with_sampler(
|
|||
finally:
|
||||
if (
|
||||
sub_for_broadcasts
|
||||
and subs
|
||||
and
|
||||
subs
|
||||
):
|
||||
try:
|
||||
subs.remove(stream)
|
||||
|
@ -419,7 +431,6 @@ async def register_with_sampler(
|
|||
|
||||
|
||||
async def spawn_samplerd(
|
||||
|
||||
loglevel: str | None = None,
|
||||
**extra_tractor_kwargs
|
||||
|
||||
|
@ -429,7 +440,10 @@ async def spawn_samplerd(
|
|||
update and increment count write and stream broadcasting.
|
||||
|
||||
'''
|
||||
from piker.service import Services
|
||||
from piker.service import (
|
||||
get_service_mngr,
|
||||
ServiceMngr,
|
||||
)
|
||||
|
||||
dname = 'samplerd'
|
||||
log.info(f'Spawning `{dname}`')
|
||||
|
@ -437,26 +451,33 @@ async def spawn_samplerd(
|
|||
# singleton lock creation of ``samplerd`` since we only ever want
|
||||
# one daemon per ``pikerd`` proc tree.
|
||||
# TODO: make this built-into the service api?
|
||||
async with Services.locks[dname + '_singleton']:
|
||||
mngr: ServiceMngr = get_service_mngr()
|
||||
already_started: bool = dname in mngr.service_tasks
|
||||
|
||||
if dname not in Services.service_tasks:
|
||||
|
||||
portal = await Services.actor_n.start_actor(
|
||||
dname,
|
||||
enable_modules=[
|
||||
'piker.data._sampling',
|
||||
],
|
||||
loglevel=loglevel,
|
||||
debug_mode=Services.debug_mode, # set by pikerd flag
|
||||
**extra_tractor_kwargs
|
||||
)
|
||||
|
||||
await Services.start_service_task(
|
||||
dname,
|
||||
portal,
|
||||
async with mngr._locks[dname + '_singleton']:
|
||||
ctx: Context = await mngr.start_service(
|
||||
daemon_name=dname,
|
||||
ctx_ep=partial(
|
||||
register_with_sampler,
|
||||
period_s=1,
|
||||
sub_for_broadcasts=False,
|
||||
),
|
||||
debug_mode=mngr.debug_mode, # set by pikerd flag
|
||||
|
||||
# proxy-through to tractor
|
||||
enable_modules=[
|
||||
'piker.data._sampling',
|
||||
],
|
||||
loglevel=loglevel,
|
||||
**extra_tractor_kwargs
|
||||
)
|
||||
if not already_started:
|
||||
assert (
|
||||
ctx
|
||||
and
|
||||
ctx.portal
|
||||
and
|
||||
not ctx.cancel_called
|
||||
)
|
||||
return True
|
||||
|
||||
|
@ -561,8 +582,7 @@ async def open_sample_stream(
|
|||
|
||||
|
||||
async def sample_and_broadcast(
|
||||
|
||||
bus: _FeedsBus, # noqa
|
||||
bus: _FeedsBus,
|
||||
rt_shm: ShmArray,
|
||||
hist_shm: ShmArray,
|
||||
quote_stream: trio.abc.ReceiveChannel,
|
||||
|
@ -582,11 +602,33 @@ async def sample_and_broadcast(
|
|||
|
||||
overruns = Counter()
|
||||
|
||||
# NOTE, only used for debugging live-data-feed issues, though
|
||||
# this should be resolved more correctly in the future using the
|
||||
# new typed-msgspec feats of `tractor`!
|
||||
#
|
||||
# XXX, a multiline nested `dict` formatter (since rn quote-msgs
|
||||
# are just that).
|
||||
# pfmt: Callable[[str], str] = mk_repr()
|
||||
|
||||
# iterate stream delivered by broker
|
||||
async for quotes in quote_stream:
|
||||
# print(quotes)
|
||||
|
||||
# TODO: ``numba`` this!
|
||||
# XXX WARNING XXX only enable for debugging bc ow can cost
|
||||
# ALOT of perf with HF-feedz!!!
|
||||
#
|
||||
# log.info(
|
||||
# 'Rx live quotes:\n'
|
||||
# f'{pfmt(quotes)}'
|
||||
# )
|
||||
|
||||
# TODO,
|
||||
# -[ ] `numba` or `cython`-nize this loop possibly?
|
||||
# |_alternatively could we do it in rust somehow by upacking
|
||||
# arrow msgs instead of using `msgspec`?
|
||||
# -[ ] use `msgspec.Struct` support in new typed-msging from
|
||||
# `tractor` to ensure only allowed msgs are transmitted?
|
||||
#
|
||||
for broker_symbol, quote in quotes.items():
|
||||
# TODO: in theory you can send the IPC msg *before* writing
|
||||
# to the sharedmem array to decrease latency, however, that
|
||||
|
@ -659,6 +701,21 @@ async def sample_and_broadcast(
|
|||
sub_key: str = broker_symbol.lower()
|
||||
subs: set[Sub] = bus.get_subs(sub_key)
|
||||
|
||||
# TODO, figure out how to make this useful whilst
|
||||
# incoporating feed "pausing" ..
|
||||
#
|
||||
# if not subs:
|
||||
# all_bs_fqmes: list[str] = list(
|
||||
# bus._subscribers.keys()
|
||||
# )
|
||||
# log.warning(
|
||||
# f'No subscribers for {brokername!r} live-quote ??\n'
|
||||
# f'broker_symbol: {broker_symbol}\n\n'
|
||||
|
||||
# f'Maybe the backend-sys symbol does not match one of,\n'
|
||||
# f'{pfmt(all_bs_fqmes)}\n'
|
||||
# )
|
||||
|
||||
# NOTE: by default the broker backend doesn't append
|
||||
# it's own "name" into the fqme schema (but maybe it
|
||||
# should?) so we have to manually generate the correct
|
||||
|
@ -728,18 +785,14 @@ async def sample_and_broadcast(
|
|||
if lags > 10:
|
||||
await tractor.pause()
|
||||
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError,
|
||||
trio.EndOfChannel,
|
||||
):
|
||||
except Sampler.bcast_errors as ipc_err:
|
||||
ctx: Context = ipc._ctx
|
||||
chan: Channel = ctx.chan
|
||||
if ctx:
|
||||
log.warning(
|
||||
'Dropped `brokerd`-quotes-feed connection:\n'
|
||||
f'{broker_symbol}:'
|
||||
f'{ctx.cid}@{chan.uid}'
|
||||
f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n'
|
||||
f'x>) {ctx.cid}@{chan.uid}'
|
||||
f'|_{ipc_err!r}\n\n'
|
||||
)
|
||||
if sub.throttle_rate:
|
||||
assert ipc._closed
|
||||
|
@ -756,12 +809,11 @@ async def sample_and_broadcast(
|
|||
|
||||
|
||||
async def uniform_rate_send(
|
||||
|
||||
rate: float,
|
||||
quote_stream: trio.abc.ReceiveChannel,
|
||||
stream: MsgStream,
|
||||
|
||||
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
|
||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -779,13 +831,16 @@ async def uniform_rate_send(
|
|||
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
|
||||
|
||||
'''
|
||||
# TODO: compute the approx overhead latency per cycle
|
||||
left_to_sleep = throttle_period = 1/rate - 0.000616
|
||||
# ?TODO? dynamically compute the **actual** approx overhead latency per cycle
|
||||
# instead of this magic # bidinezz?
|
||||
throttle_period: float = 1/rate - 0.000616
|
||||
left_to_sleep: float = throttle_period
|
||||
|
||||
# send cycle state
|
||||
first_quote: dict|None
|
||||
first_quote = last_quote = None
|
||||
last_send = time.time()
|
||||
diff = 0
|
||||
last_send: float = time.time()
|
||||
diff: float = 0
|
||||
|
||||
task_status.started()
|
||||
ticks_by_type: dict[
|
||||
|
@ -796,22 +851,28 @@ async def uniform_rate_send(
|
|||
clear_types = _tick_groups['clears']
|
||||
|
||||
while True:
|
||||
|
||||
# compute the remaining time to sleep for this throttled cycle
|
||||
left_to_sleep = throttle_period - diff
|
||||
left_to_sleep: float = throttle_period - diff
|
||||
|
||||
if left_to_sleep > 0:
|
||||
cs: trio.CancelScope
|
||||
with trio.move_on_after(left_to_sleep) as cs:
|
||||
sym: str
|
||||
last_quote: dict
|
||||
try:
|
||||
sym, last_quote = await quote_stream.receive()
|
||||
except trio.EndOfChannel:
|
||||
log.exception(f"feed for {stream} ended?")
|
||||
log.exception(
|
||||
f'Live stream for feed for ended?\n'
|
||||
f'<=c\n'
|
||||
f' |_[{stream!r}\n'
|
||||
)
|
||||
break
|
||||
|
||||
diff = time.time() - last_send
|
||||
diff: float = time.time() - last_send
|
||||
|
||||
if not first_quote:
|
||||
first_quote = last_quote
|
||||
first_quote: float = last_quote
|
||||
# first_quote['tbt'] = ticks_by_type
|
||||
|
||||
if (throttle_period - diff) > 0:
|
||||
|
@ -872,7 +933,9 @@ async def uniform_rate_send(
|
|||
# TODO: now if only we could sync this to the display
|
||||
# rate timing exactly lul
|
||||
try:
|
||||
await stream.send({sym: first_quote})
|
||||
await stream.send({
|
||||
sym: first_quote
|
||||
})
|
||||
except tractor.RemoteActorError as rme:
|
||||
if rme.type is not tractor._exceptions.StreamOverrun:
|
||||
raise
|
||||
|
@ -883,19 +946,28 @@ async def uniform_rate_send(
|
|||
f'{sym}:{ctx.cid}@{chan.uid}'
|
||||
)
|
||||
|
||||
# NOTE: any of these can be raised by `tractor`'s IPC
|
||||
# transport-layer and we want to be highly resilient
|
||||
# to consumers which crash or lose network connection.
|
||||
# I.e. we **DO NOT** want to crash and propagate up to
|
||||
# ``pikerd`` these kinds of errors!
|
||||
except (
|
||||
# NOTE: any of these can be raised by ``tractor``'s IPC
|
||||
# transport-layer and we want to be highly resilient
|
||||
# to consumers which crash or lose network connection.
|
||||
# I.e. we **DO NOT** want to crash and propagate up to
|
||||
# ``pikerd`` these kinds of errors!
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
ConnectionResetError,
|
||||
):
|
||||
# if the feed consumer goes down then drop
|
||||
# out of this rate limiter
|
||||
log.warning(f'{stream} closed')
|
||||
) + Sampler.bcast_errors as ipc_err:
|
||||
match ipc_err:
|
||||
case trio.EndOfChannel():
|
||||
log.info(
|
||||
f'{stream} terminated by peer,\n'
|
||||
f'{ipc_err!r}'
|
||||
)
|
||||
case _:
|
||||
# if the feed consumer goes down then drop
|
||||
# out of this rate limiter
|
||||
log.warning(
|
||||
f'{stream} closed due to,\n'
|
||||
f'{ipc_err!r}'
|
||||
)
|
||||
|
||||
await stream.aclose()
|
||||
return
|
||||
|
||||
|
|
|
@@ -273,7 +273,7 @@ async def _reconnect_forever(
            nobsws._connected.set()
            await trio.sleep_forever()
        except HandshakeError:
            log.exception(f'Retrying connection')
            log.exception('Retrying connection')

    # ws & nursery block ends

@@ -359,8 +359,8 @@ async def open_autorecon_ws(

'''
JSONRPC response-request style machinery for transparent multiplexing of msgs
over a NoBsWs.
JSONRPC response-request style machinery for transparent multiplexing
of msgs over a `NoBsWs`.

'''

@@ -377,43 +377,82 @@ async def open_jsonrpc_session(
    url: str,
    start_id: int = 0,
    response_type: type = JSONRPCResult,
    request_type: Optional[type] = None,
    request_hook: Optional[Callable] = None,
    error_hook: Optional[Callable] = None,
    msg_recv_timeout: float = float('inf'),
    # ^NOTE, since only `deribit` is using this jsonrpc stuff atm
    # and options mkts are generally "slow moving"..
    #
    # FURTHER if we break the underlying ws connection then since we
    # don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
    # `_reconnect_forever()`, the jsonrpc "transport pipe" get's
    # broken and never restored with wtv init sequence is required to
    # re-establish a working req-resp session.

) -> Callable[[str, dict], dict]:
    '''
    Init a json-RPC-over-websocket connection to the provided `url`.

    A `json_rpc: Callable[[str, dict], dict` is delivered to the
    caller for sending requests and a bg-`trio.Task` handles
    processing of response msgs including error reporting/raising in
    the parent/caller task.

    '''
    # NOTE, store all request msgs so we can raise errors on the
    # caller side!
    req_msgs: dict[int, dict] = {}

    async with (
        trio.open_nursery() as n,
        open_autorecon_ws(url) as ws
        trio.open_nursery() as tn,
        open_autorecon_ws(
            url=url,
            msg_recv_timeout=msg_recv_timeout,
        ) as ws
    ):
        rpc_id: Iterable = count(start_id)
        rpc_id: Iterable[int] = count(start_id)
        rpc_results: dict[int, dict] = {}

        async def json_rpc(method: str, params: dict) -> dict:
        async def json_rpc(
            method: str,
            params: dict,
        ) -> dict:
            '''
            perform a json rpc call and wait for the result, raise exception in
            case of error field present on response
            '''
            nonlocal req_msgs

            req_id: int = next(rpc_id)
            msg = {
                'jsonrpc': '2.0',
                'id': next(rpc_id),
                'id': req_id,
                'method': method,
                'params': params
            }
            _id = msg['id']

            rpc_results[_id] = {
            result = rpc_results[_id] = {
                'result': None,
                'event': trio.Event()
                'error': None,
                'event': trio.Event(),  # signal caller resp arrived
            }
            req_msgs[_id] = msg

            await ws.send_msg(msg)

            # wait for reponse before unblocking requester code
            await rpc_results[_id]['event'].wait()

            ret = rpc_results[_id]['result']
            if (maybe_result := result['result']):
                ret = maybe_result
                del rpc_results[_id]

            del rpc_results[_id]
            else:
                err = result['error']
                raise Exception(
                    f'JSONRPC request failed\n'
                    f'req: {msg}\n'
                    f'resp: {err}\n'
                )

            if ret.error is not None:
                raise Exception(json.dumps(ret.error, indent=4))

@@ -428,6 +467,7 @@ async def open_jsonrpc_session(
            the server side.

            '''
            nonlocal req_msgs
            async for msg in ws:
                match msg:
                    case {

@@ -451,19 +491,28 @@ async def open_jsonrpc_session(
                        'params': _,
                    }:
                        log.debug(f'Recieved\n{msg}')
                        if request_hook:
                            await request_hook(request_type(**msg))

                    case {
                        'error': error
                    }:
                        log.warning(f'Recieved\n{error}')
                        if error_hook:
                            await error_hook(response_type(**msg))
                        # retreive orig request msg, set error
                        # response in original "result" msg,
                        # THEN FINALLY set the event to signal caller
                        # to raise the error in the parent task.
                        req_id: int = error['id']
                        req_msg: dict = req_msgs[req_id]
                        result: dict = rpc_results[req_id]
                        result['error'] = error
                        result['event'].set()
                        log.error(
                            f'JSONRPC request failed\n'
                            f'req: {req_msg}\n'
                            f'resp: {error}\n'
                        )

                    case _:
                        log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')

        n.start_soon(recv_task)
        tn.start_soon(recv_task)
        yield json_rpc
        n.cancel_scope.cancel()
        tn.cancel_scope.cancel()
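A hedged usage sketch of the reworked `open_jsonrpc_session()` shown above (argument names per this diff; the endpoint URL and RPC method are invented):

```python
async def fetch_server_time() -> dict:
    async with open_jsonrpc_session(
        'wss://example.com/ws/api/v2',  # hypothetical endpoint
        msg_recv_timeout=30,            # now forwarded to `open_autorecon_ws()`
    ) as json_rpc:
        # blocks until the bg recv-task sets this request's event and
        # raises here, in the caller task, if an error response arrived.
        return await json_rpc('public/get_time', {})
```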
piker/log.py | 30

@@ -19,6 +19,10 @@ Log like a forester!
"""
import logging
import json
import reprlib
from typing import (
    Callable,
)

import tractor
from pygments import (

@@ -84,3 +88,29 @@ def colorize_json(
        # likeable styles: algol_nu, tango, monokai
        formatters.TerminalTrueColorFormatter(style=style)
    )


# TODO, eventually defer to the version in `modden` once
# it becomes a dep!
def mk_repr(
    **repr_kws,
) -> Callable[[str], str]:
    '''
    Allocate and deliver a `repr.Repr` instance with provided input
    settings using the std-lib's `reprlib` mod,
     * https://docs.python.org/3/library/reprlib.html

    ------ Ex. ------
    An up to 6-layer-nested `dict` as multi-line:
    - https://stackoverflow.com/a/79102479
    - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel

    '''
    def_kws: dict[str, int] = dict(
        indent=2,
        maxlevel=6,  # recursion levels
        maxstring=66,  # match editor line-len limit
    )
    def_kws |= repr_kws
    reprr = reprlib.Repr(**def_kws)
    return reprr.repr
|
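A small usage sketch of the `mk_repr()` helper added above (the quote dict is made up); any `reprlib.Repr` keyword can override the defaults:

```python
from piker.log import mk_repr  # module path per this diff

# NB: the `reprlib.Repr(**kws)` ctor kwargs used here (incl. `indent`)
# require a recent py3.12+ stdlib.
pfmt = mk_repr(maxstring=100)  # override the 66-char default

quote = {
    'bid': {'price': 101.5, 'size': 3},
    'ask': {'price': 101.7, 'size': 1},
}
# depth/width-limited, indented repr suitable for log msgs
print(pfmt(quote))
```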
|
@@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
  => TODO: maybe to (re)move elsewhere?

'''
from ._mngr import Services as Services
from ._mngr import (
    get_service_mngr as get_service_mngr,
    open_service_mngr as open_service_mngr,
    ServiceMngr as ServiceMngr,
)
from ._registry import (
    _tractor_kwargs as _tractor_kwargs,
    _default_reg_addr as _default_reg_addr,
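The re-exports above form the new public service-management surface. A hedged sketch of root-side usage, inferred from how `open_pikerd()` uses it later in this diff:

```python
from piker.service import open_service_mngr, ServiceMngr

async def run_root_services() -> None:
    mngr: ServiceMngr
    async with open_service_mngr(
        debug_mode=False,
    ) as mngr:
        # spawn and supervise long-lived sub-daemons via
        # `mngr.start_service(...)` (see `spawn_brokerd()` and
        # `spawn_samplerd()` in this diff), then block until teardown.
        ...
```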
|
|
|
@ -21,7 +21,6 @@
|
|||
from __future__ import annotations
|
||||
import os
|
||||
from typing import (
|
||||
Optional,
|
||||
Any,
|
||||
ClassVar,
|
||||
)
|
||||
|
@ -30,13 +29,13 @@ from contextlib import (
|
|||
)
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
from ._util import (
|
||||
get_console_log,
|
||||
)
|
||||
from ._mngr import (
|
||||
Services,
|
||||
open_service_mngr,
|
||||
ServiceMngr,
|
||||
)
|
||||
from ._registry import ( # noqa
|
||||
_tractor_kwargs,
|
||||
|
@ -59,7 +58,7 @@ async def open_piker_runtime(
|
|||
registry_addrs: list[tuple[str, int]] = [],
|
||||
|
||||
enable_modules: list[str] = [],
|
||||
loglevel: Optional[str] = None,
|
||||
loglevel: str|None = None,
|
||||
|
||||
# XXX NOTE XXX: you should pretty much never want debug mode
|
||||
# for data daemons when running in production.
|
||||
|
@ -69,7 +68,7 @@ async def open_piker_runtime(
|
|||
# and spawn the service tree distributed per that.
|
||||
start_method: str = 'trio',
|
||||
|
||||
tractor_runtime_overrides: dict | None = None,
|
||||
tractor_runtime_overrides: dict|None = None,
|
||||
**tractor_kwargs,
|
||||
|
||||
) -> tuple[
|
||||
|
@ -119,6 +118,10 @@ async def open_piker_runtime(
|
|||
# spawn other specialized daemons I think?
|
||||
enable_modules=enable_modules,
|
||||
|
||||
# TODO: how to configure this?
|
||||
# keep it on by default if debug mode is set?
|
||||
# maybe_enable_greenback=debug_mode,
|
||||
|
||||
**tractor_kwargs,
|
||||
) as actor,
|
||||
|
||||
|
@ -167,12 +170,13 @@ async def open_pikerd(
|
|||
|
||||
**kwargs,
|
||||
|
||||
) -> Services:
|
||||
) -> ServiceMngr:
|
||||
'''
|
||||
Start a root piker daemon with an indefinite lifetime.
|
||||
Start a root piker daemon actor (aka `pikerd`) with an indefinite
|
||||
lifetime.
|
||||
|
||||
A root actor nursery is created which can be used to create and keep
|
||||
alive underling services (see below).
|
||||
A root actor-nursery is created which can be used to spawn and
|
||||
supervise underling service sub-actors (see below).
|
||||
|
||||
'''
|
||||
# NOTE: for the root daemon we always enable the root
|
||||
|
@ -199,8 +203,6 @@ async def open_pikerd(
|
|||
root_actor,
|
||||
reg_addrs,
|
||||
),
|
||||
tractor.open_nursery() as actor_nursery,
|
||||
trio.open_nursery() as service_nursery,
|
||||
):
|
||||
for addr in reg_addrs:
|
||||
if addr not in root_actor.accept_addrs:
|
||||
|
@ -209,25 +211,17 @@ async def open_pikerd(
|
|||
'Maybe you have another daemon already running?'
|
||||
)
|
||||
|
||||
# assign globally for future daemon/task creation
|
||||
Services.actor_n = actor_nursery
|
||||
Services.service_n = service_nursery
|
||||
Services.debug_mode = debug_mode
|
||||
|
||||
try:
|
||||
yield Services
|
||||
|
||||
finally:
|
||||
# TODO: is this more clever/efficient?
|
||||
# if 'samplerd' in Services.service_tasks:
|
||||
# await Services.cancel_service('samplerd')
|
||||
service_nursery.cancel_scope.cancel()
|
||||
mngr: ServiceMngr
|
||||
async with open_service_mngr(
|
||||
debug_mode=debug_mode,
|
||||
) as mngr:
|
||||
yield mngr
|
||||
|
||||
|
||||
# TODO: do we even need this?
|
||||
# @acm
|
||||
# async def maybe_open_runtime(
|
||||
# loglevel: Optional[str] = None,
|
||||
# loglevel: str|None = None,
|
||||
# **kwargs,
|
||||
|
||||
# ) -> None:
|
||||
|
@ -256,7 +250,7 @@ async def maybe_open_pikerd(
|
|||
loglevel: str | None = None,
|
||||
**kwargs,
|
||||
|
||||
) -> tractor._portal.Portal | ClassVar[Services]:
|
||||
) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
|
||||
'''
|
||||
If no ``pikerd`` daemon-root-actor can be found start it and
|
||||
yield up (we should probably figure out returning a portal to self
|
||||
|
|
|
@@ -49,7 +49,7 @@ from requests.exceptions import (
    ReadTimeout,
)

from ._mngr import Services
from ._mngr import ServiceMngr
from ._util import (
    log,  # sub-sys logger
    get_console_log,

@@ -453,7 +453,7 @@ async def open_ahabd(

@acm
async def start_ahab_service(
    services: Services,
    services: ServiceMngr,
    service_name: str,

    # endpoint config passed as **kwargs

@@ -549,7 +549,8 @@ async def start_ahab_service(
            log.warning('Failed to cancel root permsed container')

        except (
            trio.MultiError,
            # trio.MultiError,
            ExceptionGroup,
        ) as err:
            for subexc in err.exceptions:
                if isinstance(subexc, PermissionError):
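The last hunk swaps `trio.MultiError` for the builtin `ExceptionGroup` (py3.11+); sub-exceptions are still inspected the same way. Illustration only:

```python
try:
    raise ExceptionGroup(
        'container teardown failures',
        [PermissionError('root-permissioned docker socket')],
    )
except ExceptionGroup as beg:
    for subexc in beg.exceptions:
        if isinstance(subexc, PermissionError):
            # mirror the handling in `start_ahab_service()` above
            print('needs elevated perms:', subexc)
```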
|
|
@ -26,14 +26,17 @@ from typing import (
|
|||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from collections import defaultdict
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
)
|
||||
from ._mngr import (
|
||||
Services,
|
||||
get_service_mngr,
|
||||
ServiceMngr,
|
||||
)
|
||||
from ._actor_runtime import maybe_open_pikerd
|
||||
from ._registry import find_service
|
||||
|
@ -41,15 +44,14 @@ from ._registry import find_service
|
|||
|
||||
@acm
|
||||
async def maybe_spawn_daemon(
|
||||
|
||||
service_name: str,
|
||||
service_task_target: Callable,
|
||||
|
||||
spawn_args: dict[str, Any],
|
||||
|
||||
loglevel: str | None = None,
|
||||
singleton: bool = False,
|
||||
|
||||
_locks = defaultdict(trio.Lock),
|
||||
**pikerd_kwargs,
|
||||
|
||||
) -> tractor.Portal:
|
||||
|
@ -67,7 +69,7 @@ async def maybe_spawn_daemon(
|
|||
'''
|
||||
# serialize access to this section to avoid
|
||||
# 2 or more tasks racing to create a daemon
|
||||
lock = Services.locks[service_name]
|
||||
lock = _locks[service_name]
|
||||
await lock.acquire()
|
||||
|
||||
async with find_service(
|
||||
|
@ -102,6 +104,12 @@ async def maybe_spawn_daemon(
|
|||
# service task for that actor.
|
||||
started: bool
|
||||
if pikerd_portal is None:
|
||||
|
||||
# await tractor.pause()
|
||||
if tractor_kwargs.get('debug_mode', False):
|
||||
from tractor.devx._debug import maybe_init_greenback
|
||||
await maybe_init_greenback()
|
||||
|
||||
started = await service_task_target(
|
||||
loglevel=loglevel,
|
||||
**spawn_args,
|
||||
|
@ -132,7 +140,65 @@ async def maybe_spawn_daemon(
|
|||
async with tractor.wait_for_actor(service_name) as portal:
|
||||
lock.release()
|
||||
yield portal
|
||||
await portal.cancel_actor()
|
||||
# --- ---- ---
|
||||
# XXX NOTE XXX
|
||||
# --- ---- ---
|
||||
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
|
||||
#
|
||||
# Doing so will cause an "out-of-band" ctxc
|
||||
# (`tractor.ContextCancelled`) to be raised inside the
|
||||
# `ServiceMngr.open_context_in_task()`'s call to
|
||||
# `ctx.wait_for_result()` AND the internal self-ctxc
|
||||
# "graceful capture" WILL NOT CATCH IT!
|
||||
#
|
||||
# This can cause certain types of operations to raise
|
||||
# that ctxc BEFORE THEY `return`, resulting in
|
||||
# a "false-negative" ctxc being raised when really
|
||||
# nothing actually failed, other then our semantic
|
||||
# "failure" to suppress an expected, graceful,
|
||||
# self-cancel scenario..
|
||||
#
|
||||
# bUt wHy duZ It WorK lIKe dis..
|
||||
# ------------------------------
|
||||
# from the perspective of the `tractor.Context` this
|
||||
# cancel request was conducted "out of band" since
|
||||
# `Context.cancel()` was never called and thus the
|
||||
# `._cancel_called: bool` was never set. Despite the
|
||||
# remote `.canceller` being set to `pikerd` (i.e. the
|
||||
# same `Actor.uid` of the raising service-mngr task) the
|
||||
# service-task's ctx itself was never marked as having
|
||||
# requested cancellation and thus still raises the ctxc
|
||||
# bc it was unaware of any such request.
|
||||
#
|
||||
# How to make grokin these cases easier tho?
|
||||
# ------------------------------------------
|
||||
# Because `Portal.cancel_actor()` was called it requests
|
||||
# "full-`Actor`-runtime-cancellation" of it's peer
|
||||
# process which IS NOT THE SAME as a single inter-actor
|
||||
# RPC task cancelling its local context with a remote
|
||||
# peer `Task` in that same peer process.
|
||||
#
|
||||
# ?TODO? It might be better if we do one (or all) of the
|
||||
# following:
|
||||
#
|
||||
# -[ ] at least set a special message for the
|
||||
# `ContextCancelled` when raised locally by the
|
||||
# unaware ctx task such that we check for the
|
||||
# `.canceller` being *our `Actor`* and in the case
|
||||
# where `Context._cancel_called == False` we specially
|
||||
# note that this is likely an "out-of-band"
|
||||
# runtime-cancel request triggered by some call to
|
||||
# `Portal.cancel_actor()`, possibly even reporting the
|
||||
# exact LOC of that caller by tracking it inside our
|
||||
# portal-type?
|
||||
# -[ ] possibly add another field `ContextCancelled` like
|
||||
# maybe a,
|
||||
# `.request_type: Literal['os', 'proc', 'actor',
|
||||
# 'ctx']` type thing which would allow immediately
|
||||
# being able to tell what kind of cancellation caused
|
||||
# the unexpected ctxc?
|
||||
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
|
||||
# better augment `tractor` to be more explicit on this!
|
||||
|
||||
|
||||
async def spawn_emsd(
|
||||
|
@ -147,26 +213,25 @@ async def spawn_emsd(
|
|||
"""
|
||||
log.info('Spawning emsd')
|
||||
|
||||
portal = await Services.actor_n.start_actor(
|
||||
smngr: ServiceMngr = get_service_mngr()
|
||||
portal = await smngr.an.start_actor(
|
||||
'emsd',
|
||||
enable_modules=[
|
||||
'piker.clearing._ems',
|
||||
'piker.clearing._client',
|
||||
],
|
||||
loglevel=loglevel,
|
||||
debug_mode=Services.debug_mode, # set by pikerd flag
|
||||
debug_mode=smngr.debug_mode, # set by pikerd flag
|
||||
**extra_tractor_kwargs
|
||||
)
|
||||
|
||||
# non-blocking setup of clearing service
|
||||
from ..clearing._ems import _setup_persistent_emsd
|
||||
|
||||
await Services.start_service_task(
|
||||
'emsd',
|
||||
portal,
|
||||
|
||||
# signature of target root-task endpoint
|
||||
_setup_persistent_emsd,
|
||||
await smngr.start_service_ctx(
|
||||
name='emsd',
|
||||
portal=portal,
|
||||
ctx_fn=_setup_persistent_emsd,
|
||||
loglevel=loglevel,
|
||||
)
|
||||
return True
|
||||
|
|
|
@ -18,148 +18,36 @@
|
|||
daemon-service management API.
|
||||
|
||||
"""
|
||||
from collections import defaultdict
|
||||
from typing import (
|
||||
Callable,
|
||||
Any,
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from tractor import (
|
||||
current_actor,
|
||||
ContextCancelled,
|
||||
Context,
|
||||
Portal,
|
||||
from tractor.hilevel import (
|
||||
ServiceMngr,
|
||||
# open_service_mngr as _open_service_mngr,
|
||||
get_service_mngr as get_service_mngr,
|
||||
)
|
||||
# TODO:
|
||||
# -[ ] factor all the common shit from `.data._sampling`
|
||||
# and `.brokers._daemon` into here / `ServiceMngr`
|
||||
# in terms of allocating the `Portal` as part of the
|
||||
# "service-in-subactor" starting!
|
||||
# -[ ] move to `tractor.hilevel._service`, import and use here!
|
||||
# NOTE: purposely leaks the ref to the mod-scope Bo
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
)
|
||||
Services: ServiceMngr|None = None
|
||||
|
||||
@acm
|
||||
async def open_service_mngr(
|
||||
**kwargs,
|
||||
) -> ServiceMngr:
|
||||
|
||||
# TODO: we need remote wrapping and a general soln:
|
||||
# - factor this into a ``tractor.highlevel`` extension # pack for the
|
||||
# library.
|
||||
# - wrap a "remote api" wherein you can get a method proxy
|
||||
# to the pikerd actor for starting services remotely!
|
||||
# - prolly rename this to ActorServicesNursery since it spawns
|
||||
# new actors and supervises them to completion?
|
||||
class Services:
|
||||
|
||||
actor_n: tractor._supervise.ActorNursery
|
||||
service_n: trio.Nursery
|
||||
debug_mode: bool # tractor sub-actor debug mode flag
|
||||
service_tasks: dict[
|
||||
str,
|
||||
tuple[
|
||||
trio.CancelScope,
|
||||
Portal,
|
||||
trio.Event,
|
||||
]
|
||||
] = {}
|
||||
locks = defaultdict(trio.Lock)
|
||||
|
||||
@classmethod
|
||||
async def start_service_task(
|
||||
self,
|
||||
name: str,
|
||||
portal: Portal,
|
||||
target: Callable,
|
||||
allow_overruns: bool = False,
|
||||
**ctx_kwargs,
|
||||
|
||||
) -> (trio.CancelScope, Context):
|
||||
'''
|
||||
Open a context in a service sub-actor, add to a stack
|
||||
that gets unwound at ``pikerd`` teardown.
|
||||
|
||||
This allows for allocating long-running sub-services in our main
|
||||
daemon and explicitly controlling their lifetimes.
|
||||
|
||||
'''
|
||||
async def open_context_in_task(
|
||||
task_status: TaskStatus[
|
||||
tuple[
|
||||
trio.CancelScope,
|
||||
trio.Event,
|
||||
Any,
|
||||
]
|
||||
] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> Any:
|
||||
|
||||
with trio.CancelScope() as cs:
|
||||
|
||||
async with portal.open_context(
|
||||
target,
|
||||
allow_overruns=allow_overruns,
|
||||
**ctx_kwargs,
|
||||
|
||||
) as (ctx, first):
|
||||
|
||||
# unblock once the remote context has started
|
||||
complete = trio.Event()
|
||||
task_status.started((cs, complete, first))
|
||||
log.info(
|
||||
f'`pikerd` service {name} started with value {first}'
|
||||
)
|
||||
try:
|
||||
# wait on any context's return value
|
||||
# and any final portal result from the
|
||||
# sub-actor.
|
||||
ctx_res: Any = await ctx.result()
|
||||
|
||||
# NOTE: blocks indefinitely until cancelled
|
||||
# either by error from the target context
|
||||
# function or by being cancelled here by the
|
||||
# surrounding cancel scope.
|
||||
return (await portal.result(), ctx_res)
|
||||
except ContextCancelled as ctxe:
|
||||
canceller: tuple[str, str] = ctxe.canceller
|
||||
our_uid: tuple[str, str] = current_actor().uid
|
||||
if (
|
||||
canceller != portal.channel.uid
|
||||
and
|
||||
canceller != our_uid
|
||||
):
|
||||
log.cancel(
|
||||
f'Actor-service {name} was remotely cancelled?\n'
|
||||
f'remote canceller: {canceller}\n'
|
||||
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
|
||||
)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
|
||||
finally:
|
||||
await portal.cancel_actor()
|
||||
complete.set()
|
||||
self.service_tasks.pop(name)
|
||||
|
||||
cs, complete, first = await self.service_n.start(open_context_in_task)
|
||||
|
||||
# store the cancel scope and portal for later cancellation or
|
||||
# retstart if needed.
|
||||
self.service_tasks[name] = (cs, portal, complete)
|
||||
|
||||
return cs, first
|
||||
|
||||
@classmethod
|
||||
async def cancel_service(
|
||||
self,
|
||||
name: str,
|
||||
|
||||
) -> Any:
|
||||
'''
|
||||
Cancel the service task and actor for the given ``name``.
|
||||
|
||||
'''
|
||||
log.info(f'Cancelling `pikerd` service {name}')
|
||||
cs, portal, complete = self.service_tasks[name]
|
||||
cs.cancel()
|
||||
await complete.wait()
|
||||
assert name not in self.service_tasks, \
|
||||
f'Serice task for {name} not terminated?'
|
||||
global Services
|
||||
async with tractor.hilevel.open_service_mngr(
|
||||
**kwargs,
|
||||
) as mngr:
|
||||
# Services = proxy(mngr)
|
||||
Services = mngr
|
||||
yield mngr
|
||||
Services = None
|
||||
|
|
|
@ -21,11 +21,13 @@ from typing import (
|
|||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
# TODO: oof, needs to be changed to `httpx`!
|
||||
import asks
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import docker
|
||||
from ._ahab import DockerContainer
|
||||
from . import ServiceMngr
|
||||
|
||||
from ._util import log # sub-sys logger
|
||||
from ._util import (
|
||||
|
@ -127,7 +129,7 @@ def start_elasticsearch(
|
|||
|
||||
@acm
|
||||
async def start_ahab_daemon(
|
||||
service_mngr: Services,
|
||||
service_mngr: ServiceMngr,
|
||||
user_config: dict | None = None,
|
||||
loglevel: str | None = None,
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ import pendulum
|
|||
# import purerpc
|
||||
|
||||
from ..data.feed import maybe_open_feed
|
||||
from . import Services
|
||||
from . import ServiceMngr
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
get_console_log,
|
||||
|
@ -233,7 +233,7 @@ def start_marketstore(
|
|||
|
||||
@acm
|
||||
async def start_ahab_daemon(
|
||||
service_mngr: Services,
|
||||
service_mngr: ServiceMngr,
|
||||
user_config: dict | None = None,
|
||||
loglevel: str | None = None,
|
||||
|
||||
|
|
|
@@ -386,6 +386,8 @@ def ldshm(
        open_annot_ctl() as actl,
    ):
        shm_df: pl.DataFrame | None = None
        tf2aids: dict[float, dict] = {}

        for (
            shmfile,
            shm,

@@ -526,16 +528,17 @@ def ldshm(
                    new_df,
                    step_gaps,
                )

                # last chance manual overwrites in REPL
                await tractor.pause()
                # await tractor.pause()
                assert aids
                tf2aids[period_s] = aids

            else:
                # allow interaction even when no ts problems.
                await tractor.pause()
                # assert not diff
                assert not diff

        await tractor.pause()
        log.info('Exiting TSP shm anal-izer!')

    if shm_df is None:
        log.error(
|
@@ -161,7 +161,13 @@ class NativeStorageClient:

    def index_files(self):
        for path in self._datadir.iterdir():
            if path.name in {'borked', 'expired',}:
            if (
                path.is_dir()
                or
                '.parquet' not in str(path)
                # or
                # path.name in {'borked', 'expired',}
            ):
                continue

            key: str = path.name.rstrip('.parquet')
|
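The new skip-filter above keeps only top-level `.parquet` files. A standalone illustration (the data-dir path is hypothetical); note that `str.rstrip('.parquet')` strips a character set rather than a suffix, so `Path.stem` or `.removesuffix()` is the more robust spelling:

```python
from pathlib import Path

datadir = Path('~/.config/piker/nativedb').expanduser()  # hypothetical dir
for path in datadir.iterdir():
    if (
        path.is_dir()
        or '.parquet' not in str(path)
    ):
        continue

    # suffix-safe key extraction
    key: str = path.name.removesuffix('.parquet')
    assert key == path.stem
    print(key, '->', path)
```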
|
@ -44,8 +44,10 @@ import trio
|
|||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from pendulum import (
|
||||
Interval,
|
||||
DateTime,
|
||||
Duration,
|
||||
duration as mk_duration,
|
||||
from_timestamp,
|
||||
)
|
||||
import numpy as np
|
||||
|
@ -214,7 +216,8 @@ async def maybe_fill_null_segments(
|
|||
# pair, immediately stop backfilling?
|
||||
if (
|
||||
start_dt
|
||||
and end_dt < start_dt
|
||||
and
|
||||
end_dt < start_dt
|
||||
):
|
||||
await tractor.pause()
|
||||
break
|
||||
|
@ -262,6 +265,7 @@ async def maybe_fill_null_segments(
|
|||
except tractor.ContextCancelled:
|
||||
# log.exception
|
||||
await tractor.pause()
|
||||
raise
|
||||
|
||||
null_segs_detected.set()
|
||||
# RECHECK for more null-gaps
|
||||
|
@ -349,7 +353,7 @@ async def maybe_fill_null_segments(
|
|||
|
||||
async def start_backfill(
|
||||
get_hist,
|
||||
frame_types: dict[str, Duration] | None,
|
||||
def_frame_duration: Duration,
|
||||
mod: ModuleType,
|
||||
mkt: MktPair,
|
||||
shm: ShmArray,
|
||||
|
@ -379,22 +383,23 @@ async def start_backfill(
|
|||
update_start_on_prepend: bool = False
|
||||
if backfill_until_dt is None:
|
||||
|
||||
# TODO: drop this right and just expose the backfill
|
||||
# limits inside a [storage] section in conf.toml?
|
||||
# when no tsdb "last datum" is provided, we just load
|
||||
# some near-term history.
|
||||
# periods = {
|
||||
# 1: {'days': 1},
|
||||
# 60: {'days': 14},
|
||||
# }
|
||||
|
||||
# do a decently sized backfill and load it into storage.
|
||||
# TODO: per-provider default history-durations?
|
||||
# -[ ] inside the `open_history_client()` config allow
|
||||
# declaring the history duration limits instead of
|
||||
# guessing and/or applying the same limits to all?
|
||||
#
|
||||
# -[ ] allow declaring (default) per-provider backfill
|
||||
# limits inside a [storage] sub-section in conf.toml?
|
||||
#
|
||||
# NOTE, when no tsdb "last datum" is provided, we just
|
||||
# load some near-term history by presuming a "decently
|
||||
# large" 60s duration limit and a much shorter 1s range.
|
||||
periods = {
|
||||
1: {'days': 2},
|
||||
60: {'years': 6},
|
||||
}
|
||||
period_duration: int = periods[timeframe]
|
||||
update_start_on_prepend = True
|
||||
update_start_on_prepend: bool = True
|
||||
|
||||
# NOTE: manually set the "latest" datetime which we intend to
|
||||
# backfill history "until" so as to adhere to the history
|
||||
|
@ -416,7 +421,6 @@ async def start_backfill(
|
|||
f'backfill_until_dt: {backfill_until_dt}\n'
|
||||
f'last_start_dt: {last_start_dt}\n'
|
||||
)
|
||||
|
||||
try:
|
||||
(
|
||||
array,
|
||||
|
@ -426,71 +430,114 @@ async def start_backfill(
|
|||
timeframe,
|
||||
end_dt=last_start_dt,
|
||||
)
|
||||
|
||||
except NoData as _daterr:
|
||||
# 3 cases:
|
||||
# - frame in the middle of a legit venue gap
|
||||
# - history actually began at the `last_start_dt`
|
||||
# - some other unknown error (ib blocking the
|
||||
# history bc they don't want you seeing how they
|
||||
# cucked all the tinas..)
|
||||
if dur := frame_types.get(timeframe):
|
||||
# decrement by a frame's worth of duration and
|
||||
# retry a few times.
|
||||
last_start_dt.subtract(
|
||||
seconds=dur.total_seconds()
|
||||
orig_last_start_dt: datetime = last_start_dt
|
||||
gap_report: str = (
|
||||
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
|
||||
f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
|
||||
f'last_start_dt: {orig_last_start_dt}\n\n'
|
||||
f'bf_until: {backfill_until_dt}\n'
|
||||
)
|
||||
# EMPTY FRAME signal with 3 (likely) causes:
|
||||
#
|
||||
# 1. range contains legit gap in venue history
|
||||
# 2. history actually (edge case) **began** at the
|
||||
# value `last_start_dt`
|
||||
# 3. some other unknown error (ib blocking the
|
||||
# history-query bc they don't want you seeing how
|
||||
# they cucked all the tinas.. like with options
|
||||
# hist)
|
||||
#
|
||||
if def_frame_duration:
|
||||
# decrement by a duration's (frame) worth of time
|
||||
# as maybe indicated by the backend to see if we
|
||||
# can get older data before this possible
|
||||
# "history gap".
|
||||
last_start_dt: datetime = last_start_dt.subtract(
|
||||
seconds=def_frame_duration.total_seconds()
|
||||
)
|
||||
log.warning(
|
||||
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
|
||||
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
|
||||
'bf_until <- last_start_dt:\n'
|
||||
f'{backfill_until_dt} <- {last_start_dt}\n'
|
||||
f'Decrementing `end_dt` by {dur} and retry..\n'
|
||||
gap_report += (
|
||||
f'Decrementing `end_dt` and retrying with,\n'
|
||||
f'def_frame_duration: {def_frame_duration}\n'
|
||||
f'(new) last_start_dt: {last_start_dt}\n'
|
||||
)
|
||||
log.warning(gap_report)
|
||||
# skip writing to shm/tsdb and try the next
|
||||
# duration's worth of prior history.
|
||||
continue
|
||||
|
||||
# broker says there never was or is no more history to pull
|
||||
except DataUnavailable:
|
||||
log.warning(
|
||||
f'NO-MORE-DATA in range?\n'
|
||||
f'`{mod.name}` halted history:\n'
|
||||
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
|
||||
'bf_until <- last_start_dt:\n'
|
||||
f'{backfill_until_dt} <- {last_start_dt}\n'
|
||||
)
|
||||
else:
|
||||
# await tractor.pause()
|
||||
raise DataUnavailable(gap_report)
|
||||
|
||||
# ugh, what's a better way?
|
||||
# TODO: fwiw, we probably want a way to signal a throttle
|
||||
# condition (eg. with ib) so that we can halt the
|
||||
# request loop until the condition is resolved?
|
||||
if timeframe > 1:
|
||||
await tractor.pause()
|
||||
# broker says there never was or is no more history to pull
|
||||
except DataUnavailable as due:
|
||||
message: str = due.args[0]
|
||||
log.warning(
|
||||
f'Provider {mod.name!r} halted backfill due to,\n\n'
|
||||
|
||||
f'{message}\n'
|
||||
|
||||
f'fqme: {mkt.fqme}\n'
|
||||
f'timeframe: {timeframe}\n'
|
||||
f'last_start_dt: {last_start_dt}\n'
|
||||
f'bf_until: {backfill_until_dt}\n'
|
||||
)
|
||||
# UGH: what's a better way?
|
||||
# TODO: backends are responsible for being correct on
|
||||
# this right!?
|
||||
# -[ ] in the `ib` case we could maybe offer some way
|
||||
# to halt the request loop until the condition is
|
||||
# resolved or should the backend be entirely in
|
||||
# charge of solving such faults? yes, right?
|
||||
return
|
||||
|
||||
time: np.ndarray = array['time']
|
||||
assert (
|
||||
array['time'][0]
|
||||
time[0]
|
||||
==
|
||||
next_start_dt.timestamp()
|
||||
)
|
||||
|
||||
diff = last_start_dt - next_start_dt
|
||||
frame_time_diff_s = diff.seconds
|
||||
assert time[-1] == next_end_dt.timestamp()
|
||||
|
||||
expected_dur: Interval = last_start_dt - next_start_dt
|
||||
|
||||
# frame's worth of sample-period-steps, in seconds
|
||||
frame_size_s: float = len(array) * timeframe
|
||||
expected_frame_size_s: float = frame_size_s + timeframe
|
||||
if frame_time_diff_s > expected_frame_size_s:
|
||||
|
||||
recv_frame_dur: Duration = (
|
||||
from_timestamp(array[-1]['time'])
|
||||
-
|
||||
from_timestamp(array[0]['time'])
|
||||
)
|
||||
if (
|
||||
(lt_frame := (recv_frame_dur < expected_dur))
|
||||
or
|
||||
(null_frame := (frame_size_s == 0))
|
||||
# ^XXX, should NEVER hit now!
|
||||
):
|
||||
# XXX: query result includes a start point prior to our
|
||||
# expected "frame size" and thus is likely some kind of
|
||||
# history gap (eg. market closed period, outage, etc.)
|
||||
# so just report it to console for now.
|
||||
if lt_frame:
|
||||
reason = 'Possible GAP (or first-datum)'
|
||||
else:
|
||||
assert null_frame
|
||||
reason = 'NULL-FRAME'
|
||||
|
||||
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
|
||||
log.warning(
|
||||
'GAP DETECTED:\n'
|
||||
f'last_start_dt: {last_start_dt}\n'
|
||||
f'diff: {diff}\n'
|
||||
f'frame_time_diff_s: {frame_time_diff_s}\n'
|
||||
f'{timeframe}s-series {reason} detected!\n'
|
||||
f'fqme: {mkt.fqme}\n'
|
||||
f'last_start_dt: {last_start_dt}\n\n'
|
||||
f'recv interval: {recv_frame_dur}\n'
|
||||
f'expected interval: {expected_dur}\n\n'
|
||||
|
||||
f'Missing duration of history of {missing_dur.in_words()!r}\n'
|
||||
f'{missing_dur}\n'
|
||||
)
|
||||
# await tractor.pause()
|
||||
|
||||
to_push = diff_history(
|
||||
array,
|
||||
|
@ -565,22 +612,27 @@ async def start_backfill(
|
|||
# long-term storage.
|
||||
if (
|
||||
storage is not None
|
||||
and write_tsdb
|
||||
and
|
||||
write_tsdb
|
||||
):
|
||||
log.info(
|
||||
f'Writing {ln} frame to storage:\n'
|
||||
f'{next_start_dt} -> {last_start_dt}'
|
||||
)
|
||||
|
||||
# always drop the src asset token for
|
||||
# NOTE, always drop the src asset token for
|
||||
# non-currency-pair like market types (for now)
|
||||
#
|
||||
# THAT IS, for now our table key schema is NOT
|
||||
# including the dst[/src] source asset token. SO,
|
||||
# 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
|
||||
# historical reasons ONLY.
|
||||
if mkt.dst.atype not in {
|
||||
'crypto',
|
||||
'crypto_currency',
|
||||
'fiat', # a "forex pair"
|
||||
'perpetual_future', # stupid "perps" from cex land
|
||||
}:
|
||||
# for now, our table key schema is not including
|
||||
# the dst[/src] source asset token.
|
||||
col_sym_key: str = mkt.get_fqme(
|
||||
delim_char='',
|
||||
without_src=True,
|
||||
|
@ -685,7 +737,7 @@ async def back_load_from_tsdb(
|
|||
last_tsdb_dt
|
||||
and latest_start_dt
|
||||
):
|
||||
backfilled_size_s = (
|
||||
backfilled_size_s: Duration = (
|
||||
latest_start_dt - last_tsdb_dt
|
||||
).seconds
|
||||
# if the shm buffer len is not large enough to contain
|
||||
|
@ -908,6 +960,8 @@ async def tsdb_backfill(
|
|||
f'{pformat(config)}\n'
|
||||
)
|
||||
|
||||
# concurrently load the provider's most-recent-frame AND any
|
||||
# pre-existing tsdb history already saved in `piker` storage.
|
||||
dt_eps: list[DateTime, DateTime] = []
|
||||
async with trio.open_nursery() as tn:
|
||||
tn.start_soon(
|
||||
|
@ -918,7 +972,6 @@ async def tsdb_backfill(
|
|||
timeframe,
|
||||
config,
|
||||
)
|
||||
|
||||
tsdb_entry: tuple = await load_tsdb_hist(
|
||||
storage,
|
||||
mkt,
|
||||
|
@ -947,6 +1000,25 @@ async def tsdb_backfill(
|
|||
mr_end_dt,
|
||||
) = dt_eps
|
||||
|
||||
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
|
||||
calced_frame_size: Duration = mk_duration(
|
||||
seconds=first_frame_dur_s,
|
||||
)
|
||||
# NOTE, attempt to use the backend declared default frame
|
||||
# sizing (as allowed by their time-series query APIs) and
|
||||
# if not provided try to construct a default from the
|
||||
# first frame received above.
|
||||
def_frame_durs: dict[
|
||||
int,
|
||||
Duration,
|
||||
]|None = config.get('frame_types', None)
|
||||
if def_frame_durs:
|
||||
def_frame_size: Duration = def_frame_durs[timeframe]
|
||||
assert def_frame_size == calced_frame_size
|
||||
else:
|
||||
# use what we calced from first frame above.
|
||||
def_frame_size = calced_frame_size
|
||||
|
||||
# NOTE: when there's no offline data, there's 2 cases:
|
||||
# - data backend doesn't support timeframe/sample
|
||||
# period (in which case `dt_eps` should be `None` and
|
||||
|
@ -977,7 +1049,7 @@ async def tsdb_backfill(
|
|||
partial(
|
||||
start_backfill,
|
||||
get_hist=get_hist,
|
||||
frame_types=config.get('frame_types', None),
|
||||
def_frame_duration=def_frame_size,
|
||||
mod=mod,
|
||||
mkt=mkt,
|
||||
shm=shm,
|
||||
|
|
|
@@ -616,6 +616,18 @@ def detect_price_gaps(
    # ])
    ...


# TODO: probably just use the null_segs impl above?
def detect_vlm_gaps(
    df: pl.DataFrame,
    col: str = 'volume',

) -> pl.DataFrame:

    vnull: pl.DataFrame = w_dts.filter(
        pl.col(col) == 0
    )
    return vnull


def dedupe(
    src_df: pl.DataFrame,

@@ -626,7 +638,6 @@ def dedupe(

) -> tuple[
    pl.DataFrame,  # with dts
    pl.DataFrame,  # gaps
    pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
    int,  # len diff between input and deduped
]:

@@ -639,19 +650,22 @@ def dedupe(
    '''
    wdts: pl.DataFrame = with_dts(src_df)

    # maybe sort on any time field
    if sort:
        wdts = wdts.sort(by='time')
    # TODO: detect out-of-order segments which were corrected!
    # -[ ] report in log msg
    # -[ ] possibly return segment sections which were moved?
    deduped = wdts

    # remove duplicated datetime samples/sections
    deduped: pl.DataFrame = wdts.unique(
        subset=['dt'],
        # subset=['dt'],
        subset=['time'],
        maintain_order=True,
    )

    # maybe sort on any time field
    if sort:
        deduped = deduped.sort(by='time')
    # TODO: detect out-of-order segments which were corrected!
    # -[ ] report in log msg
    # -[ ] possibly return segment sections which were moved?

    diff: int = (
        wdts.height
        -
|
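The reordering above makes the de-duplication key the raw `'time'` epoch column (the `'dt'` subset is commented out) and defers any sort until after `.unique()`. A hedged usage sketch of the same `polars` calls on toy data:

import polars as pl

wdts = pl.DataFrame({
    'time': [3, 1, 1, 2],
    'close': [10.0, 11.0, 11.0, 12.0],
})

# drop repeated sample times, keeping first-occurrence order
deduped: pl.DataFrame = wdts.unique(
    subset=['time'],
    maintain_order=True,
)
# optionally sort afterwards, as the new code does when `sort=True`
deduped = deduped.sort(by='time')

diff: int = wdts.height - deduped.height
assert diff == 1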
piker/types.py
@@ -21,230 +21,4 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
types.

'''
from __future__ import annotations
from collections import UserList
from pprint import (
    saferepr,
)
from typing import Any

from msgspec import (
    msgpack,
    Struct as _Struct,
    structs,
)


class DiffDump(UserList):
    '''
    Very simple list delegator that repr() dumps (presumed) tuple
    elements of the form `tuple[str, Any, Any]` in a nice
    multi-line readable form for analyzing `Struct` diffs.

    '''
    def __repr__(self) -> str:
        if not len(self):
            return super().__repr__()

        # format by displaying item pair's ``repr()`` on multiple,
        # indented lines such that they are more easily visually
        # comparable when printed to console when printed to
        # console.
        repstr: str = '[\n'
        for k, left, right in self:
            repstr += (
                f'({k},\n'
                f'\t{repr(left)},\n'
                f'\t{repr(right)},\n'
                ')\n'
            )
        repstr += ']\n'
        return repstr


class Struct(
    _Struct,

    # https://jcristharif.com/msgspec/structs.html#tagged-unions
    # tag='pikerstruct',
    # tag=True,
):
    '''
    A "human friendlier" (aka repl buddy) struct subtype.

    '''
    def _sin_props(self) -> Iterator[
        tuple[
            structs.FieldIinfo,
            str,
            Any,
        ]
    ]:
        '''
        Iterate over all non-@property fields of this struct.

        '''
        fi: structs.FieldInfo
        for fi in structs.fields(self):
            key: str = fi.name
            val: Any = getattr(self, key)
            yield fi, key, val

    def to_dict(
        self,
        include_non_members: bool = True,

    ) -> dict:
        '''
        Like it sounds.. direct delegation to:
        https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict

        BUT, by default we pop all non-member (aka not defined as
        struct fields) fields by default.

        '''
        asdict: dict = structs.asdict(self)
        if include_non_members:
            return asdict

        # only return a dict of the struct members
        # which were provided as input, NOT anything
        # added as type-defined `@property` methods!
        sin_props: dict = {}
        fi: structs.FieldInfo
        for fi, k, v in self._sin_props():
            sin_props[k] = asdict[k]

        return sin_props

    def pformat(
        self,
        field_indent: int = 2,
        indent: int = 0,

    ) -> str:
        '''
        Recursion-safe `pprint.pformat()` style formatting of
        a `msgspec.Struct` for sane reading by a human using a REPL.

        '''
        # global whitespace indent
        ws: str = ' '*indent

        # field whitespace indent
        field_ws: str = ' '*(field_indent + indent)

        # qtn: str = ws + self.__class__.__qualname__
        qtn: str = self.__class__.__qualname__

        obj_str: str = ''  # accumulator
        fi: structs.FieldInfo
        k: str
        v: Any
        for fi, k, v in self._sin_props():

            # TODO: how can we prefer `Literal['option1', 'option2,
            # ..]` over .__name__ == `Literal` but still get only the
            # latter for simple types like `str | int | None` etc..?
            ft: type = fi.type
            typ_name: str = getattr(ft, '__name__', str(ft))

            # recurse to get sub-struct's `.pformat()` output Bo
            if isinstance(v, Struct):
                val_str: str = v.pformat(
                    indent=field_indent + indent,
                    field_indent=indent + field_indent,
                )

            else:  # the `pprint` recursion-safe format:
                # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
                val_str: str = saferepr(v)

            obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')

        return (
            f'{qtn}(\n'
            f'{obj_str}'
            f'{ws})'
        )

    # TODO: use a pprint.PrettyPrinter instance around ONLY rendering
    # inside a known tty?
    # def __repr__(self) -> str:
    #     ...

    # __str__ = __repr__ = pformat
    __repr__ = pformat

    def copy(
        self,
        update: dict | None = None,

    ) -> Struct:
        '''
        Validate-typecast all self defined fields, return a copy of
        us with all such fields.

        NOTE: This is kinda like the default behaviour in
        `pydantic.BaseModel` except a copy of the object is
        returned making it compat with `frozen=True`.

        '''
        if update:
            for k, v in update.items():
                setattr(self, k, v)

        # NOTE: roundtrip serialize to validate
        # - enode to msgpack binary format,
        # - decode that back to a struct.
        return msgpack.Decoder(type=type(self)).decode(
            msgpack.Encoder().encode(self)
        )

    def typecast(
        self,

        # TODO: allow only casting a named subset?
        # fields: set[str] | None = None,

    ) -> None:
        '''
        Cast all fields using their declared type annotations
        (kinda like what `pydantic` does by default).

        NOTE: this of course won't work on frozen types, use
        ``.copy()`` above in such cases.

        '''
        # https://jcristharif.com/msgspec/api.html#msgspec.structs.fields
        fi: structs.FieldInfo
        for fi in structs.fields(self):
            setattr(
                self,
                fi.name,
                fi.type(getattr(self, fi.name)),
            )

    def __sub__(
        self,
        other: Struct,

    ) -> DiffDump[tuple[str, Any, Any]]:
        '''
        Compare fields/items key-wise and return a ``DiffDump``
        for easy visual REPL comparison B)

        '''
        diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
        for fi in structs.fields(self):
            attr_name: str = fi.name
            ours: Any = getattr(self, attr_name)
            theirs: Any = getattr(other, attr_name)
            if ours != theirs:
                diffs.append((
                    attr_name,
                    ours,
                    theirs,
                ))

        return diffs
from tractor.msg import Struct as Struct
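With this change `piker.types` becomes a one-line re-export, and the niceties shown in the removed code (member-only `.to_dict()`, REPL-friendly `.pformat()`/`__repr__`, round-trip-validating `.copy()`, `.typecast()`, and field-wise `__sub__` diffing) are expected to come from `tractor.msg.Struct` instead. A hedged usage sketch based on that removed API, assuming the `tractor` version in use exposes the same methods:

from piker.types import Struct  # now just a re-export from `tractor.msg`

class Order(Struct):
    symbol: str
    size: float
    price: float

a = Order(symbol='tsla.nasdaq.ib', size=100.0, price=180.0)
b = Order(symbol='tsla.nasdaq.ib', size=100.0, price=181.5)

# member-only dict, as per the removed `.to_dict()`
assert a.to_dict() == {
    'symbol': 'tsla.nasdaq.ib',
    'size': 100.0,
    'price': 180.0,
}

# round-trip (encode -> decode) validating copy, as per the removed `.copy()`
c = b.copy()
assert c == b

# field-wise diff for REPL comparison, as per the removed `__sub__`
assert [key for key, ours, theirs in (a - b)] == ['price']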
@@ -10,7 +10,7 @@ from piker import (
    config,
)
from piker.service import (
    Services,
    get_service_mngr,
)
from piker.log import get_console_log
@@ -129,7 +129,7 @@ async def _open_test_pikerd(
        ) as service_manager,
    ):
        # this proc/actor is the pikerd
        assert service_manager is Services
        assert service_manager is get_service_mngr()

        async with tractor.wait_for_actor(
            'pikerd',
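The test now asserts the yielded manager against the `get_service_mngr()` accessor rather than the old `Services` class, i.e. a process-wide singleton lookup. A minimal, hedged sketch of that accessor pattern (the real `piker.service` implementation may differ):

# module-level singleton sketch; names besides `get_service_mngr`
# are illustrative only.
_mngr = None

def set_service_mngr(mngr) -> None:
    global _mngr
    _mngr = mngr

def get_service_mngr():
    # raise loudly if the runtime hasn't allocated a manager yet
    assert _mngr is not None, 'service manager not yet allocated!'
    return _mngr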
@@ -26,7 +26,7 @@ import pytest
import tractor
from uuid import uuid4

from piker.service import Services
from piker.service import ServiceMngr
from piker.log import get_logger
from piker.clearing._messages import (
    Order,
@@ -158,7 +158,7 @@ def load_and_check_pos(


def test_ems_err_on_bad_broker(
    open_test_pikerd: Services,
    open_test_pikerd: ServiceMngr,
    loglevel: str,
):
    async def load_bad_fqme():
|
@ -15,7 +15,7 @@ import tractor
|
|||
|
||||
from piker.service import (
|
||||
find_service,
|
||||
Services,
|
||||
ServiceMngr,
|
||||
)
|
||||
from piker.data import (
|
||||
open_feed,
|
||||
|
@@ -44,7 +44,7 @@ def test_runtime_boot(
    async def main():
        port = 6666
        daemon_addr = ('127.0.0.1', port)
        services: Services
        services: ServiceMngr

        async with (
            open_test_pikerd(