Compare commits

...

4 Commits

Author SHA1 Message Date
Tyler Goodlet 390a57c96d ib: never relay "Warning:" errors to EMS..
You'd think they could be bothered to make either a "log" or "warning"
msg type instead of a `type='error'`.. but alas, this attempts to detect
all such "warning"-errors and never proxy them to the clearing engine
thus avoiding the cancellation of any associated (by `reqid`)
pre-existing orders (control dialogs).

Also update all surrounding log messages to a more multiline style.
2025-09-17 18:54:47 -04:00
Tyler Goodlet 69eac7bb15 Spurious first-draft of EG collapsing
Topically, throughout various (seemingly) console-UX-affecting or benign
spots in the code base; nothing that required more intervention beyond
things superficial. A few spots also include `trio.Nursery` ref renames
(always to something with a `tn` in it) and log-level reductions to
quiet (benign) console noise oriented around issues meant to be solved
long..

Note there's still a couple spots i left with the loose-ify flag because
i haven't fully tested them without using the latest version of
`tractor.trionics.collapse_eg()`, but more then likely they should flip
over fine.
2025-09-15 19:27:56 -04:00
Tyler Goodlet a45de0b710 ib-related: cope with invalid txn timestamps
That is inside embedded `.accounting.calc.dyn_parse_to_dt()` closure add
an optional `_invalid: list` param to where we can report
bad-timestamped records which we instead override and return as
`from_timestamp(0.)` (when the parser loop falls through) and report
later (in summary ) from the `.accounting.calc.iter_by_dt()` caller
. Add some logging and an optional debug block for future tracing.
2025-09-15 18:29:19 -04:00
Tyler Goodlet 9df1988aa6 ib: jig `.data_reset_hack()` with vnc-client failover
Since apparently porting to the new docker container enforces using
a vnc password and `asyncvnc` seems to have a bug/mis-config whenever
i've tried a pw over a wg tunnel..?

Soo, this tries out the old `i3ipc`-win-focus + `xdo` click hack when
the above fails.

Deats,
- add a mod-level `try_xdo_manual()` to wrap calling
  `i3ipc_xdotool_manual_click_hack()` with an oserr handler, ensure we
  don't bother trying if `i3ipc` import fails beforehand tho.
- call ^ from both the orig case block and the failover from the
  vnc-client case.
- factor the `+no_setup_msg: str` out to mod level and expect it to be
  `.format()`-ed.
- refresh todo around `asyncvnc` pw ish..
- add a new `i3ipc_fin_wins_titled()` window-title scanner which
  predicates input `titles` and delivers any matches alongside the orig
  focused win at call time.
- tweak `i3ipc_xdotool_manual_click_hack()` to call ^ and remove prior
  unfactored window scanning logic.
2025-09-15 16:53:25 -04:00
12 changed files with 302 additions and 129 deletions

View File

@ -121,6 +121,7 @@ async def bot_main():
# tick_throttle=10, # tick_throttle=10,
) as feed, ) as feed,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
assert accounts assert accounts

View File

@ -362,7 +362,11 @@ class Position(Struct):
# added: bool = False # added: bool = False
tid: str = t.tid tid: str = t.tid
if tid in self._events: if tid in self._events:
log.warning(f'{t} is already added?!') log.debug(
f'Txn is already added?\n'
f'\n'
f'{t}\n'
)
# return added # return added
# TODO: apparently this IS possible with a dict but not # TODO: apparently this IS possible with a dict but not
@ -696,7 +700,7 @@ class Account(Struct):
else: else:
# TODO: we reallly need a diff set of # TODO: we reallly need a diff set of
# loglevels/colors per subsys. # loglevels/colors per subsys.
log.warning( log.debug(
f'Recent position for {fqme} was closed!' f'Recent position for {fqme} was closed!'
) )

View File

@ -22,7 +22,9 @@ you know when you're losing money (if possible) XD
from __future__ import annotations from __future__ import annotations
from collections.abc import ValuesView from collections.abc import ValuesView
from contextlib import contextmanager as cm from contextlib import contextmanager as cm
from functools import partial
from math import copysign from math import copysign
from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -37,12 +39,16 @@ from pendulum import (
parse, parse,
) )
from ..log import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from ._ledger import ( from ._ledger import (
Transaction, Transaction,
TransactionLedger, TransactionLedger,
) )
log = get_logger(__name__)
def ppu( def ppu(
clears: Iterator[Transaction], clears: Iterator[Transaction],
@ -238,6 +244,9 @@ def iter_by_dt(
def dyn_parse_to_dt( def dyn_parse_to_dt(
tx: tuple[str, dict[str, Any]] | Transaction, tx: tuple[str, dict[str, Any]] | Transaction,
debug: bool = False,
_invalid: list|None = None,
) -> DateTime: ) -> DateTime:
# handle `.items()` inputs # handle `.items()` inputs
@ -250,11 +259,16 @@ def iter_by_dt(
# get best parser for this record.. # get best parser for this record..
for k in parsers: for k in parsers:
if ( if (
isdict and k in tx (v := getattr(tx, k, None))
or getattr(tx, k, None) or
(
isdict
and
(v := tx.get(k))
)
): ):
v = tx[k] if isdict else tx.dt # TODO? remove yah?
assert v is not None, f'No valid value for `{k}`!?' # v = tx[k] if isdict else tx.dt
# only call parser on the value if not None from # only call parser on the value if not None from
# the `parsers` table above (when NOT using # the `parsers` table above (when NOT using
@ -262,21 +276,54 @@ def iter_by_dt(
# sort on it directly # sort on it directly
if ( if (
not isinstance(v, DateTime) not isinstance(v, DateTime)
and (parser := parsers.get(k)) and
(parser := parsers.get(k))
): ):
return parser(v) ret = parser(v)
else: else:
return v ret = v
return ret
else:
continue
# XXX: should never get here..
else: else:
# XXX: should never get here.. if debug:
breakpoint() import tractor
with tractor.devx.maybe_open_crash_handler():
raise ValueError(
f'Invalid txn time ??\n'
f'txn-id: {k!r}\n'
f'{k!r}: {v!r}\n'
)
# assert v is not None, f'No valid value for `{k}`!?'
entry: tuple[str, dict] | Transaction if _invalid is not None:
_invalid.append(tx)
return from_timestamp(0.)
# breakpoint()
entry: tuple[str, dict]|Transaction
invalid: list = []
for entry in sorted( for entry in sorted(
records, records,
key=key or dyn_parse_to_dt, key=key or partial(
dyn_parse_to_dt,
_invalid=invalid,
),
): ):
if entry in invalid:
log.warning(
f'Ignoring txn w invalid timestamp ??\n'
f'{pformat(entry)}\n'
# f'txn-id: {k!r}\n'
# f'{k!r}: {v!r}\n'
)
continue
# NOTE the type sig above; either pairs or txns B) # NOTE the type sig above; either pairs or txns B)
yield entry yield entry

View File

@ -440,6 +440,7 @@ async def open_trade_dialog(
# - ledger: TransactionLedger # - ledger: TransactionLedger
async with ( async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
): ):

View File

@ -38,6 +38,7 @@ from typing import (
) )
from pendulum import now from pendulum import now
import tractor
import trio import trio
import numpy as np import numpy as np
from tractor.trionics import ( from tractor.trionics import (
@ -708,6 +709,7 @@ async def get_client(
) -> Client: ) -> Client:
async with ( async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as n, trio.open_nursery() as n,
open_jsonrpc_session( open_jsonrpc_session(
_ws_url, response_type=JSONRPCResult _ws_url, response_type=JSONRPCResult

View File

@ -34,6 +34,7 @@ from piker.brokers._util import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from .api import Client from .api import Client
from ib_insync import IB from ib_insync import IB
import i3ipc
log = get_logger('piker.brokers.ib') log = get_logger('piker.brokers.ib')
@ -48,6 +49,37 @@ _reset_tech: Literal[
] = 'vnc' ] = 'vnc'
no_setup_msg:str = (
'No data reset hack test setup for {vnc_sockaddr}!\n'
'See config setup tips @\n'
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
)
def try_xdo_manual(
vnc_sockaddr: str,
):
'''
Do the "manual" `xdo`-based screen switch + click
combo since apparently the `asyncvnc` client ain't workin..
Note this is only meant as a backup method for Xorg users,
ideally you can use a real vnc client and the `vnc_click_hack()`
impl!
'''
global _reset_tech
try:
i3ipc_xdotool_manual_click_hack()
_reset_tech = 'i3ipc_xdotool'
return True
except OSError:
log.exception(
no_setup_msg.format(vnc_sockaddr)
)
return False
async def data_reset_hack( async def data_reset_hack(
# vnc_host: str, # vnc_host: str,
client: Client, client: Client,
@ -90,15 +122,9 @@ async def data_reset_hack(
vnc_port: int vnc_port: int
vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs') vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs')
no_setup_msg:str = (
f'No data reset hack test setup for {vnc_sockaddr}!\n'
'See config setup tips @\n'
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
)
if not vnc_sockaddr: if not vnc_sockaddr:
log.warning( log.warning(
no_setup_msg no_setup_msg.format(vnc_sockaddr)
+ +
'REQUIRES A `vnc_addrs: array` ENTRY' 'REQUIRES A `vnc_addrs: array` ENTRY'
) )
@ -119,27 +145,38 @@ async def data_reset_hack(
port=vnc_port, port=vnc_port,
) )
) )
except OSError: except (
if vnc_host != 'localhost': OSError, # no VNC server avail..
log.warning(no_setup_msg) PermissionError, # asyncvnc pw fail..
return False ):
try: try:
import i3ipc # noqa (since a deps dynamic check) import i3ipc # noqa (since a deps dynamic check)
except ModuleNotFoundError: except ModuleNotFoundError:
log.warning(no_setup_msg) log.warning(
no_setup_msg.format(vnc_sockaddr)
)
return False return False
try: if vnc_host not in {
i3ipc_xdotool_manual_click_hack() 'localhost',
_reset_tech = 'i3ipc_xdotool' '127.0.0.1',
return True }:
except OSError: focussed, matches = i3ipc_fin_wins_titled()
log.exception(no_setup_msg) if not matches:
return False log.warning(
no_setup_msg.format(vnc_sockaddr)
)
return False
else:
try_xdo_manual(vnc_sockaddr)
# localhost but no vnc-client or it borked..
else:
try_xdo_manual(vnc_sockaddr)
case 'i3ipc_xdotool': case 'i3ipc_xdotool':
i3ipc_xdotool_manual_click_hack() try_xdo_manual(vnc_sockaddr)
# i3ipc_xdotool_manual_click_hack()
case _ as tech: case _ as tech:
raise RuntimeError(f'{tech} is not supported for reset tech!?') raise RuntimeError(f'{tech} is not supported for reset tech!?')
@ -178,9 +215,9 @@ async def vnc_click_hack(
host, host,
port=port, port=port,
# TODO: doesn't work see: # TODO: doesn't work?
# https://github.com/barneygale/asyncvnc/issues/7 # see, https://github.com/barneygale/asyncvnc/issues/7
# password='ibcansmbz', password='doggy',
) as client: ) as client:
@ -194,70 +231,103 @@ async def vnc_click_hack(
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
def i3ipc_fin_wins_titled(
titles: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
# !TODO, remote vnc instance
# -[ ] something in title (or other Con-props) that indicates
# this is explicitly for ibrk sw?
# |_[ ] !can use modden spawn eventually!
'TigerVNC',
# 'vncviewer', # the terminal..
],
) -> tuple[
i3ipc.Con, # orig focussed win
list[tuple[str, i3ipc.Con]], # matching wins by title
]:
'''
Attempt to find a local-DE window titled with an entry in
`titles`.
If found deliver the current focussed window and all matching
`i3ipc.Con`s in a list.
'''
import i3ipc
ipc = i3ipc.Connection()
# TODO: might be worth offering some kinda api for grabbing
# the window id from the pid?
# https://stackoverflow.com/a/2250879
tree = ipc.get_tree()
focussed: i3ipc.Con = tree.find_focused()
matches: list[i3ipc.Con] = []
for name in titles:
results = tree.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
matches.append((
name,
con,
))
return (
focussed,
matches,
)
def i3ipc_xdotool_manual_click_hack() -> None: def i3ipc_xdotool_manual_click_hack() -> None:
''' '''
Do the data reset hack but expecting a local X-window using `xdotool`. Do the data reset hack but expecting a local X-window using `xdotool`.
''' '''
import i3ipc focussed, matches = i3ipc_fin_wins_titled()
i3 = i3ipc.Connection() orig_win_id = focussed.window
# TODO: might be worth offering some kinda api for grabbing
# the window id from the pid?
# https://stackoverflow.com/a/2250879
t = i3.get_tree()
orig_win_id = t.find_focused().window
# for tws
win_names: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
]
try: try:
for name in win_names: for name, con in matches:
results = t.find_titled(name) print(f'Resetting data feed for {name}')
print(f'results for {name}: {results}') win_id = str(con.window)
if results: w, h = con.rect.width, con.rect.height
con = results[0]
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height
# TODO: seems to be a few libs for python but not sure # TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of # if they support all the sub commands we need, order of
# most recent commit history: # most recent commit history:
# https://github.com/rr-/pyxdotool # https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool # https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool # https://github.com/cphyc/pyxdotool
# TODO: only run the reconnect (2nd) kc on a detected # TODO: only run the reconnect (2nd) kc on a detected
# disconnect? # disconnect?
for key_combo, timeout in [ for key_combo, timeout in [
# only required if we need a connection reset. # only required if we need a connection reset.
# ('ctrl+alt+r', 12), # ('ctrl+alt+r', 12),
# data feed reset. # data feed reset.
('ctrl+alt+f', 6) ('ctrl+alt+f', 6)
]: ]:
subprocess.call([ subprocess.call([
'xdotool', 'xdotool',
'windowactivate', '--sync', win_id, 'windowactivate', '--sync', win_id,
# move mouse to bottom left of window (where # move mouse to bottom left of window (where
# there should be nothing to click). # there should be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4), 'mousemove_relative', '--sync', str(w-4), str(h-4),
# NOTE: we may need to stick a `--retry 3` in here.. # NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id, 'click', '--window', win_id,
'--repeat', '3', '1', '--repeat', '3', '1',
# hackzorzes # hackzorzes
'key', key_combo, 'key', key_combo,
], ],
timeout=timeout, timeout=timeout,
) )
# re-activate and focus original window # re-activate and focus original window
subprocess.call([ subprocess.call([

View File

@ -48,6 +48,7 @@ from bidict import bidict
import trio import trio
import tractor import tractor
from tractor import to_asyncio from tractor import to_asyncio
from tractor import trionics
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
DateTime, DateTime,
@ -1373,8 +1374,8 @@ async def load_clients_for_trio(
''' '''
Pure async mngr proxy to ``load_aio_clients()``. Pure async mngr proxy to ``load_aio_clients()``.
This is a bootstrap entrypoing to call from This is a bootstrap entrypoint to call from
a ``tractor.to_asyncio.open_channel_from()``. a `tractor.to_asyncio.open_channel_from()`.
''' '''
async with load_aio_clients( async with load_aio_clients(
@ -1395,7 +1396,10 @@ async def open_client_proxies() -> tuple[
async with ( async with (
tractor.trionics.maybe_open_context( tractor.trionics.maybe_open_context(
acm_func=tractor.to_asyncio.open_channel_from, acm_func=tractor.to_asyncio.open_channel_from,
kwargs={'target': load_clients_for_trio}, kwargs={
'target': load_clients_for_trio,
# ^XXX, kwarg to `open_channel_from()`
},
# lock around current actor task access # lock around current actor task access
# TODO: maybe this should be the default in tractor? # TODO: maybe this should be the default in tractor?
@ -1588,7 +1592,8 @@ async def open_client_proxy(
event_consumers=event_table, event_consumers=event_table,
) as (first, chan), ) as (first, chan),
trio.open_nursery() as relay_n, trionics.collapse_eg(), # loose-ify
trio.open_nursery() as relay_tn,
): ):
assert isinstance(first, Client) assert isinstance(first, Client)
@ -1628,7 +1633,7 @@ async def open_client_proxy(
continue continue
relay_n.start_soon(relay_events) relay_tn.start_soon(relay_events)
yield proxy yield proxy

View File

@ -34,6 +34,7 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor.to_asyncio import LinkedTaskChannel from tractor.to_asyncio import LinkedTaskChannel
from tractor import trionics
from ib_insync.contract import ( from ib_insync.contract import (
Contract, Contract,
) )
@ -407,7 +408,7 @@ async def update_and_audit_pos_msg(
# TODO: make this a "propaganda" log level? # TODO: make this a "propaganda" log level?
if ibpos.avgCost != msg.avg_price: if ibpos.avgCost != msg.avg_price:
log.warning( log.debug(
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n' f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
f'ib: {ibfmtmsg}\n' f'ib: {ibfmtmsg}\n'
'---------------------------\n' '---------------------------\n'
@ -738,7 +739,7 @@ async def open_trade_dialog(
f'UNEXPECTED POSITION says IB => {msg.symbol}\n' f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
'Maybe they LIQUIDATED YOU or your ledger is wrong?\n' 'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
) )
log.error(logmsg) log.debug(logmsg)
await ctx.started(( await ctx.started((
all_positions, all_positions,
@ -747,21 +748,22 @@ async def open_trade_dialog(
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trionics.collapse_eg(),
trio.open_nursery() as tn,
): ):
# relay existing open orders to ems # relay existing open orders to ems
for msg in order_msgs: for msg in order_msgs:
await ems_stream.send(msg) await ems_stream.send(msg)
for client in set(aioclients.values()): for client in set(aioclients.values()):
trade_event_stream: LinkedTaskChannel = await n.start( trade_event_stream: LinkedTaskChannel = await tn.start(
open_trade_event_stream, open_trade_event_stream,
client, client,
) )
# start order request handler **before** local trades # start order request handler **before** local trades
# event loop # event loop
n.start_soon( tn.start_soon(
handle_order_requests, handle_order_requests,
ems_stream, ems_stream,
accounts_def, accounts_def,
@ -769,7 +771,7 @@ async def open_trade_dialog(
) )
# allocate event relay tasks for each client connection # allocate event relay tasks for each client connection
n.start_soon( tn.start_soon(
deliver_trade_events, deliver_trade_events,
trade_event_stream, trade_event_stream,
@ -1241,32 +1243,47 @@ async def deliver_trade_events(
# never relay errors for non-broker related issues # never relay errors for non-broker related issues
# https://interactivebrokers.github.io/tws-api/message_codes.html # https://interactivebrokers.github.io/tws-api/message_codes.html
code: int = err['error_code'] code: int = err['error_code']
if code in { reason: str = err['reason']
200, # uhh reqid: str = str(err['reqid'])
# "Warning:" msg codes,
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# - 2109: 'Outside Regular Trading Hours'
if 'Warning:' in reason:
log.warning(
f'Order-API-warning: {code!r}\n'
f'reqid: {reqid!r}\n'
f'\n'
f'{pformat(err)}\n'
# ^TODO? should we just print the `reason`
# not the full `err`-dict?
)
continue
# XXX known special (ignore) cases
elif code in {
200, # uhh.. ni idea
# hist pacing / connectivity # hist pacing / connectivity
162, 162,
165, 165,
# WARNING codes:
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# Attribute 'Outside Regular Trading Hours' is
# " 'ignored based on the order type and
# destination. PlaceOrder is now ' 'being
# processed.',
2109,
# XXX: lol this isn't even documented.. # XXX: lol this isn't even documented..
# 'No market data during competing live session' # 'No market data during competing live session'
1669, 1669,
}: }:
log.error(
f'Order-API-error which is non-cancel-causing ?!\n'
f'\n'
f'{pformat(err)}\n'
)
continue continue
reqid: str = str(err['reqid'])
reason: str = err['reason']
if err['reqid'] == -1: if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}') log.error(
f'TWS external order error ??\n'
f'{pformat(err)}\n'
)
flow: dict = dict( flow: dict = dict(
flows.get(reqid) flows.get(reqid)

View File

@ -42,6 +42,7 @@ from bidict import bidict
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor import trionics
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
@ -500,7 +501,7 @@ class Router(Struct):
''' '''
# setup at actor spawn time # setup at actor spawn time
nursery: trio.Nursery _tn: trio.Nursery
# broker to book map # broker to book map
books: dict[str, DarkBook] = {} books: dict[str, DarkBook] = {}
@ -666,7 +667,7 @@ class Router(Struct):
# dark book clearing loop, also lives with parent # dark book clearing loop, also lives with parent
# daemon to allow dark order clearing while no # daemon to allow dark order clearing while no
# client is connected. # client is connected.
self.nursery.start_soon( self._tn.start_soon(
clear_dark_triggers, clear_dark_triggers,
self, self,
relay.brokerd_stream, relay.brokerd_stream,
@ -689,7 +690,7 @@ class Router(Struct):
# spawn a ``brokerd`` order control dialog stream # spawn a ``brokerd`` order control dialog stream
# that syncs lifetime with the parent `emsd` daemon. # that syncs lifetime with the parent `emsd` daemon.
self.nursery.start_soon( self._tn.start_soon(
translate_and_relay_brokerd_events, translate_and_relay_brokerd_events,
broker, broker,
relay.brokerd_stream, relay.brokerd_stream,
@ -763,10 +764,12 @@ async def _setup_persistent_emsd(
global _router global _router
# open a root "service nursery" for the ``emsd`` actor # open a root "service task-nursery" for the `emsd`-actor
async with trio.open_nursery() as service_nursery: async with (
trionics.collapse_eg(),
_router = Router(nursery=service_nursery) trio.open_nursery() as tn
):
_router = Router(_tn=tn)
# TODO: send back the full set of persistent # TODO: send back the full set of persistent
# orders/execs? # orders/execs?
@ -1511,7 +1514,7 @@ async def maybe_open_trade_relays(
loglevel: str = 'info', loglevel: str = 'info',
): ):
fqme, relay, feed, client_ready = await _router.nursery.start( fqme, relay, feed, client_ready = await _router._tn.start(
_router.open_trade_relays, _router.open_trade_relays,
fqme, fqme,
exec_mode, exec_mode,

View File

@ -728,7 +728,10 @@ class Feed(Struct):
async for msg in stream: async for msg in stream:
await tx.send(msg) await tx.send(msg)
async with trio.open_nursery() as nurse: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse
):
# spawn a relay task for each stream so that they all # spawn a relay task for each stream so that they all
# multiplex to a common channel. # multiplex to a common channel.
for brokername in mods: for brokername in mods:

View File

@ -963,7 +963,10 @@ async def tsdb_backfill(
# concurrently load the provider's most-recent-frame AND any # concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage. # pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = [] dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
tn.start_soon( tn.start_soon(
push_latest_frame, push_latest_frame,
dt_eps, dt_eps,
@ -1012,9 +1015,16 @@ async def tsdb_backfill(
int, int,
Duration, Duration,
]|None = config.get('frame_types', None) ]|None = config.get('frame_types', None)
if def_frame_durs: if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe] def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
if def_frame_size != calced_frame_size:
log.warning(
f'Expected frame size {def_frame_size}\n'
f'Rxed frame {calced_frame_size}\n'
)
# await tractor.pause()
else: else:
# use what we calced from first frame above. # use what we calced from first frame above.
def_frame_size = calced_frame_size def_frame_size = calced_frame_size
@ -1043,7 +1053,9 @@ async def tsdb_backfill(
# if there is a gap to backfill from the first # if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb # history frame until the last datum loaded from the tsdb
# continue that now in the background # continue that now in the background
async with trio.open_nursery() as tn: async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
bf_done = await tn.start( bf_done = await tn.start(
partial( partial(
@ -1308,6 +1320,7 @@ async def manage_history(
# sampling period) data set since normally differently # sampling period) data set since normally differently
# sampled timeseries can be loaded / process independently # sampled timeseries can be loaded / process independently
# ;) # ;)
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
log.info( log.info(

View File

@ -142,7 +142,12 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel):
# async with tractor.open_nursery() as n: # async with tractor.open_nursery() as n:
# await n.run_in_actor('other', intermittently_refresh_tokens) # await n.run_in_actor('other', intermittently_refresh_tokens)
async with trio.open_nursery() as n: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
) as n
):
quoter = await qt.stock_quoter(client, us_symbols) quoter = await qt.stock_quoter(client, us_symbols)
@ -383,7 +388,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
else: else:
symbols = [tmx_symbols] symbols = [tmx_symbols]
async with trio.open_nursery() as n: async with trio.open_nursery(
strict_exception_groups=False,
) as n:
for syms, func in zip(symbols, stream_what): for syms, func in zip(symbols, stream_what):
n.start_soon(func, feed, syms) n.start_soon(func, feed, syms)