Compare commits

...

6 Commits

Author SHA1 Message Date
Gud Boi 6402146116 Add `ls` caps for claudy 2026-03-24 00:23:28 -04:00
Gud Boi 5e44ad05e8 Swap `open_channel_from()` yield-pair order
Port deribit and IB `asyncio` bridge callables to the new
`to_asyncio.open_channel_from()` signature where the `LinkedTaskChannel`
is the first param and `started_nowait()` replaces the old
`to_trio.send_nowait()` sync handshake.

Deats,
- deribit `api.py`: update `aio_price_feed_relay()` and
  `aio_order_feed_relay()` signatures to take `chan: LinkedTaskChannel`
  as first arg; drop `from_trio`/`to_trio` params; replace
  `to_trio.send_nowait()` with `chan.send_nowait()` and
  `chan.started_nowait()`.
- drop `functools.partial()` wrapping in both `open_price_feed()` and
  `open_order_feed()`; pass `fh=`/`instrument=` as kwargs directly.
- IB `broker.py`: same `chan` + `started_nowait()` port for
  `recv_trade_updates()`.

Other styling,
- rewrap `recv_trade_updates()` docstring to 67 chars.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-24 00:10:55 -04:00
Gud Boi 799c9f45b4 Update `tractor` private-API refs across codebase
Port internal `tractor._<mod>` references to their
new public or reorganized paths after `tractor`
refactored its subpkg layout.

Deats,
- `tractor._portal.Portal` -> `tractor.Portal`.
- `tractor._supervise.ActorNursery` -> `tractor.ActorNursery`.
- `tractor._multiaddr` -> `tractor.discovery._multiaddr`.
- `tractor._addr` -> `tractor.discovery._addr`.
- `tractor._state._runtime_vars` -> `tractor.runtime._state._runtime_vars`.
- `tractor._state.is_debug_mode()` -> `tractor.runtime._state.is_debug_mode()`.

Files touched: `brokers/data.py`, `cli/__init__.py`, `data/feed.py`,
`service/_actor_runtime.py`, `service/_mngr.py`, `storage/cli.py`,
`tsp/_annotate.py`, `ui/kivy/monitor.py`, `ui/kivy/option_chain.py`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-23 23:55:07 -04:00
Gud Boi 3254f9233c Use pre-set reg addrs in `maybe_spawn_daemon()`
Pull actor-runtime `registry_addrs` from (the new)
`tractor.get_runtime_vars()` (over the previous hardcoding of
`('127.0.0.1', 6116)`..)) so that underlying `find_service()` and
`maybe_open_pikerd()` calls use the local actor's assigned registrar
endpoints.

Note, this is particularly necessary to get the `pytest` harness workin
again alongside any local running `pikerd` instance(s).

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-23 23:05:12 -04:00
Gud Boi ced898f580 Add `MarketNotFound` exc and improve binance fqme error
Add a `MarketNotFound(SymbolNotFound)` subclass for
mkt-pair-specific lookup failures; use it in binance
`get_mkt_info()` with a detailed expected-form hint.

Deats,
- add `MarketNotFound` in `brokers/_util.py`.
- re-export from `brokers/__init__.py`.
- binance `feed.py`: swap `SymbolNotFound` import
  for `MarketNotFound`; build `expected` string
  showing the `<pair>.<venue>.<broker>` format
  and suggest `".spot."` if venue is missing.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-23 21:36:59 -04:00
Gud Boi 188ce1415e Update feed test fqmes to use `.<venue>` format
Switch all `test_feeds.py` parametrized fqmes from
legacy `<pair>.<broker>` to the current
`<pair>.<venue>.<broker>` schema (e.g.
`btcusdt.spot.binance`).

Deats,
- update binance fqmes: `btcusdt.binance` ->
  `btcusdt.spot.binance`, same for `ethusdt`.
- update kraken fqmes: `ethusdt.kraken` ->
  `ethusdt.spot.kraken`, `xbtusd.kraken` ->
  `xbtusd.spot.kraken`.
- update cross-broker set similarly.
- comment out old fqmes with `!TODO` to later
  validate raising on bad/legacy formats.

Also,
- reformat `if ci_env and not run_in_ci` to
  multiline style.
- reformat `pytest.skip()` msg to multiline.
- add `?TODO` for symbology helper fn.
- drop stray `await tractor.pause()` in
  `conftest.py`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-23 19:51:53 -04:00
18 changed files with 93 additions and 47 deletions

View File

@ -3,7 +3,8 @@
"allow": [
"Bash(chmod:*)",
"Bash(/tmp/piker_commits.txt)",
"Bash(python:*)"
"Bash(python:*)",
"Bash(ls:*)"
],
"deny": [],
"ask": []

View File

@ -31,6 +31,7 @@ from piker.log import (
from ._util import (
BrokerError,
SymbolNotFound,
MarketNotFound as MarketNotFound,
NoData,
DataUnavailable,
DataThrottle,

View File

@ -66,6 +66,10 @@ class SymbolNotFound(BrokerError):
"Symbol not found by broker search"
class MarketNotFound(SymbolNotFound):
"Mkt-pair not found by broker search"
# TODO: these should probably be moved to `.tsp/.data`?
class NoData(BrokerError):
'''

View File

@ -48,7 +48,7 @@ import tractor
from piker.brokers import (
open_cached_client,
NoData,
SymbolNotFound,
MarketNotFound,
)
from piker._cacheables import (
async_lifo_cache,
@ -325,9 +325,21 @@ async def get_mkt_info(
venue_lower: str = venue.lower()
if not venue:
raise SymbolNotFound(
if expiry:
expiry = f'.{expiry}'
expected: str = (
f'{mkt_ep}'
f'.<venue>'
f'{expiry}'
f'.{broker}'
)
raise MarketNotFound(
f'Invalid or missing .<venue> part in fqme?\n'
f'\n'
f'fqme: {fqme!r}\n'
f'expected-form>> {expected}\n'
f'\n'
f'Maybe you are missing a ".spot." ?\n'
)
# XXX TODO: we should change the usdtm_futes name to just

View File

@ -425,7 +425,7 @@ class DataFeed:
async def stream_to_file(
watchlist_name: str,
filename: str,
portal: tractor._portal.Portal,
portal: tractor.Portal,
tickers: List[str],
brokermod: ModuleType,
rate: int,

View File

@ -23,7 +23,6 @@ from contextlib import (
asynccontextmanager as acm,
)
from datetime import datetime
from functools import partial
import time
from typing import (
Any,
@ -524,13 +523,12 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay(
chan: to_asyncio.LinkedTaskChannel,
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
chan.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
@ -540,7 +538,7 @@ async def aio_price_feed_relay(
}))
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
chan.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
@ -570,7 +568,7 @@ async def aio_price_feed_relay(
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
chan.started_nowait(None)
await asyncio.sleep(float('inf'))
@ -581,11 +579,9 @@ async def open_price_feed(
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_price_feed_relay,
fh,
instrument
)
aio_price_feed_relay,
fh=fh,
instrument=instrument,
) as (chan, first):
yield chan
@ -611,10 +607,9 @@ async def maybe_open_price_feed(
async def aio_order_feed_relay(
chan: to_asyncio.LinkedTaskChannel,
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
@ -637,7 +632,7 @@ async def aio_order_feed_relay(
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
chan.started_nowait(None)
await asyncio.sleep(float('inf'))
@ -648,11 +643,9 @@ async def open_order_feed(
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_order_feed_relay,
fh,
instrument
)
aio_order_feed_relay,
fh=fh,
instrument=instrument,
) as (chan, first):
yield chan

View File

@ -231,20 +231,21 @@ async def handle_order_requests(
async def recv_trade_updates(
chan: tractor.to_asyncio.LinkedTaskChannel,
client: Client,
to_trio: trio.abc.SendChannel,
) -> None:
'''
Receive and relay order control and positioning related events
from `ib_async`, pack as tuples and push over mem-chan to our
trio relay task for processing and relay to EMS.
Receive and relay order control and positioning
related events from `ib_async`, pack as tuples and
push over mem-chan to our trio relay task for
processing and relay to EMS.
'''
client.inline_errors(to_trio)
client.inline_errors(chan)
# sync with trio task
to_trio.send_nowait(client.ib)
chan.started_nowait(client.ib)
def push_tradesies(
eventkit_obj,
@ -282,7 +283,7 @@ async def recv_trade_updates(
try:
# emit event name + relevant ibis internal objects
to_trio.send_nowait((event_name, emit))
chan.send_nowait((event_name, emit))
except trio.BrokenResourceError:
log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies)

View File

@ -27,7 +27,7 @@ from types import ModuleType
import click
import trio
import tractor
from tractor._multiaddr import parse_maddr
from tractor.discovery._multiaddr import parse_maddr
from ..log import (
get_console_log,
@ -345,7 +345,7 @@ def services(
if not ports:
ports: list[int] = [_default_registry_port]
addr = tractor._addr.wrap_address(
addr = tractor.discovery._addr.wrap_address(
addr=(host, ports[0])
)

View File

@ -77,7 +77,7 @@ from ._sampling import (
if TYPE_CHECKING:
from .flows import Flume
from tractor._addr import Address
from tractor.discovery._addr import Address
from tractor.msg.types import Aid

View File

@ -91,7 +91,7 @@ async def open_piker_runtime(
try:
actor = tractor.current_actor()
except tractor._exceptions.NoRuntime:
tractor._state._runtime_vars[
tractor.runtime._state._runtime_vars[
'piker_vars'
] = tractor_runtime_overrides
@ -264,7 +264,7 @@ async def maybe_open_pikerd(
**kwargs,
) -> (
tractor._portal.Portal
tractor.Portal
|ClassVar[Services]
):
'''

View File

@ -79,10 +79,17 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name]
await lock.acquire()
if not pikerd_kwargs:
# XXX NOTE, pin to apprope `tractor` branch!
rtvs: dict = tractor.get_runtime_vars()
registry_addrs: list[tuple] = list(
map(tuple, rtvs['_registry_addrs'])
)
try:
async with find_service(
service_name,
registry_addrs=[('127.0.0.1', 6116)],
registry_addrs=registry_addrs,
) as portal:
if portal is not None:
lock.release()
@ -99,6 +106,7 @@ async def maybe_spawn_daemon(
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
registry_addrs=registry_addrs,
**pikerd_kwargs,
) as pikerd_portal:

View File

@ -48,7 +48,7 @@ log = get_logger(name=__name__)
# new actors and supervises them to completion?
class Services:
actor_n: tractor._supervise.ActorNursery
actor_n: tractor.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[

View File

@ -306,7 +306,7 @@ def ldshm(
# TODO: call null-seg fixer somehow?
if null_segs:
if tractor._state.is_debug_mode():
if tractor.runtime._state.is_debug_mode():
await tractor.pause()
# async with (
# trio.open_nursery() as tn,

View File

@ -196,7 +196,7 @@ async def markup_gaps(
prev_r: pl.DataFrame = prev_row_by_i
# debug any missing pre-row
if tractor._state.is_debug_mode():
if tractor.runtime._state.is_debug_mode():
await tractor.pause()
istart: int = prev_r['index'][0]

View File

@ -167,7 +167,7 @@ async def stream_symbol_selection():
async def _async_main(
name: str,
portal: tractor._portal.Portal,
portal: tractor.Portal,
symbols: List[str],
brokermod: ModuleType,
loglevel: str = 'info',

View File

@ -436,7 +436,7 @@ class OptionChain(object):
async def new_chain_ui(
portal: tractor._portal.Portal,
portal: tractor.Portal,
symbol: str,
brokermod: types.ModuleType,
rate: int = 1,

View File

@ -137,7 +137,6 @@ async def _open_test_pikerd(
raddr = portal.chan.raddr
uw_raddr: tuple = raddr.unwrap()
assert uw_raddr == reg_addr
await tractor.pause()
yield (
raddr._host,
raddr._port,

View File

@ -23,13 +23,35 @@ from piker.accounting import (
'fqmes',
[
# binance
(100, {'btcusdt.binance', 'ethusdt.binance'}, False),
(100, {
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'btcusdt.binance',
'btcusdt.spot.binance',
'ethusdt.spot.binance',
}, False),
# kraken
(20, {'ethusdt.kraken', 'xbtusd.kraken'}, True),
(20, {
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'ethusdt.kraken',
# 'xbtusd.kraken',
'ethusdt.spot.kraken',
'xbtusd.spot.kraken',
}, True),
# binance + kraken
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
(100, {
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'btcusdt.binance',
# 'xbtusd.kraken',
'btcusdt.spot.binance',
'xbtusd.spot.kraken',
}, False),
],
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
)
@ -48,12 +70,17 @@ def test_multi_fqsn_feed(
if (
ci_env
and not run_in_ci
and
not run_in_ci
):
pytest.skip('Skipping CI disabled test due to feed restrictions')
pytest.skip(
'CI-disabled-test due to live-feed restrictions'
)
brokers = set()
for fqme in fqmes:
# ?TODO, add this unpack + normalize check to a symbology
# helper fn?
brokername, *_ = unpack_fqme(fqme)
brokers.add(brokername)