Compare commits

..

4 Commits

Author SHA1 Message Date
Tyler Goodlet e14008701c Drop `open_pps()` from ems tests 2025-09-29 13:33:03 -04:00
Tyler Goodlet 8bb5c1bf96 `ui._remote_ctl`: shield remote rect removals
Since under `trio`-cancellation the `.remove()` is a checkpoint and will
be masked by a taskc AND we **always want to remove the rect** despite
the surrounding teardown conditions.
2025-09-29 13:26:11 -04:00
Tyler Goodlet 0462415491 `_ems`: tolerate and warn on already popped execs
In the `translate_and_relay_brokerd_events()` loop task that is, such
that we never crash on a `status_msg = book._active.pop(oid)` in the
'closed' status handler whenever a double removal happens.

Turns out there were unforeseen races here when a benign backend error
would cause an order-mode dialog to be cancelled (incorrectly) and then
a UI side `.on_cancel()` would trigger too-early removal from the
`book._active` table despite the backend sending an actual 'closed'
event (much) later, this would crash on the now missing entry..

So instead we now,
- obviously use `book._active.pop(oid, None)`
- emit a `log.warning()` (not info lol) on a null-read and with a less
  "one-line-y" message explaining the double removal and maybe *why*.
2025-09-29 13:21:11 -04:00
Tyler Goodlet 62f27bf509 `polars.cumsum()` is now `.cum_sum()` 2025-09-27 12:24:11 -04:00
4 changed files with 30 additions and 17 deletions

View File

@ -493,7 +493,7 @@ def ledger_to_dfs(
df = dfs[key] = ldf.with_columns([
pl.cumsum('size').alias('cumsize'),
pl.cum_sum('size').alias('cumsize'),
# amount of source asset "sent" (via buy txns in
# the market) to acquire the dst asset, PER txn.
@ -508,7 +508,7 @@ def ledger_to_dfs(
]).with_columns([
# rolling balance in src asset units
(pl.col('dst_bot').cumsum() * -1).alias('src_balance'),
(pl.col('dst_bot').cum_sum() * -1).alias('src_balance'),
# "position operation type" in terms of increasing the
# amount in the dst asset (entering) or decreasing the
@ -650,7 +650,7 @@ def ledger_to_dfs(
# cost that was included in the least-recently
# entered txn that is still part of the current CSi
# set.
# => we look up the cost-per-unit cumsum and apply
# => we look up the cost-per-unit cum_sum and apply
# if over the current txn size (by multiplication)
# and then reverse that previusly applied cost on
# the txn_cost for this record.

View File

@ -388,6 +388,7 @@ async def open_brokerd_dialog(
for ep_name in [
'open_trade_dialog', # probably final name?
'trades_dialogue', # legacy
# ^!TODO, rm this since all backends ported no ?!?
]:
trades_endpoint = getattr(
brokermod,
@ -1019,8 +1020,18 @@ async def translate_and_relay_brokerd_events(
)
if status == 'closed':
log.info(f'Execution for {oid} is complete!')
status_msg = book._active.pop(oid)
log.info(
f'Execution is complete!\n'
f'oid: {oid!r}\n'
)
status_msg = book._active.pop(oid, None)
if status_msg is None:
log.warning(
f'Order was already cleared from book ??\n'
f'oid: {oid!r}\n'
f'\n'
f'Maybe the order cancelled before submitted ??\n'
)
elif status == 'canceled':
log.cancel(f'Cancellation for {oid} is complete!')
@ -1544,19 +1555,18 @@ async def maybe_open_trade_relays(
@tractor.context
async def _emsd_main(
ctx: tractor.Context,
ctx: tractor.Context, # becomes `ems_ctx` below
fqme: str,
exec_mode: str, # ('paper', 'live')
loglevel: str|None = None,
) -> tuple[
dict[
# brokername, acctid
tuple[str, str],
) -> tuple[ # `ctx.started()` value!
dict[ # positions
tuple[str, str], # brokername, acctid
list[BrokerdPosition],
],
list[str],
dict[str, Status],
list[str], # accounts
dict[str, Status], # dialogs
]:
'''
EMS (sub)actor entrypoint providing the execution management

View File

@ -15,8 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Remote control tasks for sending annotations (and maybe more cmds)
to a chart from some other actor.
Remote control tasks for sending annotations (and maybe more cmds) to
a chart from some other actor.
'''
from __future__ import annotations
@ -32,6 +32,7 @@ from typing import (
)
import tractor
import trio
from tractor import trionics
from tractor import (
Portal,
@ -316,7 +317,9 @@ class AnnotCtl(Struct):
)
yield aid
finally:
await self.remove(aid)
# async ipc send op
with trio.CancelScope(shield=True):
await self.remove(aid)
async def redraw(
self,

View File

@ -42,7 +42,7 @@ from piker.accounting import (
unpack_fqme,
)
from piker.accounting import (
open_pps,
open_account,
Position,
)
@ -136,7 +136,7 @@ def load_and_check_pos(
) -> None:
with open_pps(ppmsg.broker, ppmsg.account) as table:
with open_account(ppmsg.broker, ppmsg.account) as table:
if ppmsg.size == 0:
assert ppmsg.symbol not in table.pps