Augment `.ib.symbols` search with more logging
Refactor `open_symbol_search()` to use `partial()` for nursery task spawning and add detailed query->results logging via `ppfmt()`. Deats, - change `extend_results()` to accept `target` callable + `pattern` + `**kwargs` and invoke inside, instead of receiving a pre-called awaitable; use `partial()` to pass args. - add `ppfmt()` formatted logging of search query params and results including client class + method repr. - change `print()` -> `log.exception()` for `Lagged` overrun. - bump `upto=5` -> `upto=10` for `search_symbols()` call. Also for styling, - add type some missing type annots. - add multiline style to `or` conditionals in pattern check. - reformat log msgs to multiline style throughout. - use `ppfmt()` for fuzzy match debug log. - rename nursery `sn` -> `tn`. - add TODO comment about `assert 0` hang. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-codeib_async
parent
89a145113c
commit
9010f9c7ab
|
|
@ -23,6 +23,7 @@ from contextlib import (
|
||||||
nullcontext,
|
nullcontext,
|
||||||
)
|
)
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
|
from functools import partial
|
||||||
import time
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Awaitable,
|
Awaitable,
|
||||||
|
|
@ -32,6 +33,7 @@ from typing import (
|
||||||
from rapidfuzz import process as fuzzy
|
from rapidfuzz import process as fuzzy
|
||||||
import ib_async as ibis
|
import ib_async as ibis
|
||||||
import tractor
|
import tractor
|
||||||
|
from tractor.devx.pformat import ppfmt
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from piker.accounting import (
|
from piker.accounting import (
|
||||||
|
|
@ -215,18 +217,19 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
||||||
f'{ib_client}\n'
|
f'{ib_client}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
last = time.time()
|
last: float = time.time()
|
||||||
async for pattern in stream:
|
async for pattern in stream:
|
||||||
log.info(f'received {pattern}')
|
log.info(f'received {pattern}')
|
||||||
now: float = time.time()
|
now: float = time.time()
|
||||||
|
|
||||||
|
# TODO? check this is no longer true?
|
||||||
# this causes tractor hang...
|
# this causes tractor hang...
|
||||||
# assert 0
|
# assert 0
|
||||||
|
|
||||||
assert pattern, 'IB can not accept blank search pattern'
|
assert pattern, 'IB can not accept blank search pattern'
|
||||||
|
|
||||||
# throttle search requests to no faster then 1Hz
|
# throttle search requests to no faster then 1Hz
|
||||||
diff = now - last
|
diff: float = now - last
|
||||||
if diff < 1.0:
|
if diff < 1.0:
|
||||||
log.debug('throttle sleeping')
|
log.debug('throttle sleeping')
|
||||||
await trio.sleep(diff)
|
await trio.sleep(diff)
|
||||||
|
|
@ -237,11 +240,12 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
||||||
|
|
||||||
if (
|
if (
|
||||||
not pattern
|
not pattern
|
||||||
or pattern.isspace()
|
or
|
||||||
|
pattern.isspace()
|
||||||
|
or
|
||||||
# XXX: not sure if this is a bad assumption but it
|
# XXX: not sure if this is a bad assumption but it
|
||||||
# seems to make search snappier?
|
# seems to make search snappier?
|
||||||
or len(pattern) < 1
|
len(pattern) < 1
|
||||||
):
|
):
|
||||||
log.warning('empty pattern received, skipping..')
|
log.warning('empty pattern received, skipping..')
|
||||||
|
|
||||||
|
|
@ -254,36 +258,58 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
||||||
# XXX: this unblocks the far end search task which may
|
# XXX: this unblocks the far end search task which may
|
||||||
# hold up a multi-search nursery block
|
# hold up a multi-search nursery block
|
||||||
await stream.send({})
|
await stream.send({})
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
log.info(f'searching for {pattern}')
|
log.info(
|
||||||
|
f'Searching for FQME with,\n'
|
||||||
|
f'pattern: {pattern!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
last = time.time()
|
last: float = time.time()
|
||||||
|
|
||||||
# async batch search using api stocks endpoint and module
|
# async batch search using api stocks endpoint and
|
||||||
# defined adhoc symbol set.
|
# module defined adhoc symbol set.
|
||||||
stock_results = []
|
stock_results: list[dict] = []
|
||||||
|
|
||||||
async def extend_results(
|
async def extend_results(
|
||||||
target: Awaitable[list]
|
# ?TODO, how to type async-fn!?
|
||||||
|
target: Awaitable[list],
|
||||||
|
pattern: str,
|
||||||
|
**kwargs,
|
||||||
) -> None:
|
) -> None:
|
||||||
try:
|
try:
|
||||||
results = await target
|
results = await target(
|
||||||
|
pattern=pattern,
|
||||||
|
**kwargs,
|
||||||
|
)
|
||||||
|
client_repr: str = proxy._aio_ns.ib.client.__class__.__name__
|
||||||
|
meth_repr: str = target.keywords["meth"]
|
||||||
|
log.info(
|
||||||
|
f'Search query,\n'
|
||||||
|
f'{client_repr}.{meth_repr}(\n'
|
||||||
|
f' pattern={pattern!r}\n'
|
||||||
|
f' **kwargs={kwargs!r},\n'
|
||||||
|
f') = {ppfmt(list(results))}'
|
||||||
|
# XXX ^ just the keys since that's what
|
||||||
|
# shows in UI results table.
|
||||||
|
)
|
||||||
except tractor.trionics.Lagged:
|
except tractor.trionics.Lagged:
|
||||||
print("IB SYM-SEARCH OVERRUN?!?")
|
log.exception(
|
||||||
|
'IB SYM-SEARCH OVERRUN?!?\n'
|
||||||
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
stock_results.extend(results)
|
stock_results.extend(results)
|
||||||
|
|
||||||
for _ in range(10):
|
for _ in range(10):
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
async with trio.open_nursery() as sn:
|
async with trio.open_nursery() as tn:
|
||||||
sn.start_soon(
|
tn.start_soon(
|
||||||
|
partial(
|
||||||
extend_results,
|
extend_results,
|
||||||
proxy.search_symbols(
|
|
||||||
pattern=pattern,
|
pattern=pattern,
|
||||||
upto=5,
|
target=proxy.search_symbols,
|
||||||
|
upto=10,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -313,7 +339,9 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
||||||
# adhoc_match_results = {i[0]: {} for i in
|
# adhoc_match_results = {i[0]: {} for i in
|
||||||
# adhoc_matches}
|
# adhoc_matches}
|
||||||
|
|
||||||
log.debug(f'fuzzy matching stocks {stock_results}')
|
log.debug(
|
||||||
|
f'fuzzy matching stocks {ppfmt(stock_results)}'
|
||||||
|
)
|
||||||
stock_matches = fuzzy.extract(
|
stock_matches = fuzzy.extract(
|
||||||
pattern,
|
pattern,
|
||||||
stock_results,
|
stock_results,
|
||||||
|
|
@ -327,7 +355,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
||||||
# TODO: we used to deliver contract details
|
# TODO: we used to deliver contract details
|
||||||
# {item[2]: item[0] for item in stock_matches}
|
# {item[2]: item[0] for item in stock_matches}
|
||||||
|
|
||||||
log.debug(f"sending matches: {matches.keys()}")
|
log.debug(
|
||||||
|
f'Sending final matches\n'
|
||||||
|
f'{matches.keys()}'
|
||||||
|
)
|
||||||
await stream.send(matches)
|
await stream.send(matches)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue