Pass `loglevel` down through `.data` callstack

Add `loglevel` param propagation across the data feed and sampling
subsystems to enable proper console log setup in downstream (distibuted)
subactor tasks. This ensures sampler and history-mgmt tasks receive the
same loglevel as their parent `.data.feed` tasks.

Deats,
- add `loglevel: str|None` param to `register_with_sampler()`,
  `maybe_open_samplerd()`, and `open_sample_stream()`.
- pass `loglevel` through to `get_console_log()` in
  `register_with_sampler()` with fallback to actor `loglevel`.
- use `partial()` in `allocate_persistent_feed()` to pass
  `loglevel` to `manage_history()` at task-start.
- add `loglevel` param to `manage_history()` with default
  `'warning'` and pass through to `open_sample_stream()` from there.
- capture `loglevel` var in `brokers.cli.search()` and pass to
  `symbol_search()` call.

Also,
- drop blank lines in fn sigs for consistency with piker style.
- add debug bp in `open_feed()` when `loglevel != 'info'`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
fix_tractor_logging
Gud Boi 2026-02-11 19:56:14 -05:00
parent c1530c7a37
commit aa403bd390
4 changed files with 44 additions and 18 deletions

View File

@ -481,11 +481,12 @@ def search(
# the `piker --pdb` XD .. # the `piker --pdb` XD ..
# -[ ] pull from the parent click ctx's values..dumdum # -[ ] pull from the parent click ctx's values..dumdum
# assert pdb # assert pdb
loglevel: str = config['loglevel']
# define tractor entrypoint # define tractor entrypoint
async def main(func): async def main(func):
async with maybe_open_pikerd( async with maybe_open_pikerd(
loglevel=config['loglevel'], loglevel=loglevel,
debug_mode=pdb, debug_mode=pdb,
): ):
return await func() return await func()
@ -498,6 +499,7 @@ def search(
core.symbol_search, core.symbol_search,
brokermods, brokermods,
pattern, pattern,
loglevel=loglevel,
), ),
) )

View File

@ -336,10 +336,18 @@ async def register_with_sampler(
open_index_stream: bool = True, # open a 2way stream for sample step msgs? open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates? sub_for_broadcasts: bool = True, # sampler side to send step updates?
loglevel: str|None = None,
) -> set[int]: ) -> set[int]:
get_console_log(tractor.current_actor().loglevel) get_console_log(
level=(
loglevel
or
tractor.current_actor().loglevel
),
name=__name__,
)
incr_was_started: bool = False incr_was_started: bool = False
try: try:
@ -476,6 +484,7 @@ async def spawn_samplerd(
register_with_sampler, register_with_sampler,
period_s=1, period_s=1,
sub_for_broadcasts=False, sub_for_broadcasts=False,
loglevel=loglevel,
) )
return True return True
@ -484,7 +493,6 @@ async def spawn_samplerd(
@acm @acm
async def maybe_open_samplerd( async def maybe_open_samplerd(
loglevel: str|None = None, loglevel: str|None = None,
**pikerd_kwargs, **pikerd_kwargs,
@ -513,10 +521,10 @@ async def open_sample_stream(
shms_by_period: dict[float, dict]|None = None, shms_by_period: dict[float, dict]|None = None,
open_index_stream: bool = True, open_index_stream: bool = True,
sub_for_broadcasts: bool = True, sub_for_broadcasts: bool = True,
loglevel: str|None = None,
cache_key: str|None = None, # cache_key: str|None = None,
allow_new_sampler: bool = True, # allow_new_sampler: bool = True,
ensure_is_active: bool = False, ensure_is_active: bool = False,
) -> AsyncIterator[dict[str, float]]: ) -> AsyncIterator[dict[str, float]]:
@ -551,7 +559,9 @@ async def open_sample_stream(
# XXX: this should be singleton on a host, # XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
# created for all practical purposes # created for all practical purposes
maybe_open_samplerd() as portal, maybe_open_samplerd(
loglevel=loglevel,
) as portal,
portal.open_context( portal.open_context(
register_with_sampler, register_with_sampler,
@ -560,6 +570,7 @@ async def open_sample_stream(
'shms_by_period': shms_by_period, 'shms_by_period': shms_by_period,
'open_index_stream': open_index_stream, 'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts, 'sub_for_broadcasts': sub_for_broadcasts,
'loglevel': loglevel,
}, },
) as (ctx, shm_periods) ) as (ctx, shm_periods)
): ):

View File

@ -239,7 +239,6 @@ async def allocate_persistent_feed(
brokername: str, brokername: str,
symstr: str, symstr: str,
loglevel: str, loglevel: str,
start_stream: bool = True, start_stream: bool = True,
init_timeout: float = 616, init_timeout: float = 616,
@ -348,11 +347,14 @@ async def allocate_persistent_feed(
izero_rt, izero_rt,
rt_shm, rt_shm,
) = await bus.nursery.start( ) = await bus.nursery.start(
partial(
manage_history, manage_history,
mod, mod=mod,
mkt, mkt=mkt,
some_data_ready, some_data_ready=some_data_ready,
feed_is_live, feed_is_live=feed_is_live,
loglevel=loglevel,
)
) )
# yield back control to starting nursery once we receive either # yield back control to starting nursery once we receive either
@ -460,7 +462,6 @@ async def allocate_persistent_feed(
@tractor.context @tractor.context
async def open_feed_bus( async def open_feed_bus(
ctx: tractor.Context, ctx: tractor.Context,
brokername: str, brokername: str,
symbols: list[str], # normally expected to the broker-specific fqme symbols: list[str], # normally expected to the broker-specific fqme
@ -818,6 +819,11 @@ async def maybe_open_feed(
''' '''
fqme = fqmes[0] fqme = fqmes[0]
# if (
# loglevel != 'info'
# ):
# await tractor.pause()
async with trionics.maybe_open_context( async with trionics.maybe_open_context(
acm_func=open_feed, acm_func=open_feed,
kwargs={ kwargs={
@ -884,9 +890,12 @@ async def open_feed(
providers.setdefault(mod, []).append(bs_fqme) providers.setdefault(mod, []).append(bs_fqme)
feed.mods[mod.name] = mod feed.mods[mod.name] = mod
if (
loglevel != 'info'
):
await tractor.pause()
# one actor per brokerd for now # one actor per brokerd for now
brokerd_ctxs = [] brokerd_ctxs = []
for brokermod, bfqmes in providers.items(): for brokermod, bfqmes in providers.items():
# if no `brokerd` for this backend exists yet we spawn # if no `brokerd` for this backend exists yet we spawn

View File

@ -63,8 +63,10 @@ from ..data._sharedmem import (
maybe_open_shm_array, maybe_open_shm_array,
ShmArray, ShmArray,
) )
from ..data._source import def_iohlcv_fields from piker.data._source import (
from ..data._sampling import ( def_iohlcv_fields,
)
from piker.data._sampling import (
open_sample_stream, open_sample_stream,
) )
@ -1322,6 +1324,7 @@ async def manage_history(
mkt: MktPair, mkt: MktPair,
some_data_ready: trio.Event, some_data_ready: trio.Event,
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = 'warning',
timeframe: float = 60, # in seconds timeframe: float = 60, # in seconds
wait_for_live_timeout: float = 0.5, wait_for_live_timeout: float = 0.5,
@ -1499,6 +1502,7 @@ async def manage_history(
# data feed layer that needs to consume it). # data feed layer that needs to consume it).
open_index_stream=True, open_index_stream=True,
sub_for_broadcasts=False, sub_for_broadcasts=False,
loglevel=loglevel,
) as sample_stream: ) as sample_stream:
# register 1s and 1m buffers with the global # register 1s and 1m buffers with the global