Augment `.ib.symbols` search with more logging

Refactor `open_symbol_search()` to use `partial()` for nursery task
spawning and add detailed query->results logging via `ppfmt()`.

Deats,
- change `extend_results()` to accept `target` callable +
  `pattern` + `**kwargs` and invoke inside, instead of receiving
  a pre-called awaitable; use `partial()` to pass args.
- add `ppfmt()` formatted logging of search query params and
  results including client class + method repr.
- change `print()` -> `log.exception()` for `Lagged` overrun.
- bump `upto=5` -> `upto=10` for `search_symbols()` call.

Also for styling,
- add type some missing type annots.
- add multiline style to `or` conditionals in pattern check.
- reformat log msgs to multiline style throughout.
- use `ppfmt()` for fuzzy match debug log.
- rename nursery `sn` -> `tn`.
- add TODO comment about `assert 0` hang.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
ib_async_CONT
Gud Boi 2026-03-05 21:46:07 -05:00
parent 103967870e
commit e571138816
1 changed files with 52 additions and 21 deletions

View File

@ -23,6 +23,7 @@ from contextlib import (
nullcontext, nullcontext,
) )
from decimal import Decimal from decimal import Decimal
from functools import partial
import time import time
from typing import ( from typing import (
Awaitable, Awaitable,
@ -32,6 +33,7 @@ from typing import (
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
import ib_async as ibis import ib_async as ibis
import tractor import tractor
from tractor.devx.pformat import ppfmt
import trio import trio
from piker.accounting import ( from piker.accounting import (
@ -215,18 +217,19 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
f'{ib_client}\n' f'{ib_client}\n'
) )
last = time.time() last: float = time.time()
async for pattern in stream: async for pattern in stream:
log.info(f'received {pattern}') log.info(f'received {pattern}')
now: float = time.time() now: float = time.time()
# TODO? check this is no longer true?
# this causes tractor hang... # this causes tractor hang...
# assert 0 # assert 0
assert pattern, 'IB can not accept blank search pattern' assert pattern, 'IB can not accept blank search pattern'
# throttle search requests to no faster then 1Hz # throttle search requests to no faster then 1Hz
diff = now - last diff: float = now - last
if diff < 1.0: if diff < 1.0:
log.debug('throttle sleeping') log.debug('throttle sleeping')
await trio.sleep(diff) await trio.sleep(diff)
@ -237,11 +240,12 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
if ( if (
not pattern not pattern
or pattern.isspace() or
pattern.isspace()
or
# XXX: not sure if this is a bad assumption but it # XXX: not sure if this is a bad assumption but it
# seems to make search snappier? # seems to make search snappier?
or len(pattern) < 1 len(pattern) < 1
): ):
log.warning('empty pattern received, skipping..') log.warning('empty pattern received, skipping..')
@ -254,36 +258,58 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# XXX: this unblocks the far end search task which may # XXX: this unblocks the far end search task which may
# hold up a multi-search nursery block # hold up a multi-search nursery block
await stream.send({}) await stream.send({})
continue continue
log.info(f'searching for {pattern}') log.info(
f'Searching for FQME with,\n'
f'pattern: {pattern!r}\n'
)
last = time.time() last: float = time.time()
# async batch search using api stocks endpoint and module # async batch search using api stocks endpoint and
# defined adhoc symbol set. # module defined adhoc symbol set.
stock_results = [] stock_results: list[dict] = []
async def extend_results( async def extend_results(
target: Awaitable[list] # ?TODO, how to type async-fn!?
target: Awaitable[list],
pattern: str,
**kwargs,
) -> None: ) -> None:
try: try:
results = await target results = await target(
pattern=pattern,
**kwargs,
)
client_repr: str = proxy._aio_ns.ib.client.__class__.__name__
meth_repr: str = target.keywords["meth"]
log.info(
f'Search query,\n'
f'{client_repr}.{meth_repr}(\n'
f' pattern={pattern!r}\n'
f' **kwargs={kwargs!r},\n'
f') = {ppfmt(list(results))}'
# XXX ^ just the keys since that's what
# shows in UI results table.
)
except tractor.trionics.Lagged: except tractor.trionics.Lagged:
print("IB SYM-SEARCH OVERRUN?!?") log.exception(
'IB SYM-SEARCH OVERRUN?!?\n'
)
return return
stock_results.extend(results) stock_results.extend(results)
for _ in range(10): for _ in range(10):
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery() as sn: async with trio.open_nursery() as tn:
sn.start_soon( tn.start_soon(
partial(
extend_results, extend_results,
proxy.search_symbols(
pattern=pattern, pattern=pattern,
upto=5, target=proxy.search_symbols,
upto=10,
), ),
) )
@ -313,7 +339,9 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# adhoc_match_results = {i[0]: {} for i in # adhoc_match_results = {i[0]: {} for i in
# adhoc_matches} # adhoc_matches}
log.debug(f'fuzzy matching stocks {stock_results}') log.debug(
f'fuzzy matching stocks {ppfmt(stock_results)}'
)
stock_matches = fuzzy.extract( stock_matches = fuzzy.extract(
pattern, pattern,
stock_results, stock_results,
@ -327,7 +355,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# TODO: we used to deliver contract details # TODO: we used to deliver contract details
# {item[2]: item[0] for item in stock_matches} # {item[2]: item[0] for item in stock_matches}
log.debug(f"sending matches: {matches.keys()}") log.debug(
f'Sending final matches\n'
f'{matches.keys()}'
)
await stream.send(matches) await stream.send(matches)