Compare commits

...

4 Commits

Author SHA1 Message Date
Tyler Goodlet 46285a601e Port `.cli` & `.service` to latest `tractor` registry APIs
Namely changes for the `registry_addrs: list`, `enable_transports: list`
and related `tractor._addr` primitive requirements (a hedged usage
sketch follows this entry).

Other updates include,
- passing `maybe_enable_greenback=True`,
- additional exc logging around `pikerd` syncing/booting,
- changing to the newer `Context.wait_for_result()`,
- dropping the (unnecessary?) `maybe_open_crash_handler()` around the `pikerd` ep.
2025-09-20 22:38:47 -04:00
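Below, a minimal hedged sketch of the registry-addr flow this port moves to, pieced together from the `.cli` hunks further down; the `wrap_address()`/`get_registry()`/`run_from_ns()` calls mirror that diff, while the wrapper function, its name and the bare runtime setup are illustrative assumptions only.

# hypothetical usage sketch; mirrors the `piker services` hunk below
import tractor
import trio


async def list_services(
    host: str = '127.0.0.1',
    port: int = 6116,
) -> dict:
    # wrap the raw (host, port) pair into a `tractor._addr` primitive,
    # replacing the old `host=`/`port=` kwargs to `get_registry()`
    addr = tractor._addr.wrap_address(addr=(host, port))

    # NOTE: in the actual CLI this runs under `open_piker_runtime()`;
    # that runtime setup is elided here.
    async with tractor.get_registry(addr=addr) as portal:
        registry: dict = await portal.run_from_ns('self', 'get_registry')
        # socket-addrs are rendered via their repr instead of being
        # unpacked as a (host, port) tuple
        return {key: f'{socket}' for key, socket in registry.items()}


# trio.run(list_services)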
Tyler Goodlet f9610c9e26 Bump to WIP "piker pin" `tractor` dev branch, with lock file 2025-09-20 22:36:53 -04:00
Tyler Goodlet 9d5e405903 binance: unmask around send-chan @acm usage 2025-09-20 22:32:05 -04:00
Tyler Goodlet e19a724037 ib: add venue-hours checking
Such that we can avoid the other (pretty unreliable) "alternative"
checks used to determine whether a real-time quote should be waited on
or, when the venue is closed, whether we should just signal that
historical backfilling can commence immediately.

This has been a todo for a very long time and it turned out to be much
easier to accomplish than anticipated..

Deats,
- add a new `is_current_time_in_range()` dt range checker to predicate
  whether an input range contains `datetime.now(start_dt.tzinfo)`.
- in `.ib.feed.stream_quotes()` add a `venue_is_open: bool` which uses
  all of the new ^^ to decide whether to branch into the
  short-circuit-and-do-history-now case or the std
  real-time-quotes-should-be-awaited-since-venue-is-open case; drop all
  the old hacks that tried to work around not knowing that venue state
  (see the hedged sketch after this commit entry).

Other,
- also add a gpt5-composed parser to `._util` for the
  `ib_insync.ContractDetails.tradingHours: str` field, written before
  I realized there was a `.tradingSessions` property XD
- in `.ib_feed`,
  * add various EG-collapsings per recent tractor/trio updates.
  * better logging / exc-handling around ticker quote pushes.
  * stop clearing `Ticker.ticks` each quote iteration; not sure if this
    is needed/correct tho?
  * add masked `Ticker.ticks` poll loop that logs.
- fix some `str.format()` usage in `._util.try_xdo_manual()`
2025-09-20 22:13:59 -04:00
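Before the file diffs, a hedged sketch of how the new venue-hours predicate composes, condensed from the `._util` and `.ib.feed` hunks below; the `.tradingSessions()` item attributes (`.start`/`.end`) and the standalone `venue_is_open()` helper are assumptions taken from the diff, not verified `ib_insync` API.

# condensed, hypothetical composition of the new venue-hours check
from datetime import datetime


def is_current_time_in_range(
    start_dt: datetime,
    end_dt: datetime,
) -> bool:
    # compare "now" in the same tz as the range's start
    now: datetime = datetime.now(start_dt.tzinfo)
    return start_dt <= now <= end_dt


def venue_is_open(details) -> bool:
    # `details` is presumed to be an `ib_insync.ContractDetails` whose
    # `.tradingSessions()` yields session objects with `.start`/`.end`
    return any(
        is_current_time_in_range(
            start_dt=sesh.start,
            end_dt=sesh.end,
        )
        for sesh in details.tradingSessions()
    )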
10 changed files with 452 additions and 228 deletions

View File

@ -448,7 +448,6 @@ async def subscribe(
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: list[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
@ -460,6 +459,7 @@ async def stream_quotes(
) -> None: ) -> None:
async with ( async with (
tractor.trionics.maybe_raise_from_masking_exc(),
send_chan as send_chan, send_chan as send_chan,
open_cached_client('binance') as client, open_cached_client('binance') as client,
): ):

View File

@ -20,6 +20,11 @@ runnable script-programs.
''' '''
from __future__ import annotations from __future__ import annotations
from datetime import ( # noqa
datetime,
date,
tzinfo as TzInfo,
)
from functools import partial from functools import partial
from typing import ( from typing import (
Literal, Literal,
@ -75,7 +80,7 @@ def try_xdo_manual(
return True return True
except OSError: except OSError:
log.exception( log.exception(
no_setup_msg.format(vnc_sockaddr) no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
) )
return False return False
@ -124,7 +129,7 @@ async def data_reset_hack(
if not vnc_sockaddr: if not vnc_sockaddr:
log.warning( log.warning(
no_setup_msg.format(vnc_sockaddr) no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
+ +
'REQUIRES A `vnc_addrs: array` ENTRY' 'REQUIRES A `vnc_addrs: array` ENTRY'
) )
@ -153,7 +158,7 @@ async def data_reset_hack(
import i3ipc # noqa (since a deps dynamic check) import i3ipc # noqa (since a deps dynamic check)
except ModuleNotFoundError: except ModuleNotFoundError:
log.warning( log.warning(
no_setup_msg.format(vnc_sockaddr) no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
) )
return False return False
@ -164,7 +169,7 @@ async def data_reset_hack(
focussed, matches = i3ipc_fin_wins_titled() focussed, matches = i3ipc_fin_wins_titled()
if not matches: if not matches:
log.warning( log.warning(
no_setup_msg.format(vnc_sockaddr) no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
) )
return False return False
else: else:
@ -337,3 +342,99 @@ def i3ipc_xdotool_manual_click_hack() -> None:
]) ])
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
log.exception('xdotool timed out?') log.exception('xdotool timed out?')
def is_current_time_in_range(
start_dt: datetime,
end_dt: datetime,
) -> bool:
'''
Check if current time is within the datetime range.
Use any/the-same timezone as provided by `start_dt.tzinfo` value
in the range.
'''
now: datetime = datetime.now(start_dt.tzinfo)
return start_dt <= now <= end_dt
# TODO, put this into `._util` and call it from here!
#
# NOTE, this was generated by @guille from a gpt5 prompt
# and was originally thot to be needed before learning about
# `ib_insync.contract.ContractDetails._parseSessions()` and
# it's downstream meths..
#
# This is still likely useful to keep for now to parse the
# `.tradingHours: str` value manually if we ever decide
# to move off `ib_async` and implement our own `trio`/`anyio`
# based version Bp
#
# >attempt to parse the retarted ib "time stampy thing" they
# >do for "venue hours" with this.. written by
# >gpt5-"thinking",
#
def parse_trading_hours(
spec: str,
tz: TzInfo|None = None
) -> dict[
date,
tuple[datetime, datetime]
]|None:
'''
Parse venue hours like:
'YYYYMMDD:HHMM-YYYYMMDD:HHMM;YYYYMMDD:CLOSED;...'
Returns `dict[date] = (open_dt, close_dt)` or `None` if
closed.
'''
if (
not isinstance(spec, str)
or
not spec
):
raise ValueError('spec must be a non-empty string')
out: dict[
date,
tuple[datetime, datetime]
]|None = {}
for part in (p.strip() for p in spec.split(';') if p.strip()):
if part.endswith(':CLOSED'):
day_s, _ = part.split(':', 1)
d = datetime.strptime(day_s, '%Y%m%d').date()
out[d] = None
continue
try:
start_s, end_s = part.split('-', 1)
start_dt = datetime.strptime(start_s, '%Y%m%d:%H%M')
end_dt = datetime.strptime(end_s, '%Y%m%d:%H%M')
except ValueError as exc:
raise ValueError(f'invalid segment: {part}') from exc
if tz is not None:
start_dt = start_dt.replace(tzinfo=tz)
end_dt = end_dt.replace(tzinfo=tz)
out[start_dt.date()] = (start_dt, end_dt)
return out
# ORIG desired usage,
#
# TODO, for non-drunk tomorrow,
# - call above fn and check that `output[today] is not None`
# trading_hrs: dict = parse_trading_hours(
# details.tradingHours
# )
# liq_hrs: dict = parse_trading_hours(
# details.liquidHours
# )

View File

@ -26,7 +26,6 @@ from dataclasses import asdict
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
from pprint import pformat from pprint import pformat
from math import isnan
import time import time
from typing import ( from typing import (
Any, Any,
@ -69,7 +68,10 @@ from .api import (
Contract, Contract,
RequestError, RequestError,
) )
from ._util import data_reset_hack from ._util import (
data_reset_hack,
is_current_time_in_range,
)
from .symbols import get_mkt_info from .symbols import get_mkt_info
if TYPE_CHECKING: if TYPE_CHECKING:
@ -184,7 +186,8 @@ async def open_history_client(
if ( if (
start_dt start_dt
and start_dt.timestamp() == 0 and
start_dt.timestamp() == 0
): ):
await tractor.pause() await tractor.pause()
@ -203,7 +206,7 @@ async def open_history_client(
): ):
count += 1 count += 1
mean += latency / count mean += latency / count
print( log.debug(
f'HISTORY FRAME QUERY LATENCY: {latency}\n' f'HISTORY FRAME QUERY LATENCY: {latency}\n'
f'mean: {mean}' f'mean: {mean}'
) )
@ -607,7 +610,10 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn # such that simultaneous symbol queries don't try data resettingn
# too fast.. # too fast..
unset_resetter: bool = False unset_resetter: bool = False
async with trio.open_nursery() as nurse: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse
):
# start history request that we allow # start history request that we allow
# to run indefinitely until a result is acquired # to run indefinitely until a result is acquired
@ -689,10 +695,17 @@ async def _setup_quote_stream(
async with load_aio_clients( async with load_aio_clients(
disconnect_on_exit=False, disconnect_on_exit=False,
) as accts2clients: ) as accts2clients:
# since asyncio.Task
# tractor.pause_from_sync()
caccount_name, client = get_preferred_data_client(accts2clients) caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol)) contract = contract or (await client.find_contract(symbol))
to_trio.send_nowait(contract) # cuz why not to_trio.send_nowait(contract) # cuz why not
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = client.ib.reqMktData(
contract,
','.join(opts),
)
# NOTE: it's batch-wise and slow af but I guess could # NOTE: it's batch-wise and slow af but I guess could
# be good for backchecking? Seems to be every 5s maybe? # be good for backchecking? Seems to be every 5s maybe?
@ -716,10 +729,10 @@ async def _setup_quote_stream(
Push quotes to trio task. Push quotes to trio task.
""" """
# log.debug(t)
# log.debug(f'new IB quote: {t}\n')
try: try:
to_trio.send_nowait(t) to_trio.send_nowait(t)
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
@ -734,21 +747,47 @@ async def _setup_quote_stream(
# resulting in tracebacks spammed to console.. # resulting in tracebacks spammed to console..
# Manually do the dereg ourselves. # Manually do the dereg ourselves.
teardown() teardown()
except trio.WouldBlock:
# log.warning(
# f'channel is blocking symbol feed for {symbol}?'
# f'\n{to_trio.statistics}'
# )
pass
# except trio.WouldBlock: # for slow debugging purposes to avoid clobbering prompt
# # for slow debugging purposes to avoid clobbering prompt # with log msgs
# # with log msgs except trio.WouldBlock:
# pass log.exception(
f'Asyncio->Trio `to_trio.send_nowait()` blocked !?\n'
f'\n'
f'{to_trio.statistics()}\n'
)
# ?TODO, handle re-connection attempts?
except BaseException as _berr:
berr = _berr
log.exception(
f'Failed to push ticker quote !?\n'
f'cause: {berr}\n'
f'\n'
f't: {t}\n'
f'{to_trio.statistics}\n'
)
# raise berr
ticker.updateEvent.connect(push) ticker.updateEvent.connect(push)
try: try:
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
# XXX, just for debug..
# tractor.pause_from_sync()
# while True:
# await asyncio.sleep(1.6)
# if ticker.ticks:
# log.debug(
# f'ticker.ticks = \n'
# f'{ticker.ticks}\n'
# )
# else:
# log.warning(
# 'UHH no ticker.ticks ??'
# )
finally: finally:
teardown() teardown()
@ -820,7 +859,7 @@ def normalize(
tbt = ticker.tickByTicks tbt = ticker.tickByTicks
if tbt: if tbt:
print(f'tickbyticks:\n {ticker.tickByTicks}') log.info(f'tickbyticks:\n {ticker.tickByTicks}')
ticker.ticks = new_ticks ticker.ticks = new_ticks
@ -861,22 +900,28 @@ async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: list[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None,
# TODO? we need to hook into the `ib_async` logger like
# we can with i3ipc from modden!
# loglevel: str|None = None,
# startup sync # startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
Stream symbol quotes. Stream `symbols[0]` quotes back via `send_chan`.
This is a ``trio`` callable routine meant to be invoked The `feed_is_live: Event` is set to signal the caller that it can
once the brokerd is up. begin processing msgs from the mem-chan.
''' '''
# TODO: support multiple subscriptions # TODO: support multiple subscriptions
sym = symbols[0] sym: str = symbols[0]
log.info(f'request for real-time quotes: {sym}') log.info(
f'request for real-time quotes\n'
f'sym: {sym!r}\n'
)
init_msgs: list[FeedInit] = [] init_msgs: list[FeedInit] = []
@ -885,34 +930,49 @@ async def stream_quotes(
details: ibis.ContractDetails details: ibis.ContractDetails
async with ( async with (
open_data_client() as proxy, open_data_client() as proxy,
# trio.open_nursery() as tn,
): ):
mkt, details = await get_mkt_info( mkt, details = await get_mkt_info(
sym, sym,
proxy=proxy, # passed to avoid implicit client load proxy=proxy, # passed to avoid implicit client load
) )
# is venue active rn?
venue_is_open: bool = any(
is_current_time_in_range(
start_dt=sesh.start,
end_dt=sesh.end,
)
for sesh in details.tradingSessions()
)
init_msg = FeedInit(mkt_info=mkt) init_msg = FeedInit(mkt_info=mkt)
# NOTE, tell sampler (via config) to skip vlm summing for dst
# assets which provide no vlm data..
if mkt.dst.atype in { if mkt.dst.atype in {
'fiat', 'fiat',
'index', 'index',
'commodity', 'commodity',
}: }:
# tell sampler config that it shouldn't do vlm summing.
init_msg.shm_write_opts['sum_tick_vlm'] = False init_msg.shm_write_opts['sum_tick_vlm'] = False
init_msg.shm_write_opts['has_vlm'] = False init_msg.shm_write_opts['has_vlm'] = False
init_msgs.append(init_msg) init_msgs.append(init_msg)
con: Contract = details.contract con: Contract = details.contract
first_ticker: Ticker | None = None first_ticker: Ticker|None = None
with trio.move_on_after(1):
with trio.move_on_after(1.6) as quote_cs:
first_ticker: Ticker = await proxy.get_quote( first_ticker: Ticker = await proxy.get_quote(
contract=con, contract=con,
raise_on_timeout=False, raise_on_timeout=False,
) )
# XXX should never happen with this ep right?
# but if so then, more then likely mkt is closed?
if quote_cs.cancelled_caught:
await tractor.pause()
if first_ticker: if first_ticker:
first_quote: dict = normalize(first_ticker) first_quote: dict = normalize(first_ticker)
@ -924,28 +984,27 @@ async def stream_quotes(
f'{pformat(first_quote)}\n' f'{pformat(first_quote)}\n'
) )
# NOTE: it might be outside regular trading hours for # XXX NOTE: whenever we're "outside regular trading hours"
# assets with "standard venue operating hours" so we # (only relevant for assets coming from the "legacy markets"
# only "pretend the feed is live" when the dst asset # space) so we basically (from an API/runtime-operational
# type is NOT within the NON-NORMAL-venue set: aka not # perspective) "pretend the feed is live" even if it's
# commodities, forex or crypto currencies which CAN # actually closed.
# always return a NaN on a snap quote request during #
# normal venue hours. In the case of a closed venue # IOW, we signal to the effective caller (task) that the live
# (equitiies, futes, bonds etc.) we at least try to # feed is "already up" but really we're just indicating that
# grab the OHLC history. # the OHLCV history can start being loaded immediately by the
if ( # `piker.data`/`.tsp` layers.
first_ticker #
and # XXX, deats: the "pretend we're live" is just done by
isnan(first_ticker.last) # a `feed_is_live.set()` even though nothing is actually live
# SO, if the last quote price value is NaN we ONLY # Bp
# "pretend to do" `feed_is_live.set()` if it's a known if not venue_is_open:
# dst asset venue with a lot of closed operating hours. log.warning(
and mkt.dst.atype not in { f'Venue is closed, unable to establish real-time feed.\n'
'commodity', f'mkt: {mkt!r}\n'
'fiat', f'\n'
'crypto', f'first_ticker: {first_ticker}\n'
} )
):
task_status.started(( task_status.started((
init_msgs, init_msgs,
first_quote, first_quote,
@ -956,10 +1015,12 @@ async def stream_quotes(
feed_is_live.set() feed_is_live.set()
# block and let data history backfill code run. # block and let data history backfill code run.
# XXX obvi given the venue is closed, we never expect feed
# to come up; a taskc should be the only way to
# terminate this task.
await trio.sleep_forever() await trio.sleep_forever()
return # we never expect feed to come up?
# TODO: we should instead spawn a task that waits on a feed # ?TODO, we could instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this # to start and let it wait indefinitely..instead of this
# hard coded stuff. # hard coded stuff.
# async def wait_for_first_quote(): # async def wait_for_first_quote():
@ -981,23 +1042,26 @@ async def stream_quotes(
'Rxed init quote:\n' 'Rxed init quote:\n'
f'{pformat(first_quote)}' f'{pformat(first_quote)}'
) )
cs: trio.CancelScope | None = None cs: trio.CancelScope|None = None
startup: bool = True startup: bool = True
while ( while (
startup startup
or cs.cancel_called or
cs.cancel_called
): ):
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
async with ( async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse, trio.open_nursery() as nurse,
open_aio_quote_stream( open_aio_quote_stream(
symbol=sym, symbol=sym,
contract=con, contract=con,
) as stream, ) as stream,
): ):
# ?TODO? can we rm this - particularly for `ib_async`?
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
first_ticker.ticks = [] # first_ticker.ticks = []
# only on first entry at feed boot up # only on first entry at feed boot up
if startup: if startup:
@ -1011,8 +1075,8 @@ async def stream_quotes(
# data feed event. # data feed event.
async def reset_on_feed(): async def reset_on_feed():
# TODO: this seems to be surpressed from the # ??TODO? this seems to be surpressed from the
# traceback in ``tractor``? # traceback in `tractor`?
# assert 0 # assert 0
rt_ev = proxy.status_event( rt_ev = proxy.status_event(
@ -1056,7 +1120,7 @@ async def stream_quotes(
# ugh, clear ticks since we've # ugh, clear ticks since we've
# consumed them (ahem, ib_insync is # consumed them (ahem, ib_insync is
# truly stateful trash) # truly stateful trash)
ticker.ticks = [] # ticker.ticks = []
# XXX: this works because we don't use # XXX: this works because we don't use
# ``aclosing()`` above? # ``aclosing()`` above?
@ -1073,8 +1137,12 @@ async def stream_quotes(
async for ticker in stream: async for ticker in stream:
quote = normalize(ticker) quote = normalize(ticker)
fqme = quote['fqme'] fqme = quote['fqme']
log.debug(
f'Sending quote\n'
f'{quote}'
)
await send_chan.send({fqme: quote}) await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
ticker.ticks = [] # ticker.ticks = []
# last = time.time() # last = time.time()

View File

@ -134,65 +134,65 @@ def pikerd(
Spawn the piker broker-daemon. Spawn the piker broker-daemon.
''' '''
from tractor.devx import maybe_open_crash_handler # from tractor.devx import maybe_open_crash_handler
with maybe_open_crash_handler(pdb=pdb): # with maybe_open_crash_handler(pdb=False):
log = get_console_log(loglevel, name='cli') log = get_console_log(loglevel, name='cli')
if pdb: if pdb:
log.warning(( log.warning((
"\n" "\n"
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n" "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
"When a `piker` daemon crashes it will block the " "When a `piker` daemon crashes it will block the "
"task-thread until resumed from console!\n" "task-thread until resumed from console!\n"
"\n" "\n"
))
# service-actor registry endpoint socket-address set
regaddrs: list[tuple[str, int]] = []
conf, _ = config.load(
conf_name='conf',
)
network: dict = conf.get('network')
if (
network is None
and not maddr
):
regaddrs = [(
_default_registry_host,
_default_registry_port,
)]
else:
eps: dict = load_trans_eps(
network,
maddr,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
)) ))
# service-actor registry endpoint socket-address set from .. import service
regaddrs: list[tuple[str, int]] = []
conf, _ = config.load( async def main():
conf_name='conf', service_mngr: service.Services
) async with (
network: dict = conf.get('network') service.open_pikerd(
if ( registry_addrs=regaddrs,
network is None loglevel=loglevel,
and not maddr debug_mode=pdb,
enable_transports=['uds'],
# enable_transports=['tcp'],
) as service_mngr,
): ):
regaddrs = [( assert service_mngr
_default_registry_host, # ?TODO? spawn all other sub-actor daemons according to
_default_registry_port, # multiaddress endpoint spec defined by user config
)] await trio.sleep_forever()
else: trio.run(main)
eps: dict = load_trans_eps(
network,
maddr,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
))
from .. import service
async def main():
service_mngr: service.Services
async with (
service.open_pikerd(
registry_addrs=regaddrs,
loglevel=loglevel,
debug_mode=pdb,
enable_transports=['uds'],
) as service_mngr,
):
assert service_mngr
# ?TODO? spawn all other sub-actor daemons according to
# multiaddress endpoint spec defined by user config
await trio.sleep_forever()
trio.run(main)
@click.group(context_settings=config._context_defaults) @click.group(context_settings=config._context_defaults)
@ -307,6 +307,10 @@ def services(config, tl, ports):
if not ports: if not ports:
ports = [_default_registry_port] ports = [_default_registry_port]
addr = tractor._addr.wrap_address(
addr=(host, ports[0])
)
async def list_services(): async def list_services():
nonlocal host nonlocal host
async with ( async with (
@ -315,15 +319,17 @@ def services(config, tl, ports):
loglevel=config['loglevel'] if tl else None, loglevel=config['loglevel'] if tl else None,
), ),
tractor.get_registry( tractor.get_registry(
host=host, addr=addr,
port=ports[0]
) as portal ) as portal
): ):
registry = await portal.run_from_ns('self', 'get_registry') registry = await portal.run_from_ns(
'self',
'get_registry',
)
json_d = {} json_d = {}
for key, socket in registry.items(): for key, socket in registry.items():
host, port = socket json_d[key] = f'{socket}'
json_d[key] = f'{host}:{port}'
click.echo(f"{colorize_json(json_d)}") click.echo(f"{colorize_json(json_d)}")
trio.run(list_services) trio.run(list_services)

View File

@ -107,17 +107,22 @@ async def open_piker_runtime(
async with ( async with (
tractor.open_root_actor( tractor.open_root_actor(
# passed through to ``open_root_actor`` # passed through to `open_root_actor`
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
name=name, name=name,
start_method=start_method,
loglevel=loglevel, loglevel=loglevel,
debug_mode=debug_mode, debug_mode=debug_mode,
start_method=start_method,
# XXX NOTE MEMBER DAT der's a perf hit yo!!
# https://greenback.readthedocs.io/en/latest/principle.html#performance
maybe_enable_greenback=True,
# TODO: eventually we should be able to avoid # TODO: eventually we should be able to avoid
# having the root have more then permissions to # having the root have more then permissions to
# spawn other specialized daemons I think? # spawn other specialized daemons I think?
enable_modules=enable_modules, enable_modules=enable_modules,
hide_tb=False,
**tractor_kwargs, **tractor_kwargs,
) as actor, ) as actor,
@ -257,7 +262,10 @@ async def maybe_open_pikerd(
loglevel: str | None = None, loglevel: str | None = None,
**kwargs, **kwargs,
) -> tractor._portal.Portal | ClassVar[Services]: ) -> (
tractor._portal.Portal
|ClassVar[Services]
):
''' '''
If no ``pikerd`` daemon-root-actor can be found start it and If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self yield up (we should probably figure out returning a portal to self
@ -282,10 +290,11 @@ async def maybe_open_pikerd(
registry_addrs: list[tuple[str, int]] = ( registry_addrs: list[tuple[str, int]] = (
registry_addrs registry_addrs
or [_default_reg_addr] or
[_default_reg_addr]
) )
pikerd_portal: tractor.Portal | None pikerd_portal: tractor.Portal|None
async with ( async with (
open_piker_runtime( open_piker_runtime(
name=query_name, name=query_name,

View File

@ -28,6 +28,7 @@ from contextlib import (
) )
import tractor import tractor
from trio.lowlevel import current_task
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
@ -70,69 +71,84 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name] lock = Services.locks[service_name]
await lock.acquire() await lock.acquire()
async with find_service( try:
service_name, async with find_service(
registry_addrs=[('127.0.0.1', 6116)], service_name,
) as portal: registry_addrs=[('127.0.0.1', 6116)],
if portal is not None: ) as portal:
lock.release() if portal is not None:
yield portal lock.release()
return yield portal
return
log.warning( log.warning(
f"Couldn't find any existing {service_name}\n" f"Couldn't find any existing {service_name}\n"
'Attempting to spawn new daemon-service..' 'Attempting to spawn new daemon-service..'
) )
# ask root ``pikerd`` daemon to spawn the daemon we need if # ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the # pikerd is not live we now become the root of the
# process tree # process tree
async with maybe_open_pikerd( async with maybe_open_pikerd(
loglevel=loglevel, loglevel=loglevel,
**pikerd_kwargs, **pikerd_kwargs,
) as pikerd_portal: ) as pikerd_portal:
# we are the root and thus are `pikerd` # we are the root and thus are `pikerd`
# so spawn the target service directly by calling # so spawn the target service directly by calling
# the provided target routine. # the provided target routine.
# XXX: this assumes that the target is well formed and will # XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call # do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level # the ``_Services`` api from above to start the top level
# service task for that actor. # service task for that actor.
started: bool started: bool
if pikerd_portal is None: if pikerd_portal is None:
started = await service_task_target( started = await service_task_target(
loglevel=loglevel, loglevel=loglevel,
**spawn_args, **spawn_args,
)
else:
# request a remote `pikerd` (service manager) to start the
# target daemon-task, the target can't return
# a non-serializable value since it is expected that service
# starting is non-blocking and the target task will persist
# running "under" or "within" the `pikerd` actor tree after
# the questing client disconnects. in other words this
# spawns a persistent daemon actor that continues to live
# for the lifespan of whatever the service manager inside
# `pikerd` says it should.
started = await pikerd_portal.run(
service_task_target,
loglevel=loglevel,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
# block until we can discover (by IPC connection) to the newly
# spawned daemon-actor and then deliver the portal to the
# caller.
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
except BaseException as _err:
err = _err
if (
lock.locked()
and
lock.statistics().owner is current_task()
):
log.exception(
f'Releasing stale lock after crash..?'
f'{err!r}\n'
) )
else:
# request a remote `pikerd` (service manager) to start the
# target daemon-task, the target can't return
# a non-serializable value since it is expected that service
# starting is non-blocking and the target task will persist
# running "under" or "within" the `pikerd` actor tree after
# the questing client disconnects. in other words this
# spawns a persistent daemon actor that continues to live
# for the lifespan of whatever the service manager inside
# `pikerd` says it should.
started = await pikerd_portal.run(
service_task_target,
loglevel=loglevel,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
# block until we can discover (by IPC connection) to the newly
# spawned daemon-actor and then deliver the portal to the
# caller.
async with tractor.wait_for_actor(service_name) as portal:
lock.release() lock.release()
yield portal raise err
await portal.cancel_actor()
async def spawn_emsd( async def spawn_emsd(

View File

@ -109,7 +109,7 @@ class Services:
# wait on any context's return value # wait on any context's return value
# and any final portal result from the # and any final portal result from the
# sub-actor. # sub-actor.
ctx_res: Any = await ctx.result() ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled # NOTE: blocks indefinitely until cancelled
# either by error from the target context # either by error from the target context

View File

@ -101,13 +101,15 @@ async def open_registry(
if ( if (
not tractor.is_root_process() not tractor.is_root_process()
and not Registry.addrs and
not Registry.addrs
): ):
Registry.addrs.extend(actor.reg_addrs) Registry.addrs.extend(actor.reg_addrs)
if ( if (
ensure_exists ensure_exists
and not Registry.addrs and
not Registry.addrs
): ):
raise RuntimeError( raise RuntimeError(
f"`{uid}` registry should already exist but doesn't?" f"`{uid}` registry should already exist but doesn't?"
@ -146,7 +148,7 @@ async def find_service(
| list[Portal] | list[Portal]
| None | None
): ):
# try:
reg_addrs: list[tuple[str, int]] reg_addrs: list[tuple[str, int]]
async with open_registry( async with open_registry(
addrs=( addrs=(
@ -157,22 +159,39 @@ async def find_service(
or Registry.addrs or Registry.addrs
), ),
) as reg_addrs: ) as reg_addrs:
log.info(f'Scanning for service `{service_name}`')
maybe_portals: list[Portal] | Portal | None log.info(
f'Scanning for service {service_name!r}'
)
# attach to existing daemon by name if possible # attach to existing daemon by name if possible
maybe_portals: list[Portal]|Portal|None
async with tractor.find_actor( async with tractor.find_actor(
service_name, service_name,
registry_addrs=reg_addrs, registry_addrs=reg_addrs,
only_first=first_only, # if set only returns single ref only_first=first_only, # if set only returns single ref
) as maybe_portals: ) as maybe_portals:
if not maybe_portals: if not maybe_portals:
# log.info(
print(
f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
)
yield None yield None
return return
# log.info(
print(
f'Found service {service_name!r} -> {maybe_portals}'
)
yield maybe_portals yield maybe_portals
# except BaseException as _berr:
# berr = _berr
# log.exception(
# 'tractor.find_actor() failed with,\n'
# )
# raise berr
async def check_for_service( async def check_for_service(
service_name: str, service_name: str,

View File

@ -109,12 +109,13 @@ uis = [
dev = [ dev = [
"pytest >=6.0.0, <7.0.0", "pytest >=6.0.0, <7.0.0",
"elasticsearch >=8.9.0, <9.0.0", "elasticsearch >=8.9.0, <9.0.0",
"xonsh >=0.14.2, <0.15.0", 'xonsh',
"prompt-toolkit ==3.0.40", "prompt-toolkit ==3.0.40",
"cython >=3.0.0, <4.0.0", "cython >=3.0.0, <4.0.0",
"greenback >=1.1.1, <2.0.0", "greenback >=1.1.1, <2.0.0",
"ruff>=0.9.6", "ruff>=0.9.6",
"pyperclip>=1.9.0", "pyperclip>=1.9.0",
"i3ipc>=2.2.1",
] ]
[project.scripts] [project.scripts]
@ -145,4 +146,6 @@ tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
# XXX for @goodboy's hackin, usually there's something new in the # XXX for @goodboy's hackin, usually there's something new in the
# runtime being seriously tested here Bp # runtime being seriously tested here Bp
tractor = { path = "../tractor", editable = true } tractor = { git = "https://github.com/goodboy/tractor.git", branch ="final_eg_refinements" }
# tractor = { path = "../tractor/", editable = true }
# xonsh = { path = "../xonsh", editable = true }

uv.lock
View File

@ -1,5 +1,5 @@
version = 1 version = 1
revision = 2 revision = 3
requires-python = ">=3.12" requires-python = ">=3.12"
[[package]] [[package]]
@ -492,6 +492,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/56/95/9377bcb415797e44274b51d46e3249eba641711cf3348050f76ee7b15ffc/httpx-0.27.2-py3-none-any.whl", hash = "sha256:7bb2708e112d8fdd7829cd4243970f0c223274051cb35ee80c03301ee29a3df0", size = 76395, upload-time = "2024-08-27T12:53:59.653Z" }, { url = "https://files.pythonhosted.org/packages/56/95/9377bcb415797e44274b51d46e3249eba641711cf3348050f76ee7b15ffc/httpx-0.27.2-py3-none-any.whl", hash = "sha256:7bb2708e112d8fdd7829cd4243970f0c223274051cb35ee80c03301ee29a3df0", size = 76395, upload-time = "2024-08-27T12:53:59.653Z" },
] ]
[[package]]
name = "i3ipc"
version = "2.2.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "python-xlib" },
]
sdist = { url = "https://files.pythonhosted.org/packages/61/f3/dfab70c888d85d3e933ff4d6b351aaed0ae137a29c896e03e364de3bec94/i3ipc-2.2.1.tar.gz", hash = "sha256:e880d7d7147959ead5cb34764f08b97b41385b36eb8256e8af1ce163dbcccce8", size = 47760, upload-time = "2020-04-05T17:25:08.666Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/de/30/294b07ddeccb58855c890c3ef3a951c3b0c1e2d089666d548b6a9edc39fb/i3ipc-2.2.1-py3-none-any.whl", hash = "sha256:c0b898223d50d42c90c818deb5033d1304c582755547dee7d15df3e3781bc690", size = 26591, upload-time = "2020-04-05T17:25:07.338Z" },
]
[[package]] [[package]]
name = "ib-insync" name = "ib-insync"
version = "0.9.86" version = "0.9.86"
@ -839,6 +851,7 @@ dev = [
{ name = "cython" }, { name = "cython" },
{ name = "elasticsearch" }, { name = "elasticsearch" },
{ name = "greenback" }, { name = "greenback" },
{ name = "i3ipc" },
{ name = "prompt-toolkit" }, { name = "prompt-toolkit" },
{ name = "pyperclip" }, { name = "pyperclip" },
{ name = "pytest" }, { name = "pytest" },
@ -875,7 +888,7 @@ requires-dist = [
{ name = "tomli", specifier = ">=2.0.1,<3.0.0" }, { name = "tomli", specifier = ">=2.0.1,<3.0.0" },
{ name = "tomli-w", specifier = ">=1.0.0,<2.0.0" }, { name = "tomli-w", specifier = ">=1.0.0,<2.0.0" },
{ name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" }, { name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" },
{ name = "tractor", editable = "../tractor" }, { name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=final_eg_refinements" },
{ name = "trio", specifier = ">=0.27" }, { name = "trio", specifier = ">=0.27" },
{ name = "trio-typing", specifier = ">=0.10.0" }, { name = "trio-typing", specifier = ">=0.10.0" },
{ name = "trio-util", specifier = ">=0.7.0,<0.8.0" }, { name = "trio-util", specifier = ">=0.7.0,<0.8.0" },
@ -890,11 +903,12 @@ dev = [
{ name = "cython", specifier = ">=3.0.0,<4.0.0" }, { name = "cython", specifier = ">=3.0.0,<4.0.0" },
{ name = "elasticsearch", specifier = ">=8.9.0,<9.0.0" }, { name = "elasticsearch", specifier = ">=8.9.0,<9.0.0" },
{ name = "greenback", specifier = ">=1.1.1,<2.0.0" }, { name = "greenback", specifier = ">=1.1.1,<2.0.0" },
{ name = "i3ipc", specifier = ">=2.2.1" },
{ name = "prompt-toolkit", specifier = "==3.0.40" }, { name = "prompt-toolkit", specifier = "==3.0.40" },
{ name = "pyperclip", specifier = ">=1.9.0" }, { name = "pyperclip", specifier = ">=1.9.0" },
{ name = "pytest", specifier = ">=6.0.0,<7.0.0" }, { name = "pytest", specifier = ">=6.0.0,<7.0.0" },
{ name = "ruff", specifier = ">=0.9.6" }, { name = "ruff", specifier = ">=0.9.6" },
{ name = "xonsh", specifier = ">=0.14.2,<0.15.0" }, { name = "xonsh" },
] ]
[[package]] [[package]]
@ -1196,6 +1210,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, upload-time = "2024-03-01T18:36:18.57Z" }, { url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, upload-time = "2024-03-01T18:36:18.57Z" },
] ]
[[package]]
name = "python-xlib"
version = "0.33"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "six" },
]
sdist = { url = "https://files.pythonhosted.org/packages/86/f5/8c0653e5bb54e0cbdfe27bf32d41f27bc4e12faa8742778c17f2a71be2c0/python-xlib-0.33.tar.gz", hash = "sha256:55af7906a2c75ce6cb280a584776080602444f75815a7aff4d287bb2d7018b32", size = 269068, upload-time = "2022-12-25T18:53:00.824Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fc/b8/ff33610932e0ee81ae7f1269c890f697d56ff74b9f5b2ee5d9b7fa2c5355/python_xlib-0.33-py2.py3-none-any.whl", hash = "sha256:c3534038d42e0df2f1392a1b30a15a4ff5fdc2b86cfa94f072bf11b10a164398", size = 182185, upload-time = "2022-12-25T18:52:58.662Z" },
]
[[package]] [[package]]
name = "pyyaml" name = "pyyaml"
version = "6.0.2" version = "6.0.2"
@ -1473,7 +1499,7 @@ source = { git = "https://github.com/pikers/tomlkit.git?branch=piker_pin#8e0239a
[[package]] [[package]]
name = "tractor" name = "tractor"
version = "0.1.0a6.dev0" version = "0.1.0a6.dev0"
source = { editable = "../tractor" } source = { git = "https://github.com/goodboy/tractor.git?branch=final_eg_refinements#5fc64107e566a5b59097cb1e9a6b3171f2125106" }
dependencies = [ dependencies = [
{ name = "bidict" }, { name = "bidict" },
{ name = "cffi" }, { name = "cffi" },
@ -1485,31 +1511,6 @@ dependencies = [
{ name = "wrapt" }, { name = "wrapt" },
] ]
[package.metadata]
requires-dist = [
{ name = "bidict", specifier = ">=0.23.1" },
{ name = "cffi", specifier = ">=1.17.1" },
{ name = "colorlog", specifier = ">=6.8.2,<7" },
{ name = "msgspec", specifier = ">=0.19.0" },
{ name = "pdbp", specifier = ">=1.6,<2" },
{ name = "tricycle", specifier = ">=0.4.1,<0.5" },
{ name = "trio", specifier = ">0.27" },
{ name = "wrapt", specifier = ">=1.16.0,<2" },
]
[package.metadata.requires-dev]
dev = [
{ name = "greenback", specifier = ">=1.2.1,<2" },
{ name = "pexpect", specifier = ">=4.9.0,<5" },
{ name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "pytest", specifier = ">=8.3.5" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" },
{ name = "typing-extensions", specifier = ">=4.13.2" },
{ name = "xonsh", specifier = ">=0.19.2" },
]
[[package]] [[package]]
name = "tricycle" name = "tricycle"
version = "0.4.1" version = "0.4.1"
@ -1730,14 +1731,15 @@ wheels = [
[[package]] [[package]]
name = "xonsh" name = "xonsh"
version = "0.14.4" version = "0.19.9"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/44/d4/e3f8e6db5db554a6318690acdd5b93f973a625f8fd36008f826f042a910c/xonsh-0.14.4.tar.gz", hash = "sha256:7a20607f0914c9876f3500f0badc0414aa1b8640c85001ba3b9b3cfd6d890b39", size = 768294, upload-time = "2024-01-16T14:00:38.228Z" } sdist = { url = "https://files.pythonhosted.org/packages/ea/eb/8f544caca583c5f9f0ae7d852769fdb8ed5f63b67646a3c66a2d19357d56/xonsh-0.19.9.tar.gz", hash = "sha256:4cab4c4d7a98aab7477a296f12bc008beccf3d090c6944f0b3375d80a574c37d", size = 730490, upload-time = "2025-06-24T19:59:54.029Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/6e/b9/6f14654279917a8e2d6959a02d13e5625a17ec1359a35c624cd1c9f2fc60/xonsh-0.14.4-py310-none-any.whl", hash = "sha256:2627524483a2d251de2325366453e183f016e164cb62475f8291a8ebd3e8bdc0", size = 620059, upload-time = "2024-01-16T14:00:03.496Z" }, { url = "https://files.pythonhosted.org/packages/72/ce/429bfed55803d7469fe3d0d02c385bc61eddfa7ace539b4fc0060fd8282d/xonsh-0.19.9-py310-none-any.whl", hash = "sha256:8974981f04740b35a6f283aefe8e50b72b00e83cf4a871d6cc04500c0e3be2dd", size = 645111, upload-time = "2025-06-24T19:59:46.79Z" },
{ url = "https://files.pythonhosted.org/packages/bc/50/682cbdca319026d3bef7c606c64af3f372b6e8946100a8b7a22f1ee4dc18/xonsh-0.14.4-py311-none-any.whl", hash = "sha256:8423fe0a2a5e91e4fa316eff8f445cfa12f61f2437b84fd06aef97bdfd306ffe", size = 620072, upload-time = "2024-01-16T14:00:33.15Z" }, { url = "https://files.pythonhosted.org/packages/a0/68/464a07569e8f973339a83d95eda980b60db4dbbe2da8ee82874081f9432c/xonsh-0.19.9-py311-none-any.whl", hash = "sha256:3c872d6f2f109ad3345af4476d2541a71d6ccdbbf2404b2fb2b95755c54e73ab", size = 645114, upload-time = "2025-06-24T19:59:52.857Z" },
{ url = "https://files.pythonhosted.org/packages/7e/99/83d91d63b07c7bb9d06af89126bb86ec72c2a48e9755468cd4f645e2aadf/xonsh-0.14.4-py312-none-any.whl", hash = "sha256:16f16147fbbd3110d3cda5e6738010bb16221bfd8c41f9f04eae1bde0b90f467", size = 620162, upload-time = "2024-01-16T14:00:32.929Z" }, { url = "https://files.pythonhosted.org/packages/0b/b6/a10c0c11eb2ed9edef4081feb588b1e16d8f9c752a288150163700c876cd/xonsh-0.19.9-py312-none-any.whl", hash = "sha256:ce0b91b8b25da835c0ffdb3f4aa4cf412ec5481f59a33ce9230cfa5fcbb5b0c8", size = 645105, upload-time = "2025-06-24T19:59:49.775Z" },
{ url = "https://files.pythonhosted.org/packages/ed/07/ec49db07412121bbfb902f7eadb8d0f3785e22c97b2ae05d6b9267574674/xonsh-0.14.4-py39-none-any.whl", hash = "sha256:e85f5e21b72e807d9e77c341ef9e4e964b52c923498628d840f2a21c8820a357", size = 612518, upload-time = "2024-01-16T14:00:32.223Z" }, { url = "https://files.pythonhosted.org/packages/ca/8d/45f9bf28a54504cfca126ac13d7f04a564ac7c8559163f8ecbc73e87f0f2/xonsh-0.19.9-py313-none-any.whl", hash = "sha256:fa2b5f200413e92524b17da0bd6db08378e0ded12bbe81335bc94a03ea9fc670", size = 645640, upload-time = "2025-06-24T19:59:50.993Z" },
{ url = "https://files.pythonhosted.org/packages/54/2a/fd19683e948aaa9fbcd217056417f8b7fdf972ead1b5820cbfe4f28ef93b/xonsh-0.19.9-py39-none-any.whl", hash = "sha256:b136f62f444483de2a83a8005c3e5e4fb1005a1625a4e51a2d35f0a717a7ac6f", size = 637405, upload-time = "2025-06-24T19:59:49.409Z" },
] ]
[[package]] [[package]]