Compare commits

..

No commits in common. "3f48098c551d0364d8a29be4ce50dd25d63dbcae" and "6cc351814303406d5bc47d4ef34a4b9dea39b4db" have entirely different histories.

5 changed files with 87 additions and 145 deletions

View File

@ -33,6 +33,7 @@ from ._pos import (
Account,
load_account,
load_account_from_ledger,
open_pps,
open_account,
Position,
)
@ -67,6 +68,7 @@ __all__ = [
'load_account_from_ledger',
'mk_allocator',
'open_account',
'open_pps',
'open_trade_ledger',
'unpack_fqme',
'DerivTypes',

View File

@ -353,12 +353,13 @@ class Position(Struct):
) -> bool:
'''
Update clearing table by calculating the rolling ppu and
(accumulative) size in both the clears entry and local attrs
state.
(accumulative) size in both the clears entry and local
attrs state.
Inserts are always done in datetime sorted order.
'''
# added: bool = False
tid: str = t.tid
if tid in self._events:
log.debug(
@ -366,7 +367,7 @@ class Position(Struct):
f'\n'
f'{t}\n'
)
return False
# return added
# TODO: apparently this IS possible with a dict but not
# common and probably not that beneficial unless we're also
@ -447,12 +448,6 @@ class Position(Struct):
# def suggest_split(self) -> float:
# ...
# ?TODO, for sending rendered state over the wire?
# def summary(self) -> PositionSummary:
# do minimal conversion to a subset of fields
# currently defined in `.clearing._messages.BrokerdPosition`
class Account(Struct):
'''
@ -496,9 +491,9 @@ class Account(Struct):
def update_from_ledger(
self,
ledger: TransactionLedger|dict[str, Transaction],
ledger: TransactionLedger | dict[str, Transaction],
cost_scalar: float = 2,
symcache: SymbologyCache|None = None,
symcache: SymbologyCache | None = None,
_mktmap_table: dict[str, MktPair] | None = None,
@ -719,7 +714,7 @@ class Account(Struct):
# XXX WTF: if we use a tomlkit.Integer here we get this
# super weird --1 thing going on for cumsize!?1!
# NOTE: the fix was to always float() the size value loaded
# in open_account() below!
# in open_pps() below!
config.write(
config=self.conf,
path=self.conf_path,
@ -903,6 +898,7 @@ def open_account(
clears_table['dt'] = dt
trans.append(Transaction(
fqme=bs_mktid,
# sym=mkt,
bs_mktid=bs_mktid,
tid=tid,
# XXX: not sure why sometimes these are loaded as
@ -925,22 +921,11 @@ def open_account(
):
expiry: pendulum.DateTime = pendulum.parse(expiry)
# !XXX, should never be duplicates over
# a backend-(broker)-system's unique market-IDs!
if pos := pp_objs.get(bs_mktid):
if mkt != pos.mkt:
log.warning(
f'Duplicated position but diff `MktPair.fqme` ??\n'
f'bs_mktid: {bs_mktid!r}\n'
f'pos.mkt: {pos.mkt}\n'
f'mkt: {mkt}\n'
)
else:
pos = pp_objs[bs_mktid] = Position(
mkt,
split_ratio=split_ratio,
bs_mktid=bs_mktid,
)
pp = pp_objs[bs_mktid] = Position(
mkt,
split_ratio=split_ratio,
bs_mktid=bs_mktid,
)
# XXX: super critical, we need to be sure to include
# all pps.toml clears to avoid reusing clears that were
@ -948,13 +933,8 @@ def open_account(
# state, since today's records may have already been
# processed!
for t in trans:
added: bool = pos.add_clear(t)
if not added:
log.warning(
f'Txn already recorded in pp ??\n'
f'\n'
f'{t}\n'
)
pp.add_clear(t)
try:
yield acnt
finally:
@ -962,6 +942,20 @@ def open_account(
acnt.write_config()
# TODO: drop the old name and THIS!
@cm
def open_pps(
*args,
**kwargs,
) -> Generator[Account, None, None]:
log.warning(
'`open_pps()` is now deprecated!\n'
'Please use `with open_account() as cnt:`'
)
with open_account(*args, **kwargs) as acnt:
yield acnt
def load_account_from_ledger(
brokername: str,

View File

@ -358,10 +358,6 @@ async def update_and_audit_pos_msg(
size=ibpos.position,
avg_price=pikerpos.ppu,
# XXX ensures matching even if multiple venue-names
# in `.bs_fqme`, likely from txn records..
bs_mktid=mkt.bs_mktid,
)
ibfmtmsg: str = pformat(ibpos._asdict())
@ -430,8 +426,7 @@ async def aggr_open_orders(
) -> None:
'''
Collect all open orders from client and fill in `order_msgs:
list`.
Collect all open orders from client and fill in `order_msgs: list`.
'''
trades: list[Trade] = client.ib.openTrades()

View File

@ -301,9 +301,6 @@ class BrokerdError(Struct):
# TODO: yeah, so we REALLY need to completely deprecate
# this and use the `.accounting.Position` msg-type instead..
# -[ ] an alternative might be to add a `Position.summary() ->
# `PositionSummary`-msg that we generate since `Position` has a lot
# of fields by default we likely don't want to send over the wire?
class BrokerdPosition(Struct):
'''
Position update event from brokerd.
@ -316,4 +313,3 @@ class BrokerdPosition(Struct):
avg_price: float
currency: str = ''
name: str = 'position'
bs_mktid: str|int|None = None

View File

@ -555,13 +555,14 @@ class OrderMode:
def on_fill(
self,
uuid: str,
price: float,
time_s: float,
pointing: str | None = None,
) -> bool:
) -> None:
'''
Fill msg handler.
@ -574,83 +575,60 @@ class OrderMode:
- update fill bar size
'''
# XXX WARNING XXX
# if a `Status(resp='error')` arrives *before* this
# fill-status, the `.dialogs` entry may have already been
# popped and thus the below will skipped.
#
# NOTE, to avoid this confusing scenario ensure that any
# errors delivered thru from the broker-backend are not just
# "noisy reporting" (like is very common from IB..) and are
# instead ONLY errors-causing-order-dialog-cancellation!
if not (dialog := self.dialogs.get(uuid)):
log.warning(
f'Order was already cleared from `.dialogs` ??\n'
f'uuid: {uuid!r}\n'
)
return False
dialog = self.dialogs[uuid]
lines = dialog.lines
chart = self.chart
if not lines:
log.warn("No line(s) for order {uuid}!?")
return False
# update line state(s)
#
# ?XXX this fails on certain types of races?
# XXX: seems to fail on certain types of races?
# assert len(lines) == 2
flume: Flume = self.feed.flumes[chart.linked.mkt.fqme]
_, _, ratio = flume.get_ds_info()
if lines:
flume: Flume = self.feed.flumes[chart.linked.mkt.fqme]
_, _, ratio = flume.get_ds_info()
for chart, shm in [
(self.chart, flume.rt_shm),
(self.hist_chart, flume.hist_shm),
]:
viz = chart.get_viz(chart.name)
index_field = viz.index_field
arr = shm.array
for chart, shm in [
(self.chart, flume.rt_shm),
(self.hist_chart, flume.hist_shm),
]:
viz = chart.get_viz(chart.name)
index_field = viz.index_field
arr = shm.array
# TODO: borked for int index based..
index = flume.get_index(time_s, arr)
# TODO: borked for int index based..
index = flume.get_index(time_s, arr)
# get absolute index for arrow placement
arrow_index = arr[index_field][index]
# get absolute index for arrow placement
arrow_index = arr[index_field][index]
self.arrows.add(
chart.plotItem,
uuid,
arrow_index,
price,
pointing=pointing,
color=lines[0].color
)
self.arrows.add(
chart.plotItem,
uuid,
arrow_index,
price,
pointing=pointing,
color=lines[0].color
)
else:
log.warn("No line(s) for order {uuid}!?")
def on_cancel(
self,
uuid: str,
uuid: str
) -> bool:
) -> None:
msg: Order|None = self.client._sent_orders.pop(uuid, None)
if msg is None:
msg: Order = self.client._sent_orders.pop(uuid, None)
if msg is not None:
self.lines.remove_line(uuid=uuid)
self.chart.linked.cursor.show_xhair()
dialog = self.dialogs.pop(uuid, None)
if dialog:
dialog.last_status_close()
else:
log.warning(
f'Received cancel for unsubmitted order {pformat(msg)}'
)
return False
# remove GUI line, show cursor.
self.lines.remove_line(uuid=uuid)
self.chart.linked.cursor.show_xhair()
# remove msg dialog (history)
dialog: Dialog|None = self.dialogs.pop(uuid, None)
if dialog:
dialog.last_status_close()
return True
def cancel_orders_under_cursor(self) -> list[str]:
return self.cancel_orders(
@ -1079,23 +1057,13 @@ async def process_trade_msg(
if name in (
'position',
):
mkt: MktPair = mode.chart.linked.mkt
sym: MktPair = mode.chart.linked.mkt
pp_msg_symbol = msg['symbol'].lower()
pp_msg_bsmktid = msg['bs_mktid']
fqme = mkt.fqme
broker = mkt.broker
fqme = sym.fqme
broker = sym.broker
if (
# match on any backed-specific(-unique)-ID first!
(
pp_msg_bsmktid
and
mkt.bs_mktid == pp_msg_bsmktid
)
or
# OW try against what's provided as an FQME..
pp_msg_symbol == fqme
or
pp_msg_symbol == fqme.removesuffix(f'.{broker}')
or pp_msg_symbol == fqme.removesuffix(f'.{broker}')
):
log.info(
f'Loading position for `{fqme}`:\n'
@ -1118,7 +1086,7 @@ async def process_trade_msg(
return
msg = Status(**msg)
# resp: str = msg.resp
resp = msg.resp
oid = msg.oid
dialog: Dialog = mode.dialogs.get(oid)
@ -1182,33 +1150,20 @@ async def process_trade_msg(
mode.on_submit(oid)
case Status(resp='error'):
# TODO: parse into broker-side msg, or should we
# expect it to just be **that** msg verbatim (since
# we'd presumably have only 1 `Error` msg-struct)
broker_msg: dict = msg.brokerd_msg
# XXX NOTE, this presumes the rxed "error" is
# order-dialog-cancel-causing, THUS backends much ONLY
# relay errors of this "severity"!!
log.error(
f'Order errored ??\n'
f'oid: {oid!r}\n'
f'\n'
f'{pformat(broker_msg)}\n'
f'\n'
f'=> CANCELLING ORDER DIALOG <=\n'
# from tractor.devx.pformat import ppfmt
# !TODO LOL, wtf the msg is causing
# a recursion bug!
# -[ ] get this shit on msgspec stat!
# f'{ppfmt(broker_msg)}'
)
# do all the things for a cancel:
# - drop order-msg dialog from client table
# - delete level line from view
mode.on_cancel(oid)
# TODO: parse into broker-side msg, or should we
# expect it to just be **that** msg verbatim (since
# we'd presumably have only 1 `Error` msg-struct)
broker_msg: dict = msg.brokerd_msg
log.error(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
case Status(resp='canceled'):
# delete level line from view
mode.on_cancel(oid)
@ -1223,10 +1178,10 @@ async def process_trade_msg(
# TODO: UX for a "pending" clear/live order
log.info(f'Dark order triggered for {fmtmsg}')
# TODO: do the struct-msg version, blah blah..
# req=Order(exec_mode='live', action='alert') as req,
case Status(
resp='triggered',
# TODO: do the struct-msg version, blah blah..
# req=Order(exec_mode='live', action='alert') as req,
req={
'exec_mode': 'live',
'action': 'alert',