Compare commits
5 Commits
a3d46f713e
...
e1d57e8a8a
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | e1d57e8a8a | |
Tyler Goodlet | ad458e3fcd | |
Tyler Goodlet | f26c399ad3 | |
Tyler Goodlet | a741ed3161 | |
Tyler Goodlet | b9fbbeb44e |
|
@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS.
|
|||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from contextlib import ExitStack
|
||||
from dataclasses import asdict
|
||||
from functools import partial
|
||||
from pprint import pformat
|
||||
|
@ -35,8 +36,8 @@ from trio_typing import TaskStatus
|
|||
import tractor
|
||||
from ib_insync.contract import (
|
||||
Contract,
|
||||
Option,
|
||||
Forex,
|
||||
# Option,
|
||||
# Forex,
|
||||
)
|
||||
from ib_insync.order import (
|
||||
Trade,
|
||||
|
@ -47,11 +48,17 @@ from ib_insync.objects import (
|
|||
Execution,
|
||||
CommissionReport,
|
||||
)
|
||||
from ib_insync.objects import Position
|
||||
from ib_insync.objects import Position as IbPosition
|
||||
import pendulum
|
||||
|
||||
from piker import config
|
||||
from piker import pp
|
||||
from piker.pp import (
|
||||
Position,
|
||||
Transaction,
|
||||
open_trade_ledger,
|
||||
open_pps,
|
||||
PpTable,
|
||||
)
|
||||
from piker.log import get_console_log
|
||||
from piker.clearing._messages import (
|
||||
BrokerdOrder,
|
||||
|
@ -66,7 +73,8 @@ from piker.data._source import Symbol
|
|||
from .api import (
|
||||
_accounts2clients,
|
||||
# _adhoc_futes_set,
|
||||
_adhoc_symbol_map,
|
||||
con2fqsn,
|
||||
# _adhoc_symbol_map,
|
||||
log,
|
||||
get_config,
|
||||
open_client_proxies,
|
||||
|
@ -76,49 +84,12 @@ from .api import (
|
|||
|
||||
|
||||
def pack_position(
|
||||
pos: Position
|
||||
pos: IbPosition
|
||||
|
||||
) -> dict[str, Any]:
|
||||
|
||||
con = pos.contract
|
||||
|
||||
if isinstance(con, Option):
|
||||
# TODO: option symbol parsing and sane display:
|
||||
symbol = con.localSymbol.replace(' ', '')
|
||||
|
||||
else:
|
||||
# TODO: lookup fqsn even for derivs.
|
||||
symbol = con.symbol.lower()
|
||||
|
||||
# TODO: probably write a mofo exchange mapper routine since ib
|
||||
# can't get it's shit together like, ever.
|
||||
|
||||
# try our best to figure out the exchange / venue
|
||||
exch = (con.primaryExchange or con.exchange).lower()
|
||||
if not exch:
|
||||
|
||||
if isinstance(con, Forex):
|
||||
# bc apparently it's not in the contract obj?
|
||||
exch = 'idealfx'
|
||||
|
||||
else:
|
||||
# for wtv cucked reason some futes don't show their
|
||||
# exchange (like CL.NYMEX) ...
|
||||
entry = _adhoc_symbol_map.get(
|
||||
con.symbol or con.localSymbol
|
||||
)
|
||||
if entry:
|
||||
meta, kwargs = entry
|
||||
cid = meta.get('conId')
|
||||
if cid:
|
||||
assert con.conId == meta['conId']
|
||||
exch = meta['exchange']
|
||||
|
||||
assert exch, f'No clue:\n {con}'
|
||||
fqsn = '.'.join((symbol, exch))
|
||||
|
||||
expiry = con.lastTradeDateOrContractMonth
|
||||
if expiry:
|
||||
fqsn += f'.{expiry}'
|
||||
fqsn, calc_price = con2fqsn(con)
|
||||
|
||||
# TODO: options contracts into a sane format..
|
||||
return (
|
||||
|
@ -305,12 +276,10 @@ async def update_ledger_from_api_trades(
|
|||
client: Union[Client, MethodProxy],
|
||||
|
||||
) -> tuple[
|
||||
dict[str, pp.Transaction],
|
||||
dict[str, Transaction],
|
||||
dict[str, dict],
|
||||
]:
|
||||
|
||||
conf = get_config()
|
||||
|
||||
# XXX; ERRGGG..
|
||||
# pack in the "primary/listing exchange" value from a
|
||||
# contract lookup since it seems this isn't available by
|
||||
|
@ -331,39 +300,33 @@ async def update_ledger_from_api_trades(
|
|||
|
||||
entry['listingExchange'] = pexch
|
||||
|
||||
conf = get_config()
|
||||
entries = trades_to_ledger_entries(
|
||||
conf['accounts'].inverse,
|
||||
trade_entries,
|
||||
)
|
||||
|
||||
# write recent session's trades to the user's (local) ledger file.
|
||||
records: dict[str, pp.Transactions] = {}
|
||||
# normalize recent session's trades to the `Transaction` type
|
||||
trans_by_acct: dict[str, dict[str, Transaction]] = {}
|
||||
|
||||
for acctid, trades_by_id in entries.items():
|
||||
# normalize to transaction form
|
||||
records[acctid] = norm_trade_records(trades_by_id)
|
||||
trans_by_acct[acctid] = norm_trade_records(trades_by_id)
|
||||
|
||||
return records, entries
|
||||
return trans_by_acct, entries
|
||||
|
||||
|
||||
async def update_and_audit_msgs(
|
||||
acctid: str, # no `ib.` prefix is required!
|
||||
pps: list[pp.Position],
|
||||
pps: list[Position],
|
||||
cids2pps: dict[tuple[str, int], BrokerdPosition],
|
||||
validate: bool = False,
|
||||
|
||||
) -> list[BrokerdPosition]:
|
||||
|
||||
msgs: list[BrokerdPosition] = []
|
||||
# pps: dict[int, pp.Position] = {}
|
||||
|
||||
for p in pps:
|
||||
bsuid = p.bsuid
|
||||
|
||||
# build trade-session-actor local table
|
||||
# of pps from unique symbol ids.
|
||||
# pps[bsuid] = p
|
||||
|
||||
# retreive equivalent ib reported position message
|
||||
# for comparison/audit versus the piker equivalent
|
||||
# breakeven pp calcs.
|
||||
|
@ -462,6 +425,8 @@ async def trades_dialogue(
|
|||
all_positions = []
|
||||
accounts = set()
|
||||
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
|
||||
acctids = set()
|
||||
cids2pps: dict[str, BrokerdPosition] = {}
|
||||
|
||||
# TODO: this causes a massive tractor bug when you run marketstored
|
||||
# with ``--tsdb``... you should get:
|
||||
|
@ -471,15 +436,38 @@ async def trades_dialogue(
|
|||
# - hitting final control-c to kill daemon will lead to hang
|
||||
# assert 0
|
||||
|
||||
# TODO: just write on teardown?
|
||||
# we might also want to delegate a specific actor for
|
||||
# ledger writing / reading for speed?
|
||||
async with (
|
||||
trio.open_nursery() as nurse,
|
||||
open_client_proxies() as (proxies, aioclients),
|
||||
):
|
||||
# Open a trade ledgers stack for appending trade records over
|
||||
# multiple accounts.
|
||||
# TODO: we probably want to generalize this into a "ledgers" api..
|
||||
ledgers: dict[str, dict] = {}
|
||||
tables: dict[str, PpTable] = {}
|
||||
with (
|
||||
ExitStack() as lstack,
|
||||
):
|
||||
for account, proxy in proxies.items():
|
||||
|
||||
acctid = account.strip('ib.')
|
||||
acctids.add(acctid)
|
||||
|
||||
# open ledger and pptable wrapper for each
|
||||
# detected account.
|
||||
ledger = ledgers[acctid] = lstack.enter_context(
|
||||
open_trade_ledger('ib', acctid)
|
||||
)
|
||||
table = tables[acctid] = lstack.enter_context(
|
||||
open_pps('ib', acctid)
|
||||
)
|
||||
|
||||
client = aioclients[account]
|
||||
|
||||
async def open_stream(
|
||||
async def open_trade_event_stream(
|
||||
task_status: TaskStatus[
|
||||
trio.abc.ReceiveChannel
|
||||
] = trio.TASK_STATUS_IGNORED,
|
||||
|
@ -493,75 +481,77 @@ async def trades_dialogue(
|
|||
task_status.started(trade_event_stream)
|
||||
await trio.sleep_forever()
|
||||
|
||||
trade_event_stream = await nurse.start(open_stream)
|
||||
|
||||
trade_event_stream = await nurse.start(open_trade_event_stream)
|
||||
clients.append((client, trade_event_stream))
|
||||
|
||||
assert account in accounts_def
|
||||
accounts.add(account)
|
||||
|
||||
cids2pps: dict[str, BrokerdPosition] = {}
|
||||
update_records: dict[str, bidict] = {}
|
||||
# update trades ledgers for all accounts from connected
|
||||
# api clients which report trades for **this session**.
|
||||
trades = await proxy.trades()
|
||||
(
|
||||
trans_by_acct,
|
||||
api_ready_for_ledger_entries,
|
||||
) = await update_ledger_from_api_trades(
|
||||
trades,
|
||||
proxy,
|
||||
)
|
||||
|
||||
# if new trades are detected from the API, prepare
|
||||
# them for the ledger file and update the pptable.
|
||||
if api_ready_for_ledger_entries:
|
||||
trade_entries = api_ready_for_ledger_entries[acctid]
|
||||
ledger.update(trade_entries)
|
||||
trans = trans_by_acct.get(acctid)
|
||||
if trans:
|
||||
table.update_from_trans(trans)
|
||||
|
||||
# process pp value reported from ib's system. we only use these
|
||||
# to cross-check sizing since average pricing on their end uses
|
||||
# the so called (bs) "FIFO" style which more or less results in
|
||||
# a price that's not useful for traders who want to not lose
|
||||
# money.. xb
|
||||
for client in aioclients.values():
|
||||
# for client in aioclients.values():
|
||||
for pos in client.positions():
|
||||
|
||||
cid, msg = pack_position(pos)
|
||||
acctid = msg.account = accounts_def.inverse[msg.account]
|
||||
acctid = acctid.strip('ib.')
|
||||
cids2pps[(acctid, cid)] = msg
|
||||
assert msg.account in accounts, (
|
||||
f'Position for unknown account: {msg.account}')
|
||||
|
||||
# collect all ib-pp reported positions so that we can be
|
||||
# sure know which positions to update from the ledger if
|
||||
# any are missing from the ``pps.toml``
|
||||
update_records.setdefault(acctid, bidict())[cid] = msg.symbol
|
||||
bsuid, msg = pack_position(pos)
|
||||
acctid = msg.account = accounts_def.inverse[msg.account]
|
||||
acctid = acctid.strip('ib.')
|
||||
cids2pps[(acctid, bsuid)] = msg
|
||||
assert msg.account in accounts, (
|
||||
f'Position for unknown account: {msg.account}')
|
||||
|
||||
# update trades ledgers for all accounts from
|
||||
# connected api clients which report trades for **this session**.
|
||||
new_trades = {}
|
||||
for account, proxy in proxies.items():
|
||||
trades = await proxy.trades()
|
||||
(
|
||||
records_by_acct,
|
||||
ledger_entries,
|
||||
) = await update_ledger_from_api_trades(
|
||||
trades,
|
||||
proxy,
|
||||
)
|
||||
new_trades.update(records_by_acct)
|
||||
table = tables[acctid]
|
||||
pp = table.pps.get(bsuid)
|
||||
if (
|
||||
not pp
|
||||
or pp.size != msg.size
|
||||
):
|
||||
trans = norm_trade_records(ledger)
|
||||
updated = table.update_from_trans(trans)
|
||||
pp = updated[bsuid]
|
||||
assert msg.size == pp.size, 'WTF'
|
||||
|
||||
for acctid, trans in new_trades.items():
|
||||
for t in trans:
|
||||
bsuid = t.bsuid
|
||||
if bsuid in update_records:
|
||||
assert update_records[bsuid] == t.fqsn
|
||||
else:
|
||||
update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn
|
||||
# TODO: figure out why these don't match?
|
||||
# assert pp.calc_be_price() == pp.be_price
|
||||
|
||||
# load all positions from `pps.toml`, cross check with ib's
|
||||
# positions data, and relay re-formatted pps as msgs to the ems.
|
||||
_, closed_pps = table.dump_active('ib')
|
||||
active_pps = table.pps
|
||||
|
||||
# load all positions from `pps.toml`, cross check with
|
||||
# ib's positions data, and relay re-formatted pps as
|
||||
# msgs to the ems.
|
||||
# __2 cases__:
|
||||
# - new trades have taken place this session that we want to
|
||||
# always reprocess indempotently,
|
||||
# - no new trades yet but we want to reload and audit any
|
||||
# positions reported by ib's sys that may not yet be in
|
||||
# piker's ``pps.toml`` state-file.
|
||||
for acctid, to_update in update_records.items():
|
||||
trans = new_trades.get(acctid)
|
||||
active, closed = pp.update_pps_conf(
|
||||
'ib',
|
||||
acctid,
|
||||
trade_records=trans,
|
||||
ledger_reload=to_update,
|
||||
)
|
||||
for pps in [active, closed]:
|
||||
for pps in [active_pps, closed_pps]:
|
||||
msgs = await update_and_audit_msgs(
|
||||
acctid,
|
||||
pps.values(),
|
||||
|
@ -576,32 +566,22 @@ async def trades_dialogue(
|
|||
f'{pformat(cids2pps)}'
|
||||
)
|
||||
|
||||
# log.info(f'Loaded {len(trades)} from this session')
|
||||
# TODO: write trades to local ``trades.toml``
|
||||
# - use above per-session trades data and write to local file
|
||||
# - get the "flex reports" working and pull historical data and
|
||||
# also save locally.
|
||||
|
||||
await ctx.started((
|
||||
all_positions,
|
||||
tuple(name for name in accounts_def if name in accounts),
|
||||
))
|
||||
|
||||
# TODO: maybe just write on teardown?
|
||||
# we might also want to delegate a specific actor for
|
||||
# ledger writing / reading for speed?
|
||||
|
||||
# write ledger with all new trades **AFTER** we've updated the
|
||||
# `pps.toml` from the original ledger state!
|
||||
for acctid, trades_by_id in ledger_entries.items():
|
||||
with pp.open_trade_ledger('ib', acctid) as ledger:
|
||||
ledger.update(trades_by_id)
|
||||
for acctid, trades_by_id in api_ready_for_ledger_entries.items():
|
||||
ledgers[acctid].update(trades_by_id)
|
||||
|
||||
async with (
|
||||
ctx.open_stream() as ems_stream,
|
||||
trio.open_nursery() as n,
|
||||
):
|
||||
# start order request handler **before** local trades event loop
|
||||
# start order request handler **before** local trades
|
||||
# event loop
|
||||
n.start_soon(handle_order_requests, ems_stream, accounts_def)
|
||||
|
||||
# allocate event relay tasks for each client connection
|
||||
|
@ -613,6 +593,9 @@ async def trades_dialogue(
|
|||
accounts_def,
|
||||
cids2pps,
|
||||
proxies,
|
||||
|
||||
ledgers,
|
||||
tables,
|
||||
)
|
||||
|
||||
# block until cancelled
|
||||
|
@ -626,44 +609,44 @@ async def emit_pp_update(
|
|||
proxies: dict,
|
||||
cids2pps: dict,
|
||||
|
||||
ledgers,
|
||||
tables,
|
||||
|
||||
) -> None:
|
||||
|
||||
# compute and relay incrementally updated piker pp
|
||||
acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
|
||||
proxy = proxies[acctid]
|
||||
|
||||
acctname = acctid.strip('ib.')
|
||||
records_by_acct, ledger_entries = await update_ledger_from_api_trades(
|
||||
acctid = acctid.strip('ib.')
|
||||
(
|
||||
records_by_acct,
|
||||
api_ready_for_ledger_entries,
|
||||
) = await update_ledger_from_api_trades(
|
||||
[trade_entry],
|
||||
proxy,
|
||||
)
|
||||
records = records_by_acct[acctname]
|
||||
r = records[0]
|
||||
trans = records_by_acct[acctid]
|
||||
r = list(trans.values())[0]
|
||||
|
||||
# update and load all positions from `pps.toml`, cross check with
|
||||
# ib's positions data, and relay re-formatted pps as msgs to the
|
||||
# ems. we report both the open and closed updates in one map since
|
||||
# for incremental update we may have just fully closed a pp and need
|
||||
# to relay that msg as well!
|
||||
active, closed = pp.update_pps_conf(
|
||||
'ib',
|
||||
acctname,
|
||||
trade_records=records,
|
||||
ledger_reload={r.bsuid: r.fqsn},
|
||||
)
|
||||
table = tables[acctid]
|
||||
table.update_from_trans(trans)
|
||||
_, closed = table.dump_active('ib')
|
||||
active = table.pps
|
||||
|
||||
# NOTE: write ledger with all new trades **AFTER** we've updated the
|
||||
# `pps.toml` from the original ledger state!
|
||||
for acctid, trades_by_id in ledger_entries.items():
|
||||
with pp.open_trade_ledger('ib', acctid) as ledger:
|
||||
# NOTE: update ledger with all new trades
|
||||
for acctid, trades_by_id in api_ready_for_ledger_entries.items():
|
||||
ledger = ledgers[acctid]
|
||||
ledger.update(trades_by_id)
|
||||
|
||||
# generate pp msgs and cross check with ib's positions data, relay
|
||||
# re-formatted pps as msgs to the ems.
|
||||
for pos in filter(
|
||||
bool,
|
||||
[active.get(r.bsuid), closed.get(r.bsuid)]
|
||||
):
|
||||
msgs = await update_and_audit_msgs(
|
||||
acctname,
|
||||
acctid,
|
||||
[pos],
|
||||
cids2pps,
|
||||
|
||||
|
@ -685,6 +668,9 @@ async def deliver_trade_events(
|
|||
cids2pps: dict[tuple[str, str], BrokerdPosition],
|
||||
proxies: dict[str, MethodProxy],
|
||||
|
||||
ledgers,
|
||||
tables,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Format and relay all trade events for a given client to emsd.
|
||||
|
@ -834,6 +820,8 @@ async def deliver_trade_events(
|
|||
accounts_def,
|
||||
proxies,
|
||||
cids2pps,
|
||||
ledgers,
|
||||
tables,
|
||||
)
|
||||
|
||||
case 'cost':
|
||||
|
@ -866,6 +854,8 @@ async def deliver_trade_events(
|
|||
accounts_def,
|
||||
proxies,
|
||||
cids2pps,
|
||||
ledgers,
|
||||
tables,
|
||||
)
|
||||
|
||||
case 'error':
|
||||
|
@ -916,14 +906,13 @@ async def deliver_trade_events(
|
|||
def norm_trade_records(
|
||||
ledger: dict[str, Any],
|
||||
|
||||
) -> list[pp.Transaction]:
|
||||
) -> list[Transaction]:
|
||||
'''
|
||||
Normalize a flex report or API retrieved executions
|
||||
ledger into our standard record format.
|
||||
|
||||
'''
|
||||
records: list[pp.Transaction] = []
|
||||
|
||||
records: dict[str, Transaction] = {}
|
||||
for tid, record in ledger.items():
|
||||
|
||||
conid = record.get('conId') or record['conid']
|
||||
|
@ -1001,7 +990,7 @@ def norm_trade_records(
|
|||
# which case, we can pull the fqsn from that table (see
|
||||
# `trades_dialogue()` above).
|
||||
|
||||
records.append(pp.Transaction(
|
||||
records[tid] = Transaction(
|
||||
fqsn=fqsn,
|
||||
tid=tid,
|
||||
size=size,
|
||||
|
@ -1010,7 +999,7 @@ def norm_trade_records(
|
|||
dt=dt,
|
||||
expiry=expiry,
|
||||
bsuid=conid,
|
||||
))
|
||||
)
|
||||
|
||||
return records
|
||||
|
||||
|
@ -1139,8 +1128,7 @@ def load_flex_trades(
|
|||
|
||||
trade_entries = report.extract('Trade')
|
||||
ln = len(trade_entries)
|
||||
# log.info(f'Loaded {ln} trades from flex query')
|
||||
print(f'Loaded {ln} trades from flex query')
|
||||
log.info(f'Loaded {ln} trades from flex query')
|
||||
|
||||
trades_by_account = trades_to_ledger_entries(
|
||||
# get reverse map to user account names
|
||||
|
@ -1149,14 +1137,20 @@ def load_flex_trades(
|
|||
source_type='flex',
|
||||
)
|
||||
|
||||
ledgers = {}
|
||||
for acctid, trades_by_id in trades_by_account.items():
|
||||
with pp.open_trade_ledger('ib', acctid) as ledger:
|
||||
ledger.update(trades_by_id)
|
||||
for acctid in trades_by_account:
|
||||
trades_by_id = trades_by_account[acctid]
|
||||
with open_trade_ledger('ib', acctid) as ledger_dict:
|
||||
tid_delta = set(trades_by_id) - set(ledger_dict)
|
||||
log.info(
|
||||
'New trades detected\n'
|
||||
f'{pformat(tid_delta)}'
|
||||
)
|
||||
if tid_delta:
|
||||
ledger_dict.update(
|
||||
{tid: trades_by_id[tid] for tid in tid_delta}
|
||||
)
|
||||
|
||||
ledgers[acctid] = ledger
|
||||
|
||||
return ledgers
|
||||
return ledger_dict
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -301,7 +301,13 @@ async def get_bars(
|
|||
else:
|
||||
|
||||
log.warning('Sending CONNECTION RESET')
|
||||
await data_reset_hack(reset_type='connection')
|
||||
res = await data_reset_hack(reset_type='connection')
|
||||
if not res:
|
||||
log.warning(
|
||||
'NO VNC DETECTED!\n'
|
||||
'Manually press ctrl-alt-f on your IB java app'
|
||||
)
|
||||
# break
|
||||
|
||||
with trio.move_on_after(timeout) as cs:
|
||||
for name, ev in [
|
||||
|
@ -570,7 +576,6 @@ def normalize(
|
|||
|
||||
# check for special contract types
|
||||
con = ticker.contract
|
||||
|
||||
fqsn, calc_price = con2fqsn(con)
|
||||
|
||||
# convert named tuples to dicts so we send usable keys
|
||||
|
@ -842,7 +847,10 @@ async def data_reset_hack(
|
|||
client.mouse.click()
|
||||
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
|
||||
|
||||
try:
|
||||
await tractor.to_asyncio.run_task(vnc_click_hack)
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
# we don't really need the ``xdotool`` approach any more B)
|
||||
return True
|
||||
|
@ -857,15 +865,24 @@ async def open_symbol_search(
|
|||
# TODO: load user defined symbol set locally for fast search?
|
||||
await ctx.started({})
|
||||
|
||||
async with open_data_client() as proxy:
|
||||
# async with open_data_client() as proxy:
|
||||
async with (
|
||||
open_client_proxies() as (proxies, clients),
|
||||
):
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
# await tractor.breakpoint()
|
||||
proxy = proxies['ib.algopaper']
|
||||
|
||||
last = time.time()
|
||||
|
||||
async for pattern in stream:
|
||||
log.debug(f'received {pattern}')
|
||||
log.info(f'received {pattern}')
|
||||
now = time.time()
|
||||
|
||||
# this causes tractor hang...
|
||||
# assert 0
|
||||
|
||||
assert pattern, 'IB can not accept blank search pattern'
|
||||
|
||||
# throttle search requests to no faster then 1Hz
|
||||
|
@ -893,7 +910,7 @@ async def open_symbol_search(
|
|||
|
||||
continue
|
||||
|
||||
log.debug(f'searching for {pattern}')
|
||||
log.info(f'searching for {pattern}')
|
||||
|
||||
last = time.time()
|
||||
|
||||
|
@ -904,6 +921,8 @@ async def open_symbol_search(
|
|||
async def stash_results(target: Awaitable[list]):
|
||||
stock_results.extend(await target)
|
||||
|
||||
for i in range(10):
|
||||
with trio.move_on_after(3) as cs:
|
||||
async with trio.open_nursery() as sn:
|
||||
sn.start_soon(
|
||||
stash_results,
|
||||
|
@ -916,6 +935,12 @@ async def open_symbol_search(
|
|||
# trigger async request
|
||||
await trio.sleep(0)
|
||||
|
||||
if cs.cancelled_caught:
|
||||
log.warning(f'Search timeout? {proxy._aio_ns.ib.client}')
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
# # match against our ad-hoc set immediately
|
||||
# adhoc_matches = fuzzy.extractBests(
|
||||
# pattern,
|
||||
|
|
Loading…
Reference in New Issue