Compare commits

...

6 Commits

Author SHA1 Message Date
Gud Boi 6402146116 Add `ls` caps for claudy 2026-03-24 00:23:28 -04:00
Gud Boi 5e44ad05e8 Swap `open_channel_from()` yield-pair order
Port deribit and IB `asyncio` bridge callables to the new
`to_asyncio.open_channel_from()` signature where the `LinkedTaskChannel`
is the first param and `started_nowait()` replaces the old
`to_trio.send_nowait()` sync handshake.

Deats,
- deribit `api.py`: update `aio_price_feed_relay()` and
  `aio_order_feed_relay()` signatures to take `chan: LinkedTaskChannel`
  as first arg; drop `from_trio`/`to_trio` params; replace
  `to_trio.send_nowait()` with `chan.send_nowait()` and
  `chan.started_nowait()`.
- drop `functools.partial()` wrapping in both `open_price_feed()` and
  `open_order_feed()`; pass `fh=`/`instrument=` as kwargs directly.
- IB `broker.py`: same `chan` + `started_nowait()` port for
  `recv_trade_updates()`.

Other styling,
- rewrap `recv_trade_updates()` docstring to 67 chars.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-24 00:10:55 -04:00
Gud Boi 799c9f45b4 Update `tractor` private-API refs across codebase
Port internal `tractor._<mod>` references to their
new public or reorganized paths after `tractor`
refactored its subpkg layout.

Deats,
- `tractor._portal.Portal` -> `tractor.Portal`.
- `tractor._supervise.ActorNursery` -> `tractor.ActorNursery`.
- `tractor._multiaddr` -> `tractor.discovery._multiaddr`.
- `tractor._addr` -> `tractor.discovery._addr`.
- `tractor._state._runtime_vars` -> `tractor.runtime._state._runtime_vars`.
- `tractor._state.is_debug_mode()` -> `tractor.runtime._state.is_debug_mode()`.

Files touched: `brokers/data.py`, `cli/__init__.py`, `data/feed.py`,
`service/_actor_runtime.py`, `service/_mngr.py`, `storage/cli.py`,
`tsp/_annotate.py`, `ui/kivy/monitor.py`, `ui/kivy/option_chain.py`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-23 23:55:07 -04:00
Gud Boi 3254f9233c Use pre-set reg addrs in `maybe_spawn_daemon()`
Pull actor-runtime `registry_addrs` from (the new)
`tractor.get_runtime_vars()` (over the previous hardcoding of
`('127.0.0.1', 6116)`..)) so that underlying `find_service()` and
`maybe_open_pikerd()` calls use the local actor's assigned registrar
endpoints.

Note, this is particularly necessary to get the `pytest` harness workin
again alongside any local running `pikerd` instance(s).

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-23 23:05:12 -04:00
Gud Boi ced898f580 Add `MarketNotFound` exc and improve binance fqme error
Add a `MarketNotFound(SymbolNotFound)` subclass for
mkt-pair-specific lookup failures; use it in binance
`get_mkt_info()` with a detailed expected-form hint.

Deats,
- add `MarketNotFound` in `brokers/_util.py`.
- re-export from `brokers/__init__.py`.
- binance `feed.py`: swap `SymbolNotFound` import
  for `MarketNotFound`; build `expected` string
  showing the `<pair>.<venue>.<broker>` format
  and suggest `".spot."` if venue is missing.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-23 21:36:59 -04:00
Gud Boi 188ce1415e Update feed test fqmes to use `.<venue>` format
Switch all `test_feeds.py` parametrized fqmes from
legacy `<pair>.<broker>` to the current
`<pair>.<venue>.<broker>` schema (e.g.
`btcusdt.spot.binance`).

Deats,
- update binance fqmes: `btcusdt.binance` ->
  `btcusdt.spot.binance`, same for `ethusdt`.
- update kraken fqmes: `ethusdt.kraken` ->
  `ethusdt.spot.kraken`, `xbtusd.kraken` ->
  `xbtusd.spot.kraken`.
- update cross-broker set similarly.
- comment out old fqmes with `!TODO` to later
  validate raising on bad/legacy formats.

Also,
- reformat `if ci_env and not run_in_ci` to
  multiline style.
- reformat `pytest.skip()` msg to multiline.
- add `?TODO` for symbology helper fn.
- drop stray `await tractor.pause()` in
  `conftest.py`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-23 19:51:53 -04:00
18 changed files with 93 additions and 47 deletions

View File

@ -3,7 +3,8 @@
"allow": [ "allow": [
"Bash(chmod:*)", "Bash(chmod:*)",
"Bash(/tmp/piker_commits.txt)", "Bash(/tmp/piker_commits.txt)",
"Bash(python:*)" "Bash(python:*)",
"Bash(ls:*)"
], ],
"deny": [], "deny": [],
"ask": [] "ask": []

View File

@ -31,6 +31,7 @@ from piker.log import (
from ._util import ( from ._util import (
BrokerError, BrokerError,
SymbolNotFound, SymbolNotFound,
MarketNotFound as MarketNotFound,
NoData, NoData,
DataUnavailable, DataUnavailable,
DataThrottle, DataThrottle,

View File

@ -66,6 +66,10 @@ class SymbolNotFound(BrokerError):
"Symbol not found by broker search" "Symbol not found by broker search"
class MarketNotFound(SymbolNotFound):
"Mkt-pair not found by broker search"
# TODO: these should probably be moved to `.tsp/.data`? # TODO: these should probably be moved to `.tsp/.data`?
class NoData(BrokerError): class NoData(BrokerError):
''' '''

View File

@ -48,7 +48,7 @@ import tractor
from piker.brokers import ( from piker.brokers import (
open_cached_client, open_cached_client,
NoData, NoData,
SymbolNotFound, MarketNotFound,
) )
from piker._cacheables import ( from piker._cacheables import (
async_lifo_cache, async_lifo_cache,
@ -325,9 +325,21 @@ async def get_mkt_info(
venue_lower: str = venue.lower() venue_lower: str = venue.lower()
if not venue: if not venue:
raise SymbolNotFound( if expiry:
expiry = f'.{expiry}'
expected: str = (
f'{mkt_ep}'
f'.<venue>'
f'{expiry}'
f'.{broker}'
)
raise MarketNotFound(
f'Invalid or missing .<venue> part in fqme?\n' f'Invalid or missing .<venue> part in fqme?\n'
f'\n'
f'fqme: {fqme!r}\n' f'fqme: {fqme!r}\n'
f'expected-form>> {expected}\n'
f'\n'
f'Maybe you are missing a ".spot." ?\n'
) )
# XXX TODO: we should change the usdtm_futes name to just # XXX TODO: we should change the usdtm_futes name to just

View File

@ -425,7 +425,7 @@ class DataFeed:
async def stream_to_file( async def stream_to_file(
watchlist_name: str, watchlist_name: str,
filename: str, filename: str,
portal: tractor._portal.Portal, portal: tractor.Portal,
tickers: List[str], tickers: List[str],
brokermod: ModuleType, brokermod: ModuleType,
rate: int, rate: int,

View File

@ -23,7 +23,6 @@ from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from datetime import datetime from datetime import datetime
from functools import partial
import time import time
from typing import ( from typing import (
Any, Any,
@ -524,13 +523,12 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay( async def aio_price_feed_relay(
chan: to_asyncio.LinkedTaskChannel,
fh: FeedHandler, fh: FeedHandler,
instrument: Symbol, instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
async def _trade(data: dict, receipt_timestamp): async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', { chan.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst( 'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(), str_to_cb_sym(data.symbol)).lower(),
'last': data, 'last': data,
@ -540,7 +538,7 @@ async def aio_price_feed_relay(
})) }))
async def _l1(data: dict, receipt_timestamp): async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', { chan.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst( 'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(), str_to_cb_sym(data.symbol)).lower(),
'ticks': [ 'ticks': [
@ -570,7 +568,7 @@ async def aio_price_feed_relay(
install_signal_handlers=False) install_signal_handlers=False)
# sync with trio # sync with trio
to_trio.send_nowait(None) chan.started_nowait(None)
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@ -581,11 +579,9 @@ async def open_price_feed(
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh: async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
partial( aio_price_feed_relay,
aio_price_feed_relay, fh=fh,
fh, instrument=instrument,
instrument
)
) as (chan, first): ) as (chan, first):
yield chan yield chan
@ -611,10 +607,9 @@ async def maybe_open_price_feed(
async def aio_order_feed_relay( async def aio_order_feed_relay(
chan: to_asyncio.LinkedTaskChannel,
fh: FeedHandler, fh: FeedHandler,
instrument: Symbol, instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
async def _fill(data: dict, receipt_timestamp): async def _fill(data: dict, receipt_timestamp):
breakpoint() breakpoint()
@ -637,7 +632,7 @@ async def aio_order_feed_relay(
install_signal_handlers=False) install_signal_handlers=False)
# sync with trio # sync with trio
to_trio.send_nowait(None) chan.started_nowait(None)
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@ -648,11 +643,9 @@ async def open_order_feed(
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh: async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
partial( aio_order_feed_relay,
aio_order_feed_relay, fh=fh,
fh, instrument=instrument,
instrument
)
) as (chan, first): ) as (chan, first):
yield chan yield chan

View File

@ -231,20 +231,21 @@ async def handle_order_requests(
async def recv_trade_updates( async def recv_trade_updates(
chan: tractor.to_asyncio.LinkedTaskChannel,
client: Client, client: Client,
to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
''' '''
Receive and relay order control and positioning related events Receive and relay order control and positioning
from `ib_async`, pack as tuples and push over mem-chan to our related events from `ib_async`, pack as tuples and
trio relay task for processing and relay to EMS. push over mem-chan to our trio relay task for
processing and relay to EMS.
''' '''
client.inline_errors(to_trio) client.inline_errors(chan)
# sync with trio task # sync with trio task
to_trio.send_nowait(client.ib) chan.started_nowait(client.ib)
def push_tradesies( def push_tradesies(
eventkit_obj, eventkit_obj,
@ -282,7 +283,7 @@ async def recv_trade_updates(
try: try:
# emit event name + relevant ibis internal objects # emit event name + relevant ibis internal objects
to_trio.send_nowait((event_name, emit)) chan.send_nowait((event_name, emit))
except trio.BrokenResourceError: except trio.BrokenResourceError:
log.exception(f'Disconnected from {eventkit_obj} updates') log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies) eventkit_obj.disconnect(push_tradesies)

View File

@ -27,7 +27,7 @@ from types import ModuleType
import click import click
import trio import trio
import tractor import tractor
from tractor._multiaddr import parse_maddr from tractor.discovery._multiaddr import parse_maddr
from ..log import ( from ..log import (
get_console_log, get_console_log,
@ -345,7 +345,7 @@ def services(
if not ports: if not ports:
ports: list[int] = [_default_registry_port] ports: list[int] = [_default_registry_port]
addr = tractor._addr.wrap_address( addr = tractor.discovery._addr.wrap_address(
addr=(host, ports[0]) addr=(host, ports[0])
) )

View File

@ -77,7 +77,7 @@ from ._sampling import (
if TYPE_CHECKING: if TYPE_CHECKING:
from .flows import Flume from .flows import Flume
from tractor._addr import Address from tractor.discovery._addr import Address
from tractor.msg.types import Aid from tractor.msg.types import Aid

View File

@ -91,7 +91,7 @@ async def open_piker_runtime(
try: try:
actor = tractor.current_actor() actor = tractor.current_actor()
except tractor._exceptions.NoRuntime: except tractor._exceptions.NoRuntime:
tractor._state._runtime_vars[ tractor.runtime._state._runtime_vars[
'piker_vars' 'piker_vars'
] = tractor_runtime_overrides ] = tractor_runtime_overrides
@ -264,7 +264,7 @@ async def maybe_open_pikerd(
**kwargs, **kwargs,
) -> ( ) -> (
tractor._portal.Portal tractor.Portal
|ClassVar[Services] |ClassVar[Services]
): ):
''' '''

View File

@ -79,10 +79,17 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name] lock = Services.locks[service_name]
await lock.acquire() await lock.acquire()
if not pikerd_kwargs:
# XXX NOTE, pin to apprope `tractor` branch!
rtvs: dict = tractor.get_runtime_vars()
registry_addrs: list[tuple] = list(
map(tuple, rtvs['_registry_addrs'])
)
try: try:
async with find_service( async with find_service(
service_name, service_name,
registry_addrs=[('127.0.0.1', 6116)], registry_addrs=registry_addrs,
) as portal: ) as portal:
if portal is not None: if portal is not None:
lock.release() lock.release()
@ -99,6 +106,7 @@ async def maybe_spawn_daemon(
# process tree # process tree
async with maybe_open_pikerd( async with maybe_open_pikerd(
loglevel=loglevel, loglevel=loglevel,
registry_addrs=registry_addrs,
**pikerd_kwargs, **pikerd_kwargs,
) as pikerd_portal: ) as pikerd_portal:

View File

@ -48,7 +48,7 @@ log = get_logger(name=__name__)
# new actors and supervises them to completion? # new actors and supervises them to completion?
class Services: class Services:
actor_n: tractor._supervise.ActorNursery actor_n: tractor.ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[ service_tasks: dict[

View File

@ -306,7 +306,7 @@ def ldshm(
# TODO: call null-seg fixer somehow? # TODO: call null-seg fixer somehow?
if null_segs: if null_segs:
if tractor._state.is_debug_mode(): if tractor.runtime._state.is_debug_mode():
await tractor.pause() await tractor.pause()
# async with ( # async with (
# trio.open_nursery() as tn, # trio.open_nursery() as tn,

View File

@ -196,7 +196,7 @@ async def markup_gaps(
prev_r: pl.DataFrame = prev_row_by_i prev_r: pl.DataFrame = prev_row_by_i
# debug any missing pre-row # debug any missing pre-row
if tractor._state.is_debug_mode(): if tractor.runtime._state.is_debug_mode():
await tractor.pause() await tractor.pause()
istart: int = prev_r['index'][0] istart: int = prev_r['index'][0]

View File

@ -167,7 +167,7 @@ async def stream_symbol_selection():
async def _async_main( async def _async_main(
name: str, name: str,
portal: tractor._portal.Portal, portal: tractor.Portal,
symbols: List[str], symbols: List[str],
brokermod: ModuleType, brokermod: ModuleType,
loglevel: str = 'info', loglevel: str = 'info',

View File

@ -436,7 +436,7 @@ class OptionChain(object):
async def new_chain_ui( async def new_chain_ui(
portal: tractor._portal.Portal, portal: tractor.Portal,
symbol: str, symbol: str,
brokermod: types.ModuleType, brokermod: types.ModuleType,
rate: int = 1, rate: int = 1,

View File

@ -137,7 +137,6 @@ async def _open_test_pikerd(
raddr = portal.chan.raddr raddr = portal.chan.raddr
uw_raddr: tuple = raddr.unwrap() uw_raddr: tuple = raddr.unwrap()
assert uw_raddr == reg_addr assert uw_raddr == reg_addr
await tractor.pause()
yield ( yield (
raddr._host, raddr._host,
raddr._port, raddr._port,

View File

@ -23,13 +23,35 @@ from piker.accounting import (
'fqmes', 'fqmes',
[ [
# binance # binance
(100, {'btcusdt.binance', 'ethusdt.binance'}, False), (100, {
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'btcusdt.binance',
'btcusdt.spot.binance',
'ethusdt.spot.binance',
}, False),
# kraken # kraken
(20, {'ethusdt.kraken', 'xbtusd.kraken'}, True), (20, {
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'ethusdt.kraken',
# 'xbtusd.kraken',
'ethusdt.spot.kraken',
'xbtusd.spot.kraken',
}, True),
# binance + kraken # binance + kraken
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False), (100, {
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'btcusdt.binance',
# 'xbtusd.kraken',
'btcusdt.spot.binance',
'xbtusd.spot.kraken',
}, False),
], ],
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}', ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
) )
@ -48,12 +70,17 @@ def test_multi_fqsn_feed(
if ( if (
ci_env ci_env
and not run_in_ci and
not run_in_ci
): ):
pytest.skip('Skipping CI disabled test due to feed restrictions') pytest.skip(
'CI-disabled-test due to live-feed restrictions'
)
brokers = set() brokers = set()
for fqme in fqmes: for fqme in fqmes:
# ?TODO, add this unpack + normalize check to a symbology
# helper fn?
brokername, *_ = unpack_fqme(fqme) brokername, *_ = unpack_fqme(fqme)
brokers.add(brokername) brokers.add(brokername)