Compare commits

...

4 Commits

Author SHA1 Message Date
Tyler Goodlet e14008701c Drop `open_pps()` from ems tests 2025-09-29 13:33:03 -04:00
Tyler Goodlet 8bb5c1bf96 `ui._remote_ctl`: shield remote rect removals
Since under `trio`-cancellation the `.remove()` is a checkpoint and will
be masked by a taskc AND we **always want to remove the rect** despite
the surrounding teardown conditions.
2025-09-29 13:26:11 -04:00
Tyler Goodlet 0462415491 `_ems`: tolerate and warn on already popped execs
In the `translate_and_relay_brokerd_events()` loop task that is, such
that we never crash on a `status_msg = book._active.pop(oid)` in the
'closed' status handler whenever a double removal happens.

Turns out there were unforeseen races here when a benign backend error
would cause an order-mode dialog to be cancelled (incorrectly) and then
a UI side `.on_cancel()` would trigger too-early removal from the
`book._active` table despite the backend sending an actual 'closed'
event (much) later, this would crash on the now missing entry..

So instead we now,
- obviously use `book._active.pop(oid, None)`
- emit a `log.warning()` (not info lol) on a null-read and with a less
  "one-line-y" message explaining the double removal and maybe *why*.
2025-09-29 13:21:11 -04:00
Tyler Goodlet 62f27bf509 `polars.cumsum()` is now `.cum_sum()` 2025-09-27 12:24:11 -04:00
4 changed files with 30 additions and 17 deletions

View File

@ -493,7 +493,7 @@ def ledger_to_dfs(
df = dfs[key] = ldf.with_columns([ df = dfs[key] = ldf.with_columns([
pl.cumsum('size').alias('cumsize'), pl.cum_sum('size').alias('cumsize'),
# amount of source asset "sent" (via buy txns in # amount of source asset "sent" (via buy txns in
# the market) to acquire the dst asset, PER txn. # the market) to acquire the dst asset, PER txn.
@ -508,7 +508,7 @@ def ledger_to_dfs(
]).with_columns([ ]).with_columns([
# rolling balance in src asset units # rolling balance in src asset units
(pl.col('dst_bot').cumsum() * -1).alias('src_balance'), (pl.col('dst_bot').cum_sum() * -1).alias('src_balance'),
# "position operation type" in terms of increasing the # "position operation type" in terms of increasing the
# amount in the dst asset (entering) or decreasing the # amount in the dst asset (entering) or decreasing the
@ -650,7 +650,7 @@ def ledger_to_dfs(
# cost that was included in the least-recently # cost that was included in the least-recently
# entered txn that is still part of the current CSi # entered txn that is still part of the current CSi
# set. # set.
# => we look up the cost-per-unit cumsum and apply # => we look up the cost-per-unit cum_sum and apply
# if over the current txn size (by multiplication) # if over the current txn size (by multiplication)
# and then reverse that previusly applied cost on # and then reverse that previusly applied cost on
# the txn_cost for this record. # the txn_cost for this record.

View File

@ -388,6 +388,7 @@ async def open_brokerd_dialog(
for ep_name in [ for ep_name in [
'open_trade_dialog', # probably final name? 'open_trade_dialog', # probably final name?
'trades_dialogue', # legacy 'trades_dialogue', # legacy
# ^!TODO, rm this since all backends ported no ?!?
]: ]:
trades_endpoint = getattr( trades_endpoint = getattr(
brokermod, brokermod,
@ -1019,8 +1020,18 @@ async def translate_and_relay_brokerd_events(
) )
if status == 'closed': if status == 'closed':
log.info(f'Execution for {oid} is complete!') log.info(
status_msg = book._active.pop(oid) f'Execution is complete!\n'
f'oid: {oid!r}\n'
)
status_msg = book._active.pop(oid, None)
if status_msg is None:
log.warning(
f'Order was already cleared from book ??\n'
f'oid: {oid!r}\n'
f'\n'
f'Maybe the order cancelled before submitted ??\n'
)
elif status == 'canceled': elif status == 'canceled':
log.cancel(f'Cancellation for {oid} is complete!') log.cancel(f'Cancellation for {oid} is complete!')
@ -1544,19 +1555,18 @@ async def maybe_open_trade_relays(
@tractor.context @tractor.context
async def _emsd_main( async def _emsd_main(
ctx: tractor.Context, ctx: tractor.Context, # becomes `ems_ctx` below
fqme: str, fqme: str,
exec_mode: str, # ('paper', 'live') exec_mode: str, # ('paper', 'live')
loglevel: str|None = None, loglevel: str|None = None,
) -> tuple[ ) -> tuple[ # `ctx.started()` value!
dict[ dict[ # positions
# brokername, acctid tuple[str, str], # brokername, acctid
tuple[str, str],
list[BrokerdPosition], list[BrokerdPosition],
], ],
list[str], list[str], # accounts
dict[str, Status], dict[str, Status], # dialogs
]: ]:
''' '''
EMS (sub)actor entrypoint providing the execution management EMS (sub)actor entrypoint providing the execution management

View File

@ -15,8 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
Remote control tasks for sending annotations (and maybe more cmds) Remote control tasks for sending annotations (and maybe more cmds) to
to a chart from some other actor. a chart from some other actor.
''' '''
from __future__ import annotations from __future__ import annotations
@ -32,6 +32,7 @@ from typing import (
) )
import tractor import tractor
import trio
from tractor import trionics from tractor import trionics
from tractor import ( from tractor import (
Portal, Portal,
@ -316,7 +317,9 @@ class AnnotCtl(Struct):
) )
yield aid yield aid
finally: finally:
await self.remove(aid) # async ipc send op
with trio.CancelScope(shield=True):
await self.remove(aid)
async def redraw( async def redraw(
self, self,

View File

@ -42,7 +42,7 @@ from piker.accounting import (
unpack_fqme, unpack_fqme,
) )
from piker.accounting import ( from piker.accounting import (
open_pps, open_account,
Position, Position,
) )
@ -136,7 +136,7 @@ def load_and_check_pos(
) -> None: ) -> None:
with open_pps(ppmsg.broker, ppmsg.account) as table: with open_account(ppmsg.broker, ppmsg.account) as table:
if ppmsg.size == 0: if ppmsg.size == 0:
assert ppmsg.symbol not in table.pps assert ppmsg.symbol not in table.pps