Compare commits

...

21 Commits

Author SHA1 Message Date
Tyler Goodlet 01cbd0a775 Support `tractor.pause_from_sync()` in `brokerd`s
By passing down the `tractor.hilevel.ServiceMngr.debug_mode: bool`
(normally proxied in from the `--pdb` CLI flag) to `spawn_brokerd()` and
adjusting the `_setup_persistent_brokerd()` endpoint to do the
`tractor.devx._debug.maybe_init_greenback()` if needed.

Also in the `broker_init()` factory merge all `tractor` related `kwargs`
(i.e. `start_actor_kwargs | datad_kwargs | spawn_kws`) into the 2nd
element returned as to be passed to `ActorNursery.start_actor()`. Start
re-naming some internal vars/fields as `datad` as well.
2025-02-19 17:54:40 -05:00
Tyler Goodlet 8ab2feba3e Type adjust to `tractor.hilevel.ServicecMngr` 2025-02-19 17:54:40 -05:00
Tyler Goodlet 91d7db9db8 Official service-mngr to `tractor.hilevel` move
Such that we maintain that subsys in the actor-runtime repo (with
hopefully an extensive test suite XD).

Port deats,
- rewrite `open_service_mngr()` as a thin wrapper that delegates into
  the new `tractor.hilevel.open_service_mngr()` but with maintenance of
  the `Services` class-singleton for now.
- port `.service._daemon` usage to the new
  `ServiceMngr.start_service_ctx()` a rename from
  `.start_service_task()` which is now likely destined for the soon
  supported `tractor.trionics.TaskMngr` nursery extension.
- ref the new `ServiceMngr.an: ActorNursery` instance var name.

Other,
- always enable the `tractor.pause_from_sync()` support via `greenback`
  whenever `debug_mode` is set at `pikerd` init.
2025-02-19 17:54:40 -05:00
Nelson Torres 7f38e86b16 Updated tractor method name. 2025-02-19 17:54:40 -05:00
Tyler Goodlet 621a8b829c More service-mngr clarity notes
Nothing changing functionally here just adding more `tractor`
operational notes, tips for debug tooling and typing fixes B)

Of particular note is adding further details about the reason we do not
need to call `Context.cancel()` inside the `finally:` block of
`.open_context_in_task()` thanks to `tractor`'s new and improved
inter-actor cancellation semantics Bo
2025-02-19 17:54:40 -05:00
Tyler Goodlet 903c739b06 Drop `.cancel_actor()` from `maybe_spawn_daemon()`
Since `tractor`'s new and improved inter-actor cancellation semantics
are much more pedantic, AND bc we use the `ServiceMngr` for spawning
service actors on-demand, the caller of `maybe_spawn_daemon()` should
NEVER conduct a so called "out of band" `Actor`-runtime cancel request
since this is precisely the job of our `ServiceMngr` XD

Add a super in depth note explaining the underlying issue and adding
a todo list of how we should prolly augment `tractor` to make such cases
easier to grok and fix in the future!
2025-02-19 17:54:40 -05:00
Tyler Goodlet badcc60d60 Prep service mngr for move to `tractor.hilevel`
Given it's a fairly simple yet useful abstraction, it makes sense to
offer this sub-sys alongside the core `tractor` runtime lib.

Without going into extreme detail on the impl changes (it'll come in
the commit that moves to the other repo) here is the high level summary:
------ - ------
- rename `Services` -> `ServiceMngr` and use an factory `@acm`
  to guarantee a single-instance-per-actor using a niche approach for a
  singleton object using a default keyword-arg B)
  - the mod level `open_service_mngr()` and `get_service_mngr()` are the
    new allocation/access API.
- add a `ServiceMngr.start_service()` method which does the work of both
  spawning a new subactor (for the daemon) and uses its portal to start
  the mngr side supervision task.
- open actor/task nurseries inside the `@acm` allocator.

Adjust other dependent subsystems to match:
------ - ------
- use `open_service_mngr()` when first allocated in `open_pikerd()`.
- use `get_service_mngr()` instead of importing the class ref inside
  `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()`
  and `.data._sampling.spawn_samplerd()` using a `partial` to pack in
  the endpoint ctx kwargs (unpacked inside `.start_service()` XD).
2025-02-19 17:54:40 -05:00
Tyler Goodlet 4c9915a4a5 Enable `greenback` for `.pause_from_sync()` by default? 2025-02-19 17:54:40 -05:00
Tyler Goodlet 636a580645 Delegate to `tractor.msg.pretty_struct` since it was factored from here! 2025-02-19 17:54:40 -05:00
Tyler Goodlet 389c746223 Catch using `Sampler.bcast_errors` where possible
In all other possible IPC disconnect handling blocks. Also more
comprehensive typing throughout `uniform_rate_send()`.
2025-02-19 17:10:54 -05:00
Tyler Goodlet a795108041 Group bcast errors as `Sampler.bcast_errors`
A new class var `tuple[Exception]` such that the err set can be reffed
externally as needed for catching other similar pub-sub/IPC failures in
other (related) real-time sub-systems.

Also added some now-masked logging for debugging live-feed stream reading
issues that should ONLY be used for debugging since they'll greatly
degrade HFT perf. Used the new `log.mk_repr()` stuff (that one day we
should prolly pull from `modden` as a dep) for pretty console emissions.
2025-02-19 17:10:54 -05:00
Tyler Goodlet 6e495511ff Suppress `trio.EndOfChannel`s raised by remote peer
Since now `tractor` will raise this native `trio`-exc translated from
a `Stop` msg when the peer gracefully terminates a `tractor.MsgStream`.
Just `info()` log in such cases versus continuing to warn for the
others.
2025-02-19 17:10:54 -05:00
Tyler Goodlet e391c896f8 Mk jsronrpc's underlying ws timeout `float('inf')`
Since currently we're only using this IPC subsys for `deribit`, and
generally speaking we're primarly supporting options markets (which are
fairly "slow moving"), flip to a default of NOT resetting the `NoBsWs`
on timeout since doing so normally breaks the jsron-rpc IPC session.
Without a proper `fixture` passed to `open_autorecon_ws()` (which we
should eventually implement!!) relying on a timeout-to-reset more or
less will just cause breakage issues - a proper reconnect sequence must
be implemented before using that feature.

Deats,
- expose and proxy through the `msg_recv_timeout` from
  `open_jsonrpc_session()` into the underlying `open_autorecon_ws()`
  call.
2025-02-19 17:05:13 -05:00
Tyler Goodlet 5633f5614d Doc-n-clean `.data._web_bs.open_jsonrpc_session()`
Add a doc-string reflecting recent refinements, drop all the old hook
params, rename `n: trio.Nursery` -> `tn` for "task nursery" fitting with
code base's naming style.
2025-02-19 17:05:13 -05:00
Tyler Goodlet 76735189de data._web_bs: try to raise jsonrpc errors in parent task 2025-02-19 17:05:13 -05:00
Tyler Goodlet d49608f74e Refine history gap/termination signalling
Namely handling backends which do not provide a default "frame
size-duration" in their init-config by making the backfiller guess the
value based on the first frame received.

Deats,
- adjust `start_backfill()` to take a more explicit
  `def_frame_duration: Duration` expected to be unpacked from any
  backend hist init-config by the `tsdb_backfill()` caller which now
  also computes a value from the first received frame when the config
  section isn't provided.
- in `start_backfill()` we now always expect the `def_frame_duration`
  input and always decrement the query range by this value whenever
  a `NoData` is raised by the provider-backend paired with an explicit
  `log.warning()` about the handling.
- also relay any `DataUnavailable.args[0]` message from the provider
  in the handler.
- repair "gap reporting" which checks for expected frame duration vs.
  that received with much better humanized logging on the missing
  segment using `pendulum.Interval/Duration.in_words()` output.
2025-02-19 17:01:24 -05:00
Tyler Goodlet bf0ac93aa3 Only use `frame_types` if delivered during enter
The `open_history_client()` provider endpoint can *optionally*
deliver a `frame_types: dict[int, pendulum.Duration]` subsection in its
`config: dict[str, dict]` (as was implemented with the `ib` backend).
This allows the `tsp` backfilling machinery to use this "recommended
frame duration" to subtract from the `last_start_dt` any time a `NoData`
gap is signalled by the `get_hist()` call allowing gaps to be ignored
safely without missing history by knowing the next earliest dt we can
query from using the `end_dt`. However, currently all crypto$ providers
haven't implemented this feat yet..

As such only try to use the `frame_types` feature if provided when
handling `NoData` conditions inside `tsp.start_backfill()` and otherwise
raise as normal.
2025-02-19 17:01:24 -05:00
Tyler Goodlet d7179d47b0 `.tsp._anal`: add (unused) `detect_vlm_gaps()` 2025-02-19 17:01:24 -05:00
Tyler Goodlet c390e87536 `.storage.cli`: collect gap-markup-aids into `tf2aids: dict` prior to pause for introspection 2025-02-19 17:01:24 -05:00
Tyler Goodlet 5e4a6d61c7 Ignore any non-`.parquet` files under `.config/piker/nativedb/` subdir 2025-02-19 17:01:24 -05:00
Tyler Goodlet 3caaa30b03 Mask no-data pause, add perps to no-`/src`-in-fqme asset set
Was orig for debugging an issue with `kucoin` i think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to the `.start_backfill()` input literal
set since we don't expect the 'btc/usd.perp.binance' for now.
2025-02-19 17:01:24 -05:00
20 changed files with 638 additions and 635 deletions

View File

@ -23,6 +23,7 @@ from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from functools import partial
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@ -60,12 +61,13 @@ async def _setup_persistent_brokerd(
ctx: tractor.Context, ctx: tractor.Context,
brokername: str, brokername: str,
loglevel: str | None = None, loglevel: str | None = None,
debug_mode: bool = False,
) -> None: ) -> None:
''' '''
Allocate a actor-wide service nursery in ``brokerd`` Allocate a actor-wide service nursery in `brokerd` such that
such that feeds can be run in the background persistently by feeds can be run in the background persistently by the broker
the broker backend as needed. backend as needed.
''' '''
# NOTE: we only need to setup logging once (and only) here # NOTE: we only need to setup logging once (and only) here
@ -86,6 +88,18 @@ async def _setup_persistent_brokerd(
from piker.data import feed from piker.data import feed
assert not feed._bus assert not feed._bus
if (
debug_mode
and
tractor.current_actor().is_infected_aio()
):
# NOTE, whenever running `asyncio` in provider's actor
# runtime be sure we enabled `breakpoint()` support
# for non-`trio.Task` usage.
from tractor.devx._debug import maybe_init_greenback
await maybe_init_greenback()
# breakpoint() # XXX, SHOULD WORK from `trio.Task`!
# allocate a nursery to the bus for spawning background # allocate a nursery to the bus for spawning background
# tasks to service client IPC requests, normally # tasks to service client IPC requests, normally
# `tractor.Context` connections to explicitly required # `tractor.Context` connections to explicitly required
@ -145,18 +159,21 @@ def broker_init(
above. above.
''' '''
from ..brokers import get_brokermod brokermod: ModuleType = get_brokermod(brokername)
brokermod = get_brokermod(brokername)
modpath: str = brokermod.__name__ modpath: str = brokermod.__name__
spawn_kws: dict = getattr(
start_actor_kwargs['name'] = f'brokerd.{brokername}' brokermod,
start_actor_kwargs.update( '_spawn_kwargs',
getattr( {},
brokermod,
'_spawn_kwargs',
{},
)
) )
# ^^ NOTE, here we pull any runtime parameters specific
# to spawning the sub-actor for the backend. For ex.
# both `ib` and `deribit` rely on,
# `'infect_asyncio': True,` since they both
# use `tractor`'s "infected `asyncio` mode"
# for their libs but you could also do something like
# `'debug_mode: True` which would be like passing
# `--pdb` for just that provider backend.
# XXX TODO: make this not so hacky/monkeypatched.. # XXX TODO: make this not so hacky/monkeypatched..
# -> we need a sane way to configure the logging level for all # -> we need a sane way to configure the logging level for all
@ -166,8 +183,7 @@ def broker_init(
# lookup actor-enabled modules declared by the backend offering the # lookup actor-enabled modules declared by the backend offering the
# `brokerd` endpoint(s). # `brokerd` endpoint(s).
enabled: list[str] enabled: list[str] = [
enabled = start_actor_kwargs['enable_modules'] = [
__name__, # so that eps from THIS mod can be invoked __name__, # so that eps from THIS mod can be invoked
modpath, modpath,
] ]
@ -179,9 +195,13 @@ def broker_init(
subpath: str = f'{modpath}.{submodname}' subpath: str = f'{modpath}.{submodname}'
enabled.append(subpath) enabled.append(subpath)
datad_kwargs: dict = {
'name': f'brokerd.{brokername}',
'enable_modules': enabled,
}
return ( return (
brokermod, brokermod,
start_actor_kwargs, # to `ActorNursery.start_actor()` start_actor_kwargs | datad_kwargs | spawn_kws, # to `ActorNursery.start_actor()`
# XXX see impl above; contains all (actor global) # XXX see impl above; contains all (actor global)
# setup/teardown expected in all `brokerd` actor instances. # setup/teardown expected in all `brokerd` actor instances.
@ -190,14 +210,17 @@ def broker_init(
async def spawn_brokerd( async def spawn_brokerd(
brokername: str, brokername: str,
loglevel: str | None = None, loglevel: str | None = None,
**tractor_kwargs, **tractor_kwargs,
) -> bool: ) -> bool:
'''
Spawn a `brokerd.<backendname>` subactor service daemon
using `pikerd`'s service mngr.
'''
from piker.service._util import log # use service mngr log from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon') log.info(f'Spawning {brokername} broker daemon')
@ -211,33 +234,41 @@ async def spawn_brokerd(
**tractor_kwargs, **tractor_kwargs,
) )
brokermod = get_brokermod(brokername)
extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
tractor_kwargs.update(extra_tractor_kwargs)
# ask `pikerd` to spawn a new sub-actor and manage it under its # ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery # actor nursery
from piker.service import Services from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}' dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
portal = await Services.actor_n.start_actor( mngr: ServiceMngr = get_service_mngr()
dname, ctx: tractor.Context = await mngr.start_service(
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'), daemon_name=dname,
debug_mode=Services.debug_mode, ctx_ep=partial(
# signature of target root-task endpoint
daemon_fixture_ep,
# passed to daemon_fixture_ep(**kwargs)
brokername=brokername,
loglevel=loglevel,
debug_mode=mngr.debug_mode,
),
debug_mode=mngr.debug_mode,
# ^TODO, allow overriding this per-daemon from client side?
# |_ it's already supported in `tractor` so..
loglevel=loglevel,
enable_modules=(
_data_mods
+
tractor_kwargs.pop('enable_modules')
),
**tractor_kwargs **tractor_kwargs
) )
assert (
# NOTE: the service mngr expects an already spawned actor + its not ctx.cancel_called
# portal ref in order to do non-blocking setup of brokerd and ctx.portal # parent side
# service nursery. and dname in ctx.chan.uid # subactor is named as desired
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
daemon_fixture_ep,
brokername=brokername,
loglevel=loglevel,
) )
return True return True
@ -262,8 +293,7 @@ async def maybe_spawn_brokerd(
from piker.service import maybe_spawn_daemon from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon( async with maybe_spawn_daemon(
service_name=f'brokerd.{brokername}',
f'brokerd.{brokername}',
service_task_target=spawn_brokerd, service_task_target=spawn_brokerd,
spawn_args={ spawn_args={
'brokername': brokername, 'brokername': brokername,

View File

@ -140,11 +140,10 @@ def pikerd(
if pdb: if pdb:
log.warning(( log.warning((
"\n" '\n\n'
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n" '!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n'
"When a `piker` daemon crashes it will block the " 'When a `piker` daemon crashes it will block the '
"task-thread until resumed from console!\n" 'task-thread until resumed from console!\n'
"\n"
)) ))
# service-actor registry endpoint socket-address set # service-actor registry endpoint socket-address set
@ -177,7 +176,7 @@ def pikerd(
from .. import service from .. import service
async def main(): async def main():
service_mngr: service.Services service_mngr: service.ServiceMngr
async with ( async with (
service.open_pikerd( service.open_pikerd(
@ -335,7 +334,7 @@ def services(config, tl, ports):
name='service_query', name='service_query',
loglevel=config['loglevel'] if tl else None, loglevel=config['loglevel'] if tl else None,
), ),
tractor.get_arbiter( tractor.get_registry(
host=host, host=host,
port=ports[0] port=ports[0]
) as portal ) as portal

View File

@ -25,6 +25,7 @@ from collections import (
defaultdict, defaultdict,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial
import time import time
from typing import ( from typing import (
Any, Any,
@ -42,7 +43,7 @@ from tractor.trionics import (
maybe_open_nursery, maybe_open_nursery,
) )
import trio import trio
from trio_typing import TaskStatus from trio import TaskStatus
from .ticktools import ( from .ticktools import (
frame_ticks, frame_ticks,
@ -70,6 +71,7 @@ if TYPE_CHECKING:
_default_delay_s: float = 1.0 _default_delay_s: float = 1.0
# TODO: use new `tractor.singleton_acm` API for this!
class Sampler: class Sampler:
''' '''
Global sampling engine registry. Global sampling engine registry.
@ -79,9 +81,9 @@ class Sampler:
This non-instantiated type is meant to be a singleton within This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see time-step-sample a (real-time) quote feeds, see
``.service.maybe_open_samplerd()`` and the below `.service.maybe_open_samplerd()` and the below
``register_with_sampler()``. `register_with_sampler()`.
''' '''
service_nursery: None | trio.Nursery = None service_nursery: None | trio.Nursery = None
@ -95,6 +97,12 @@ class Sampler:
# history loading. # history loading.
incr_task_cs: trio.CancelScope | None = None incr_task_cs: trio.CancelScope | None = None
bcast_errors: tuple[Exception] = (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
)
# holds all the ``tractor.Context`` remote subscriptions for # holds all the ``tractor.Context`` remote subscriptions for
# a particular sample period increment event: all subscribers are # a particular sample period increment event: all subscribers are
# notified on a step. # notified on a step.
@ -258,14 +266,15 @@ class Sampler:
subs: set subs: set
last_ts, subs = pair last_ts, subs = pair
task = trio.lowlevel.current_task() # NOTE, for debugging pub-sub issues
log.debug( # task = trio.lowlevel.current_task()
f'SUBS {self.subscribers}\n' # log.debug(
f'PAIR {pair}\n' # f'AlL-SUBS@{period_s!r}: {self.subscribers}\n'
f'TASK: {task}: {id(task)}\n' # f'PAIR: {pair}\n'
f'broadcasting {period_s} -> {last_ts}\n' # f'TASK: {task}: {id(task)}\n'
# f'consumers: {subs}' # f'broadcasting {period_s} -> {last_ts}\n'
) # f'consumers: {subs}'
# )
borked: set[MsgStream] = set() borked: set[MsgStream] = set()
sent: set[MsgStream] = set() sent: set[MsgStream] = set()
while True: while True:
@ -282,12 +291,11 @@ class Sampler:
await stream.send(msg) await stream.send(msg)
sent.add(stream) sent.add(stream)
except ( except self.bcast_errors as err:
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error( log.error(
f'{stream._ctx.chan.uid} dropped connection' f'Connection dropped for IPC ctx\n'
f'{stream._ctx}\n\n'
f'Due to {type(err)}'
) )
borked.add(stream) borked.add(stream)
else: else:
@ -375,7 +383,10 @@ async def register_with_sampler(
assert Sampler.ohlcv_shms assert Sampler.ohlcv_shms
# unblock caller # unblock caller
await ctx.started(set(Sampler.ohlcv_shms.keys())) await ctx.started(
# XXX bc msgpack only allows one array type!
list(Sampler.ohlcv_shms.keys())
)
if open_index_stream: if open_index_stream:
try: try:
@ -394,7 +405,8 @@ async def register_with_sampler(
finally: finally:
if ( if (
sub_for_broadcasts sub_for_broadcasts
and subs and
subs
): ):
try: try:
subs.remove(stream) subs.remove(stream)
@ -419,7 +431,6 @@ async def register_with_sampler(
async def spawn_samplerd( async def spawn_samplerd(
loglevel: str | None = None, loglevel: str | None = None,
**extra_tractor_kwargs **extra_tractor_kwargs
@ -429,7 +440,10 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting. update and increment count write and stream broadcasting.
''' '''
from piker.service import Services from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname = 'samplerd' dname = 'samplerd'
log.info(f'Spawning `{dname}`') log.info(f'Spawning `{dname}`')
@ -437,26 +451,33 @@ async def spawn_samplerd(
# singleton lock creation of ``samplerd`` since we only ever want # singleton lock creation of ``samplerd`` since we only ever want
# one daemon per ``pikerd`` proc tree. # one daemon per ``pikerd`` proc tree.
# TODO: make this built-into the service api? # TODO: make this built-into the service api?
async with Services.locks[dname + '_singleton']: mngr: ServiceMngr = get_service_mngr()
already_started: bool = dname in mngr.service_tasks
if dname not in Services.service_tasks: async with mngr._locks[dname + '_singleton']:
ctx: Context = await mngr.start_service(
portal = await Services.actor_n.start_actor( daemon_name=dname,
dname, ctx_ep=partial(
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
await Services.start_service_task(
dname,
portal,
register_with_sampler, register_with_sampler,
period_s=1, period_s=1,
sub_for_broadcasts=False, sub_for_broadcasts=False,
),
debug_mode=mngr.debug_mode, # set by pikerd flag
# proxy-through to tractor
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
**extra_tractor_kwargs
)
if not already_started:
assert (
ctx
and
ctx.portal
and
not ctx.cancel_called
) )
return True return True
@ -561,8 +582,7 @@ async def open_sample_stream(
async def sample_and_broadcast( async def sample_and_broadcast(
bus: _FeedsBus,
bus: _FeedsBus, # noqa
rt_shm: ShmArray, rt_shm: ShmArray,
hist_shm: ShmArray, hist_shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@ -582,11 +602,33 @@ async def sample_and_broadcast(
overruns = Counter() overruns = Counter()
# NOTE, only used for debugging live-data-feed issues, though
# this should be resolved more correctly in the future using the
# new typed-msgspec feats of `tractor`!
#
# XXX, a multiline nested `dict` formatter (since rn quote-msgs
# are just that).
# pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
# print(quotes) # print(quotes)
# TODO: ``numba`` this! # XXX WARNING XXX only enable for debugging bc ow can cost
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO,
# -[ ] `numba` or `cython`-nize this loop possibly?
# |_alternatively could we do it in rust somehow by upacking
# arrow msgs instead of using `msgspec`?
# -[ ] use `msgspec.Struct` support in new typed-msging from
# `tractor` to ensure only allowed msgs are transmitted?
#
for broker_symbol, quote in quotes.items(): for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing # TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that # to the sharedmem array to decrease latency, however, that
@ -659,6 +701,21 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower() sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key) subs: set[Sub] = bus.get_subs(sub_key)
# TODO, figure out how to make this useful whilst
# incoporating feed "pausing" ..
#
# if not subs:
# all_bs_fqmes: list[str] = list(
# bus._subscribers.keys()
# )
# log.warning(
# f'No subscribers for {brokername!r} live-quote ??\n'
# f'broker_symbol: {broker_symbol}\n\n'
# f'Maybe the backend-sys symbol does not match one of,\n'
# f'{pfmt(all_bs_fqmes)}\n'
# )
# NOTE: by default the broker backend doesn't append # NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it # it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct # should?) so we have to manually generate the correct
@ -728,18 +785,14 @@ async def sample_and_broadcast(
if lags > 10: if lags > 10:
await tractor.pause() await tractor.pause()
except ( except Sampler.bcast_errors as ipc_err:
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
):
ctx: Context = ipc._ctx ctx: Context = ipc._ctx
chan: Channel = ctx.chan chan: Channel = ctx.chan
if ctx: if ctx:
log.warning( log.warning(
'Dropped `brokerd`-quotes-feed connection:\n' f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n'
f'{broker_symbol}:' f'x>) {ctx.cid}@{chan.uid}'
f'{ctx.cid}@{chan.uid}' f'|_{ipc_err!r}\n\n'
) )
if sub.throttle_rate: if sub.throttle_rate:
assert ipc._closed assert ipc._closed
@ -756,12 +809,11 @@ async def sample_and_broadcast(
async def uniform_rate_send( async def uniform_rate_send(
rate: float, rate: float,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
stream: MsgStream, stream: MsgStream,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
@ -779,13 +831,16 @@ async def uniform_rate_send(
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
''' '''
# TODO: compute the approx overhead latency per cycle # ?TODO? dynamically compute the **actual** approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616 # instead of this magic # bidinezz?
throttle_period: float = 1/rate - 0.000616
left_to_sleep: float = throttle_period
# send cycle state # send cycle state
first_quote: dict|None
first_quote = last_quote = None first_quote = last_quote = None
last_send = time.time() last_send: float = time.time()
diff = 0 diff: float = 0
task_status.started() task_status.started()
ticks_by_type: dict[ ticks_by_type: dict[
@ -796,22 +851,28 @@ async def uniform_rate_send(
clear_types = _tick_groups['clears'] clear_types = _tick_groups['clears']
while True: while True:
# compute the remaining time to sleep for this throttled cycle # compute the remaining time to sleep for this throttled cycle
left_to_sleep = throttle_period - diff left_to_sleep: float = throttle_period - diff
if left_to_sleep > 0: if left_to_sleep > 0:
cs: trio.CancelScope
with trio.move_on_after(left_to_sleep) as cs: with trio.move_on_after(left_to_sleep) as cs:
sym: str
last_quote: dict
try: try:
sym, last_quote = await quote_stream.receive() sym, last_quote = await quote_stream.receive()
except trio.EndOfChannel: except trio.EndOfChannel:
log.exception(f"feed for {stream} ended?") log.exception(
f'Live stream for feed for ended?\n'
f'<=c\n'
f' |_[{stream!r}\n'
)
break break
diff = time.time() - last_send diff: float = time.time() - last_send
if not first_quote: if not first_quote:
first_quote = last_quote first_quote: float = last_quote
# first_quote['tbt'] = ticks_by_type # first_quote['tbt'] = ticks_by_type
if (throttle_period - diff) > 0: if (throttle_period - diff) > 0:
@ -872,7 +933,9 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display # TODO: now if only we could sync this to the display
# rate timing exactly lul # rate timing exactly lul
try: try:
await stream.send({sym: first_quote}) await stream.send({
sym: first_quote
})
except tractor.RemoteActorError as rme: except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun: if rme.type is not tractor._exceptions.StreamOverrun:
raise raise
@ -883,19 +946,28 @@ async def uniform_rate_send(
f'{sym}:{ctx.cid}@{chan.uid}' f'{sym}:{ctx.cid}@{chan.uid}'
) )
# NOTE: any of these can be raised by `tractor`'s IPC
# transport-layer and we want to be highly resilient
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
except ( except (
# NOTE: any of these can be raised by ``tractor``'s IPC
# transport-layer and we want to be highly resilient
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
trio.ClosedResourceError,
trio.BrokenResourceError,
ConnectionResetError, ConnectionResetError,
): ) + Sampler.bcast_errors as ipc_err:
# if the feed consumer goes down then drop match ipc_err:
# out of this rate limiter case trio.EndOfChannel():
log.warning(f'{stream} closed') log.info(
f'{stream} terminated by peer,\n'
f'{ipc_err!r}'
)
case _:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(
f'{stream} closed due to,\n'
f'{ipc_err!r}'
)
await stream.aclose() await stream.aclose()
return return

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set() nobsws._connected.set()
await trio.sleep_forever() await trio.sleep_forever()
except HandshakeError: except HandshakeError:
log.exception(f'Retrying connection') log.exception('Retrying connection')
# ws & nursery block ends # ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
''' '''
JSONRPC response-request style machinery for transparent multiplexing of msgs JSONRPC response-request style machinery for transparent multiplexing
over a NoBsWs. of msgs over a `NoBsWs`.
''' '''
@ -377,43 +377,82 @@ async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
response_type: type = JSONRPCResult, response_type: type = JSONRPCResult,
request_type: Optional[type] = None, msg_recv_timeout: float = float('inf'),
request_hook: Optional[Callable] = None, # ^NOTE, since only `deribit` is using this jsonrpc stuff atm
error_hook: Optional[Callable] = None, # and options mkts are generally "slow moving"..
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
) -> Callable[[str, dict], dict]: ) -> Callable[[str, dict], dict]:
'''
Init a json-RPC-over-websocket connection to the provided `url`.
A `json_rpc: Callable[[str, dict], dict` is delivered to the
caller for sending requests and a bg-`trio.Task` handles
processing of response msgs including error reporting/raising in
the parent/caller task.
'''
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as tn,
open_autorecon_ws(url) as ws open_autorecon_ws(
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
): ):
rpc_id: Iterable = count(start_id) rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {} rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict: async def json_rpc(
method: str,
params: dict,
) -> dict:
''' '''
perform a json rpc call and wait for the result, raise exception in perform a json rpc call and wait for the result, raise exception in
case of error field present on response case of error field present on response
''' '''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = { msg = {
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'id': next(rpc_id), 'id': req_id,
'method': method, 'method': method,
'params': params 'params': params
} }
_id = msg['id'] _id = msg['id']
rpc_results[_id] = { result = rpc_results[_id] = {
'result': None, 'result': None,
'event': trio.Event() 'error': None,
'event': trio.Event(), # signal caller resp arrived
} }
req_msgs[_id] = msg
await ws.send_msg(msg) await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait() await rpc_results[_id]['event'].wait()
ret = rpc_results[_id]['result'] if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id]
del rpc_results[_id] else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
if ret.error is not None: if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4)) raise Exception(json.dumps(ret.error, indent=4))
@ -428,6 +467,7 @@ async def open_jsonrpc_session(
the server side. the server side.
''' '''
nonlocal req_msgs
async for msg in ws: async for msg in ws:
match msg: match msg:
case { case {
@ -451,19 +491,28 @@ async def open_jsonrpc_session(
'params': _, 'params': _,
}: }:
log.debug(f'Recieved\n{msg}') log.debug(f'Recieved\n{msg}')
if request_hook:
await request_hook(request_type(**msg))
case { case {
'error': error 'error': error
}: }:
log.warning(f'Recieved\n{error}') # retreive orig request msg, set error
if error_hook: # response in original "result" msg,
await error_hook(response_type(**msg)) # THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = error['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _: case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
n.start_soon(recv_task) tn.start_soon(recv_task)
yield json_rpc yield json_rpc
n.cancel_scope.cancel() tn.cancel_scope.cancel()

View File

@ -19,6 +19,10 @@ Log like a forester!
""" """
import logging import logging
import json import json
import reprlib
from typing import (
Callable,
)
import tractor import tractor
from pygments import ( from pygments import (
@ -84,3 +88,29 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai # likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style) formatters.TerminalTrueColorFormatter(style=style)
) )
# TODO, eventually defer to the version in `modden` once
# it becomes a dep!
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr

View File

@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
=> TODO: maybe to (re)move elsewhere? => TODO: maybe to (re)move elsewhere?
''' '''
from ._mngr import Services as Services from ._mngr import (
get_service_mngr as get_service_mngr,
open_service_mngr as open_service_mngr,
ServiceMngr as ServiceMngr,
)
from ._registry import ( from ._registry import (
_tractor_kwargs as _tractor_kwargs, _tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr, _default_reg_addr as _default_reg_addr,

View File

@ -21,7 +21,6 @@
from __future__ import annotations from __future__ import annotations
import os import os
from typing import ( from typing import (
Optional,
Any, Any,
ClassVar, ClassVar,
) )
@ -30,13 +29,13 @@ from contextlib import (
) )
import tractor import tractor
import trio
from ._util import ( from ._util import (
get_console_log, get_console_log,
) )
from ._mngr import ( from ._mngr import (
Services, open_service_mngr,
ServiceMngr,
) )
from ._registry import ( # noqa from ._registry import ( # noqa
_tractor_kwargs, _tractor_kwargs,
@ -59,7 +58,7 @@ async def open_piker_runtime(
registry_addrs: list[tuple[str, int]] = [], registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [], enable_modules: list[str] = [],
loglevel: Optional[str] = None, loglevel: str|None = None,
# XXX NOTE XXX: you should pretty much never want debug mode # XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production. # for data daemons when running in production.
@ -69,7 +68,7 @@ async def open_piker_runtime(
# and spawn the service tree distributed per that. # and spawn the service tree distributed per that.
start_method: str = 'trio', start_method: str = 'trio',
tractor_runtime_overrides: dict | None = None, tractor_runtime_overrides: dict|None = None,
**tractor_kwargs, **tractor_kwargs,
) -> tuple[ ) -> tuple[
@ -119,6 +118,10 @@ async def open_piker_runtime(
# spawn other specialized daemons I think? # spawn other specialized daemons I think?
enable_modules=enable_modules, enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
# maybe_enable_greenback=debug_mode,
**tractor_kwargs, **tractor_kwargs,
) as actor, ) as actor,
@ -167,12 +170,13 @@ async def open_pikerd(
**kwargs, **kwargs,
) -> Services: ) -> ServiceMngr:
''' '''
Start a root piker daemon with an indefinite lifetime. Start a root piker daemon actor (aka `pikerd`) with an indefinite
lifetime.
A root actor nursery is created which can be used to create and keep A root actor-nursery is created which can be used to spawn and
alive underling services (see below). supervise underling service sub-actors (see below).
''' '''
# NOTE: for the root daemon we always enable the root # NOTE: for the root daemon we always enable the root
@ -199,8 +203,6 @@ async def open_pikerd(
root_actor, root_actor,
reg_addrs, reg_addrs,
), ),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
): ):
for addr in reg_addrs: for addr in reg_addrs:
if addr not in root_actor.accept_addrs: if addr not in root_actor.accept_addrs:
@ -209,25 +211,17 @@ async def open_pikerd(
'Maybe you have another daemon already running?' 'Maybe you have another daemon already running?'
) )
# assign globally for future daemon/task creation mngr: ServiceMngr
Services.actor_n = actor_nursery async with open_service_mngr(
Services.service_n = service_nursery debug_mode=debug_mode,
Services.debug_mode = debug_mode ) as mngr:
yield mngr
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
# TODO: do we even need this? # TODO: do we even need this?
# @acm # @acm
# async def maybe_open_runtime( # async def maybe_open_runtime(
# loglevel: Optional[str] = None, # loglevel: str|None = None,
# **kwargs, # **kwargs,
# ) -> None: # ) -> None:
@ -256,7 +250,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None, loglevel: str | None = None,
**kwargs, **kwargs,
) -> tractor._portal.Portal | ClassVar[Services]: ) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
''' '''
If no ``pikerd`` daemon-root-actor can be found start it and If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self yield up (we should probably figure out returning a portal to self

View File

@ -49,7 +49,7 @@ from requests.exceptions import (
ReadTimeout, ReadTimeout,
) )
from ._mngr import Services from ._mngr import ServiceMngr
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -453,7 +453,7 @@ async def open_ahabd(
@acm @acm
async def start_ahab_service( async def start_ahab_service(
services: Services, services: ServiceMngr,
service_name: str, service_name: str,
# endpoint config passed as **kwargs # endpoint config passed as **kwargs
@ -549,7 +549,8 @@ async def start_ahab_service(
log.warning('Failed to cancel root permsed container') log.warning('Failed to cancel root permsed container')
except ( except (
trio.MultiError, # trio.MultiError,
ExceptionGroup,
) as err: ) as err:
for subexc in err.exceptions: for subexc in err.exceptions:
if isinstance(subexc, PermissionError): if isinstance(subexc, PermissionError):

View File

@ -26,14 +26,17 @@ from typing import (
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from collections import defaultdict
import tractor import tractor
import trio
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
) )
from ._mngr import ( from ._mngr import (
Services, get_service_mngr,
ServiceMngr,
) )
from ._actor_runtime import maybe_open_pikerd from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service from ._registry import find_service
@ -41,15 +44,14 @@ from ._registry import find_service
@acm @acm
async def maybe_spawn_daemon( async def maybe_spawn_daemon(
service_name: str, service_name: str,
service_task_target: Callable, service_task_target: Callable,
spawn_args: dict[str, Any], spawn_args: dict[str, Any],
loglevel: str | None = None, loglevel: str | None = None,
singleton: bool = False, singleton: bool = False,
_locks = defaultdict(trio.Lock),
**pikerd_kwargs, **pikerd_kwargs,
) -> tractor.Portal: ) -> tractor.Portal:
@ -67,7 +69,7 @@ async def maybe_spawn_daemon(
''' '''
# serialize access to this section to avoid # serialize access to this section to avoid
# 2 or more tasks racing to create a daemon # 2 or more tasks racing to create a daemon
lock = Services.locks[service_name] lock = _locks[service_name]
await lock.acquire() await lock.acquire()
async with find_service( async with find_service(
@ -102,6 +104,12 @@ async def maybe_spawn_daemon(
# service task for that actor. # service task for that actor.
started: bool started: bool
if pikerd_portal is None: if pikerd_portal is None:
# await tractor.pause()
if tractor_kwargs.get('debug_mode', False):
from tractor.devx._debug import maybe_init_greenback
await maybe_init_greenback()
started = await service_task_target( started = await service_task_target(
loglevel=loglevel, loglevel=loglevel,
**spawn_args, **spawn_args,
@ -132,7 +140,65 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal: async with tractor.wait_for_actor(service_name) as portal:
lock.release() lock.release()
yield portal yield portal
await portal.cancel_actor() # --- ---- ---
# XXX NOTE XXX
# --- ---- ---
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
#
# Doing so will cause an "out-of-band" ctxc
# (`tractor.ContextCancelled`) to be raised inside the
# `ServiceMngr.open_context_in_task()`'s call to
# `ctx.wait_for_result()` AND the internal self-ctxc
# "graceful capture" WILL NOT CATCH IT!
#
# This can cause certain types of operations to raise
# that ctxc BEFORE THEY `return`, resulting in
# a "false-negative" ctxc being raised when really
# nothing actually failed, other then our semantic
# "failure" to suppress an expected, graceful,
# self-cancel scenario..
#
# bUt wHy duZ It WorK lIKe dis..
# ------------------------------
# from the perspective of the `tractor.Context` this
# cancel request was conducted "out of band" since
# `Context.cancel()` was never called and thus the
# `._cancel_called: bool` was never set. Despite the
# remote `.canceller` being set to `pikerd` (i.e. the
# same `Actor.uid` of the raising service-mngr task) the
# service-task's ctx itself was never marked as having
# requested cancellation and thus still raises the ctxc
# bc it was unaware of any such request.
#
# How to make grokin these cases easier tho?
# ------------------------------------------
# Because `Portal.cancel_actor()` was called it requests
# "full-`Actor`-runtime-cancellation" of it's peer
# process which IS NOT THE SAME as a single inter-actor
# RPC task cancelling its local context with a remote
# peer `Task` in that same peer process.
#
# ?TODO? It might be better if we do one (or all) of the
# following:
#
# -[ ] at least set a special message for the
# `ContextCancelled` when raised locally by the
# unaware ctx task such that we check for the
# `.canceller` being *our `Actor`* and in the case
# where `Context._cancel_called == False` we specially
# note that this is likely an "out-of-band"
# runtime-cancel request triggered by some call to
# `Portal.cancel_actor()`, possibly even reporting the
# exact LOC of that caller by tracking it inside our
# portal-type?
# -[ ] possibly add another field `ContextCancelled` like
# maybe a,
# `.request_type: Literal['os', 'proc', 'actor',
# 'ctx']` type thing which would allow immediately
# being able to tell what kind of cancellation caused
# the unexpected ctxc?
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
# better augment `tractor` to be more explicit on this!
async def spawn_emsd( async def spawn_emsd(
@ -147,26 +213,25 @@ async def spawn_emsd(
""" """
log.info('Spawning emsd') log.info('Spawning emsd')
portal = await Services.actor_n.start_actor( smngr: ServiceMngr = get_service_mngr()
portal = await smngr.an.start_actor(
'emsd', 'emsd',
enable_modules=[ enable_modules=[
'piker.clearing._ems', 'piker.clearing._ems',
'piker.clearing._client', 'piker.clearing._client',
], ],
loglevel=loglevel, loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag debug_mode=smngr.debug_mode, # set by pikerd flag
**extra_tractor_kwargs **extra_tractor_kwargs
) )
# non-blocking setup of clearing service # non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd from ..clearing._ems import _setup_persistent_emsd
await Services.start_service_task( await smngr.start_service_ctx(
'emsd', name='emsd',
portal, portal=portal,
ctx_fn=_setup_persistent_emsd,
# signature of target root-task endpoint
_setup_persistent_emsd,
loglevel=loglevel, loglevel=loglevel,
) )
return True return True

View File

@ -18,148 +18,36 @@
daemon-service management API. daemon-service management API.
""" """
from collections import defaultdict from contextlib import (
from typing import ( asynccontextmanager as acm,
Callable,
Any,
) )
import trio
from trio_typing import TaskStatus
import tractor import tractor
from tractor import ( from tractor.hilevel import (
current_actor, ServiceMngr,
ContextCancelled, # open_service_mngr as _open_service_mngr,
Context, get_service_mngr as get_service_mngr,
Portal,
) )
# TODO:
# -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr`
# in terms of allocating the `Portal` as part of the
# "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
from ._util import ( Services: ServiceMngr|None = None
log, # sub-sys logger
)
@acm
async def open_service_mngr(
**kwargs,
) -> ServiceMngr:
# TODO: we need remote wrapping and a general soln: global Services
# - factor this into a ``tractor.highlevel`` extension # pack for the async with tractor.hilevel.open_service_mngr(
# library. **kwargs,
# - wrap a "remote api" wherein you can get a method proxy ) as mngr:
# to the pikerd actor for starting services remotely! # Services = proxy(mngr)
# - prolly rename this to ActorServicesNursery since it spawns Services = mngr
# new actors and supervises them to completion? yield mngr
class Services: Services = None
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
Portal,
trio.Event,
]
] = {}
locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task(
self,
name: str,
portal: Portal,
target: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> (trio.CancelScope, Context):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
async def open_context_in_task(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
with trio.CancelScope() as cs:
async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, first):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((cs, complete, first))
log.info(
f'`pikerd` service {name} started with value {first}'
)
try:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (await portal.result(), ctx_res)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.channel.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service {name} was remotely cancelled?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
)
else:
raise
finally:
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
cs, complete, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal, complete)
return cs, first
@classmethod
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, portal, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
assert name not in self.service_tasks, \
f'Serice task for {name} not terminated?'

View File

@ -21,11 +21,13 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
# TODO: oof, needs to be changed to `httpx`!
import asks import asks
if TYPE_CHECKING: if TYPE_CHECKING:
import docker import docker
from ._ahab import DockerContainer from ._ahab import DockerContainer
from . import ServiceMngr
from ._util import log # sub-sys logger from ._util import log # sub-sys logger
from ._util import ( from ._util import (
@ -127,7 +129,7 @@ def start_elasticsearch(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: Services, service_mngr: ServiceMngr,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -53,7 +53,7 @@ import pendulum
# import purerpc # import purerpc
from ..data.feed import maybe_open_feed from ..data.feed import maybe_open_feed
from . import Services from . import ServiceMngr
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -233,7 +233,7 @@ def start_marketstore(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: Services, service_mngr: ServiceMngr,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -386,6 +386,8 @@ def ldshm(
open_annot_ctl() as actl, open_annot_ctl() as actl,
): ):
shm_df: pl.DataFrame | None = None shm_df: pl.DataFrame | None = None
tf2aids: dict[float, dict] = {}
for ( for (
shmfile, shmfile,
shm, shm,
@ -526,16 +528,17 @@ def ldshm(
new_df, new_df,
step_gaps, step_gaps,
) )
# last chance manual overwrites in REPL # last chance manual overwrites in REPL
await tractor.pause() # await tractor.pause()
assert aids assert aids
tf2aids[period_s] = aids
else: else:
# allow interaction even when no ts problems. # allow interaction even when no ts problems.
await tractor.pause() assert not diff
# assert not diff
await tractor.pause()
log.info('Exiting TSP shm anal-izer!')
if shm_df is None: if shm_df is None:
log.error( log.error(

View File

@ -161,7 +161,13 @@ class NativeStorageClient:
def index_files(self): def index_files(self):
for path in self._datadir.iterdir(): for path in self._datadir.iterdir():
if path.name in {'borked', 'expired',}: if (
path.is_dir()
or
'.parquet' not in str(path)
# or
# path.name in {'borked', 'expired',}
):
continue continue
key: str = path.name.rstrip('.parquet') key: str = path.name.rstrip('.parquet')

View File

@ -44,8 +44,10 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from pendulum import ( from pendulum import (
Interval,
DateTime, DateTime,
Duration, Duration,
duration as mk_duration,
from_timestamp, from_timestamp,
) )
import numpy as np import numpy as np
@ -214,7 +216,8 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling? # pair, immediately stop backfilling?
if ( if (
start_dt start_dt
and end_dt < start_dt and
end_dt < start_dt
): ):
await tractor.pause() await tractor.pause()
break break
@ -262,6 +265,7 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled: except tractor.ContextCancelled:
# log.exception # log.exception
await tractor.pause() await tractor.pause()
raise
null_segs_detected.set() null_segs_detected.set()
# RECHECK for more null-gaps # RECHECK for more null-gaps
@ -349,7 +353,7 @@ async def maybe_fill_null_segments(
async def start_backfill( async def start_backfill(
get_hist, get_hist,
frame_types: dict[str, Duration] | None, def_frame_duration: Duration,
mod: ModuleType, mod: ModuleType,
mkt: MktPair, mkt: MktPair,
shm: ShmArray, shm: ShmArray,
@ -379,22 +383,23 @@ async def start_backfill(
update_start_on_prepend: bool = False update_start_on_prepend: bool = False
if backfill_until_dt is None: if backfill_until_dt is None:
# TODO: drop this right and just expose the backfill # TODO: per-provider default history-durations?
# limits inside a [storage] section in conf.toml? # -[ ] inside the `open_history_client()` config allow
# when no tsdb "last datum" is provided, we just load # declaring the history duration limits instead of
# some near-term history. # guessing and/or applying the same limits to all?
# periods = { #
# 1: {'days': 1}, # -[ ] allow declaring (default) per-provider backfill
# 60: {'days': 14}, # limits inside a [storage] sub-section in conf.toml?
# } #
# NOTE, when no tsdb "last datum" is provided, we just
# do a decently sized backfill and load it into storage. # load some near-term history by presuming a "decently
# large" 60s duration limit and a much shorter 1s range.
periods = { periods = {
1: {'days': 2}, 1: {'days': 2},
60: {'years': 6}, 60: {'years': 6},
} }
period_duration: int = periods[timeframe] period_duration: int = periods[timeframe]
update_start_on_prepend = True update_start_on_prepend: bool = True
# NOTE: manually set the "latest" datetime which we intend to # NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history # backfill history "until" so as to adhere to the history
@ -416,7 +421,6 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n' f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n' f'last_start_dt: {last_start_dt}\n'
) )
try: try:
( (
array, array,
@ -426,71 +430,114 @@ async def start_backfill(
timeframe, timeframe,
end_dt=last_start_dt, end_dt=last_start_dt,
) )
except NoData as _daterr: except NoData as _daterr:
# 3 cases: orig_last_start_dt: datetime = last_start_dt
# - frame in the middle of a legit venue gap gap_report: str = (
# - history actually began at the `last_start_dt` f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
# - some other unknown error (ib blocking the f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
# history bc they don't want you seeing how they f'last_start_dt: {orig_last_start_dt}\n\n'
# cucked all the tinas..) f'bf_until: {backfill_until_dt}\n'
if dur := frame_types.get(timeframe): )
# decrement by a frame's worth of duration and # EMPTY FRAME signal with 3 (likely) causes:
# retry a few times. #
last_start_dt.subtract( # 1. range contains legit gap in venue history
seconds=dur.total_seconds() # 2. history actually (edge case) **began** at the
# value `last_start_dt`
# 3. some other unknown error (ib blocking the
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
last_start_dt: datetime = last_start_dt.subtract(
seconds=def_frame_duration.total_seconds()
) )
log.warning( gap_report += (
f'{mod.name} -> EMPTY FRAME for end_dt?\n' f'Decrementing `end_dt` and retrying with,\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n' f'def_frame_duration: {def_frame_duration}\n'
'bf_until <- last_start_dt:\n' f'(new) last_start_dt: {last_start_dt}\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
) )
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue continue
# broker says there never was or is no more history to pull else:
except DataUnavailable: # await tractor.pause()
log.warning( raise DataUnavailable(gap_report)
f'NO-MORE-DATA in range?\n'
f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
)
# ugh, what's a better way? # broker says there never was or is no more history to pull
# TODO: fwiw, we probably want a way to signal a throttle except DataUnavailable as due:
# condition (eg. with ib) so that we can halt the message: str = due.args[0]
# request loop until the condition is resolved? log.warning(
if timeframe > 1: f'Provider {mod.name!r} halted backfill due to,\n\n'
await tractor.pause()
f'{message}\n'
f'fqme: {mkt.fqme}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
)
# UGH: what's a better way?
# TODO: backends are responsible for being correct on
# this right!?
# -[ ] in the `ib` case we could maybe offer some way
# to halt the request loop until the condition is
# resolved or should the backend be entirely in
# charge of solving such faults? yes, right?
return return
time: np.ndarray = array['time']
assert ( assert (
array['time'][0] time[0]
== ==
next_start_dt.timestamp() next_start_dt.timestamp()
) )
diff = last_start_dt - next_start_dt assert time[-1] == next_end_dt.timestamp()
frame_time_diff_s = diff.seconds
expected_dur: Interval = last_start_dt - next_start_dt
# frame's worth of sample-period-steps, in seconds # frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe frame_size_s: float = len(array) * timeframe
expected_frame_size_s: float = frame_size_s + timeframe recv_frame_dur: Duration = (
if frame_time_diff_s > expected_frame_size_s: from_timestamp(array[-1]['time'])
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
# XXX: query result includes a start point prior to our # XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of # expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.) # history gap (eg. market closed period, outage, etc.)
# so just report it to console for now. # so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning( log.warning(
'GAP DETECTED:\n' f'{timeframe}s-series {reason} detected!\n'
f'last_start_dt: {last_start_dt}\n' f'fqme: {mkt.fqme}\n'
f'diff: {diff}\n' f'last_start_dt: {last_start_dt}\n\n'
f'frame_time_diff_s: {frame_time_diff_s}\n' f'recv interval: {recv_frame_dur}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
) )
# await tractor.pause()
to_push = diff_history( to_push = diff_history(
array, array,
@ -565,22 +612,27 @@ async def start_backfill(
# long-term storage. # long-term storage.
if ( if (
storage is not None storage is not None
and write_tsdb and
write_tsdb
): ):
log.info( log.info(
f'Writing {ln} frame to storage:\n' f'Writing {ln} frame to storage:\n'
f'{next_start_dt} -> {last_start_dt}' f'{next_start_dt} -> {last_start_dt}'
) )
# always drop the src asset token for # NOTE, always drop the src asset token for
# non-currency-pair like market types (for now) # non-currency-pair like market types (for now)
#
# THAT IS, for now our table key schema is NOT
# including the dst[/src] source asset token. SO,
# 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
# historical reasons ONLY.
if mkt.dst.atype not in { if mkt.dst.atype not in {
'crypto', 'crypto',
'crypto_currency', 'crypto_currency',
'fiat', # a "forex pair" 'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}: }:
# for now, our table key schema is not including
# the dst[/src] source asset token.
col_sym_key: str = mkt.get_fqme( col_sym_key: str = mkt.get_fqme(
delim_char='', delim_char='',
without_src=True, without_src=True,
@ -685,7 +737,7 @@ async def back_load_from_tsdb(
last_tsdb_dt last_tsdb_dt
and latest_start_dt and latest_start_dt
): ):
backfilled_size_s = ( backfilled_size_s: Duration = (
latest_start_dt - last_tsdb_dt latest_start_dt - last_tsdb_dt
).seconds ).seconds
# if the shm buffer len is not large enough to contain # if the shm buffer len is not large enough to contain
@ -908,6 +960,8 @@ async def tsdb_backfill(
f'{pformat(config)}\n' f'{pformat(config)}\n'
) )
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = [] dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
tn.start_soon( tn.start_soon(
@ -918,7 +972,6 @@ async def tsdb_backfill(
timeframe, timeframe,
config, config,
) )
tsdb_entry: tuple = await load_tsdb_hist( tsdb_entry: tuple = await load_tsdb_hist(
storage, storage,
mkt, mkt,
@ -947,6 +1000,25 @@ async def tsdb_backfill(
mr_end_dt, mr_end_dt,
) = dt_eps ) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases: # NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample # - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and # period (in which case `dt_eps` should be `None` and
@ -977,7 +1049,7 @@ async def tsdb_backfill(
partial( partial(
start_backfill, start_backfill,
get_hist=get_hist, get_hist=get_hist,
frame_types=config.get('frame_types', None), def_frame_duration=def_frame_size,
mod=mod, mod=mod,
mkt=mkt, mkt=mkt,
shm=shm, shm=shm,

View File

@ -616,6 +616,18 @@ def detect_price_gaps(
# ]) # ])
... ...
# TODO: probably just use the null_segs impl above?
def detect_vlm_gaps(
df: pl.DataFrame,
col: str = 'volume',
) -> pl.DataFrame:
vnull: pl.DataFrame = w_dts.filter(
pl.col(col) == 0
)
return vnull
def dedupe( def dedupe(
src_df: pl.DataFrame, src_df: pl.DataFrame,
@ -626,7 +638,6 @@ def dedupe(
) -> tuple[ ) -> tuple[
pl.DataFrame, # with dts pl.DataFrame, # with dts
pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal) pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
int, # len diff between input and deduped int, # len diff between input and deduped
]: ]:
@ -639,19 +650,22 @@ def dedupe(
''' '''
wdts: pl.DataFrame = with_dts(src_df) wdts: pl.DataFrame = with_dts(src_df)
# maybe sort on any time field deduped = wdts
if sort:
wdts = wdts.sort(by='time')
# TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg
# -[ ] possibly return segment sections which were moved?
# remove duplicated datetime samples/sections # remove duplicated datetime samples/sections
deduped: pl.DataFrame = wdts.unique( deduped: pl.DataFrame = wdts.unique(
subset=['dt'], # subset=['dt'],
subset=['time'],
maintain_order=True, maintain_order=True,
) )
# maybe sort on any time field
if sort:
deduped = deduped.sort(by='time')
# TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg
# -[ ] possibly return segment sections which were moved?
diff: int = ( diff: int = (
wdts.height wdts.height
- -

View File

@ -21,230 +21,4 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
types. types.
''' '''
from __future__ import annotations from tractor.msg import Struct as Struct
from collections import UserList
from pprint import (
saferepr,
)
from typing import Any
from msgspec import (
msgpack,
Struct as _Struct,
structs,
)
class DiffDump(UserList):
'''
Very simple list delegator that repr() dumps (presumed) tuple
elements of the form `tuple[str, Any, Any]` in a nice
multi-line readable form for analyzing `Struct` diffs.
'''
def __repr__(self) -> str:
if not len(self):
return super().__repr__()
# format by displaying item pair's ``repr()`` on multiple,
# indented lines such that they are more easily visually
# comparable when printed to console when printed to
# console.
repstr: str = '[\n'
for k, left, right in self:
repstr += (
f'({k},\n'
f'\t{repr(left)},\n'
f'\t{repr(right)},\n'
')\n'
)
repstr += ']\n'
return repstr
class Struct(
_Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def _sin_props(self) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(self):
key: str = fi.name
val: Any = getattr(self, key)
yield fi, key, val
def to_dict(
self,
include_non_members: bool = True,
) -> dict:
'''
Like it sounds.. direct delegation to:
https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict
BUT, by default we pop all non-member (aka not defined as
struct fields) fields by default.
'''
asdict: dict = structs.asdict(self)
if include_non_members:
return asdict
# only return a dict of the struct members
# which were provided as input, NOT anything
# added as type-defined `@property` methods!
sin_props: dict = {}
fi: structs.FieldInfo
for fi, k, v in self._sin_props():
sin_props[k] = asdict[k]
return sin_props
def pformat(
self,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + self.__class__.__qualname__
qtn: str = self.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in self._sin_props():
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
# __str__ = __repr__ = pformat
__repr__ = pformat
def copy(
self,
update: dict | None = None,
) -> Struct:
'''
Validate-typecast all self defined fields, return a copy of
us with all such fields.
NOTE: This is kinda like the default behaviour in
`pydantic.BaseModel` except a copy of the object is
returned making it compat with `frozen=True`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# NOTE: roundtrip serialize to validate
# - enode to msgpack binary format,
# - decode that back to a struct.
return msgpack.Decoder(type=type(self)).decode(
msgpack.Encoder().encode(self)
)
def typecast(
self,
# TODO: allow only casting a named subset?
# fields: set[str] | None = None,
) -> None:
'''
Cast all fields using their declared type annotations
(kinda like what `pydantic` does by default).
NOTE: this of course won't work on frozen types, use
``.copy()`` above in such cases.
'''
# https://jcristharif.com/msgspec/api.html#msgspec.structs.fields
fi: structs.FieldInfo
for fi in structs.fields(self):
setattr(
self,
fi.name,
fi.type(getattr(self, fi.name)),
)
def __sub__(
self,
other: Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Compare fields/items key-wise and return a ``DiffDump``
for easy visual REPL comparison B)
'''
diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
for fi in structs.fields(self):
attr_name: str = fi.name
ours: Any = getattr(self, attr_name)
theirs: Any = getattr(other, attr_name)
if ours != theirs:
diffs.append((
attr_name,
ours,
theirs,
))
return diffs

View File

@ -10,7 +10,7 @@ from piker import (
config, config,
) )
from piker.service import ( from piker.service import (
Services, get_service_mngr,
) )
from piker.log import get_console_log from piker.log import get_console_log
@ -129,7 +129,7 @@ async def _open_test_pikerd(
) as service_manager, ) as service_manager,
): ):
# this proc/actor is the pikerd # this proc/actor is the pikerd
assert service_manager is Services assert service_manager is get_service_mngr()
async with tractor.wait_for_actor( async with tractor.wait_for_actor(
'pikerd', 'pikerd',

View File

@ -26,7 +26,7 @@ import pytest
import tractor import tractor
from uuid import uuid4 from uuid import uuid4
from piker.service import Services from piker.service import ServiceMngr
from piker.log import get_logger from piker.log import get_logger
from piker.clearing._messages import ( from piker.clearing._messages import (
Order, Order,
@ -158,7 +158,7 @@ def load_and_check_pos(
def test_ems_err_on_bad_broker( def test_ems_err_on_bad_broker(
open_test_pikerd: Services, open_test_pikerd: ServiceMngr,
loglevel: str, loglevel: str,
): ):
async def load_bad_fqme(): async def load_bad_fqme():

View File

@ -15,7 +15,7 @@ import tractor
from piker.service import ( from piker.service import (
find_service, find_service,
Services, ServiceMngr,
) )
from piker.data import ( from piker.data import (
open_feed, open_feed,
@ -44,7 +44,7 @@ def test_runtime_boot(
async def main(): async def main():
port = 6666 port = 6666
daemon_addr = ('127.0.0.1', port) daemon_addr = ('127.0.0.1', port)
services: Services services: ServiceMngr
async with ( async with (
open_test_pikerd( open_test_pikerd(