Compare commits

..

No commits in common. "main" and "ib_2025_updates" have entirely different histories.

10 changed files with 129 additions and 343 deletions

View File

@ -33,6 +33,7 @@ from ._pos import (
Account,
load_account,
load_account_from_ledger,
open_pps,
open_account,
Position,
)
@ -67,6 +68,7 @@ __all__ = [
'load_account_from_ledger',
'mk_allocator',
'open_account',
'open_pps',
'open_trade_ledger',
'unpack_fqme',
'DerivTypes',

View File

@ -356,12 +356,13 @@ class Position(Struct):
) -> bool:
'''
Update clearing table by calculating the rolling ppu and
(accumulative) size in both the clears entry and local attrs
state.
(accumulative) size in both the clears entry and local
attrs state.
Inserts are always done in datetime sorted order.
'''
# added: bool = False
tid: str = t.tid
if tid in self._events:
log.debug(
@ -369,7 +370,7 @@ class Position(Struct):
f'\n'
f'{t}\n'
)
return False
# return added
# TODO: apparently this IS possible with a dict but not
# common and probably not that beneficial unless we're also
@ -450,12 +451,6 @@ class Position(Struct):
# def suggest_split(self) -> float:
# ...
# ?TODO, for sending rendered state over the wire?
# def summary(self) -> PositionSummary:
# do minimal conversion to a subset of fields
# currently defined in `.clearing._messages.BrokerdPosition`
class Account(Struct):
'''
@ -499,9 +494,9 @@ class Account(Struct):
def update_from_ledger(
self,
ledger: TransactionLedger|dict[str, Transaction],
ledger: TransactionLedger | dict[str, Transaction],
cost_scalar: float = 2,
symcache: SymbologyCache|None = None,
symcache: SymbologyCache | None = None,
_mktmap_table: dict[str, MktPair] | None = None,
@ -754,7 +749,7 @@ class Account(Struct):
# XXX WTF: if we use a tomlkit.Integer here we get this
# super weird --1 thing going on for cumsize!?1!
# NOTE: the fix was to always float() the size value loaded
# in open_account() below!
# in open_pps() below!
config.write(
config=self.conf,
path=self.conf_path,
@ -938,6 +933,7 @@ def open_account(
clears_table['dt'] = dt
trans.append(Transaction(
fqme=bs_mktid,
# sym=mkt,
bs_mktid=bs_mktid,
tid=tid,
# XXX: not sure why sometimes these are loaded as
@ -960,18 +956,7 @@ def open_account(
):
expiry: pendulum.DateTime = pendulum.parse(expiry)
# !XXX, should never be duplicates over
# a backend-(broker)-system's unique market-IDs!
if pos := pp_objs.get(bs_mktid):
if mkt != pos.mkt:
log.warning(
f'Duplicated position but diff `MktPair.fqme` ??\n'
f'bs_mktid: {bs_mktid!r}\n'
f'pos.mkt: {pos.mkt}\n'
f'mkt: {mkt}\n'
)
else:
pos = pp_objs[bs_mktid] = Position(
pp = pp_objs[bs_mktid] = Position(
mkt,
split_ratio=split_ratio,
bs_mktid=bs_mktid,
@ -983,13 +968,8 @@ def open_account(
# state, since today's records may have already been
# processed!
for t in trans:
added: bool = pos.add_clear(t)
if not added:
log.warning(
f'Txn already recorded in pp ??\n'
f'\n'
f'{t}\n'
)
pp.add_clear(t)
try:
yield acnt
finally:
@ -997,6 +977,20 @@ def open_account(
acnt.write_config()
# TODO: drop the old name and THIS!
@cm
def open_pps(
*args,
**kwargs,
) -> Generator[Account, None, None]:
log.warning(
'`open_pps()` is now deprecated!\n'
'Please use `with open_account() as cnt:`'
)
with open_account(*args, **kwargs) as acnt:
yield acnt
def load_account_from_ledger(
brokername: str,

View File

@ -22,9 +22,7 @@ you know when you're losing money (if possible) XD
from __future__ import annotations
from collections.abc import ValuesView
from contextlib import contextmanager as cm
from functools import partial
from math import copysign
from pprint import pformat
from typing import (
Any,
Callable,
@ -32,7 +30,6 @@ from typing import (
TYPE_CHECKING,
)
from tractor.devx import maybe_open_crash_handler
import polars as pl
from pendulum import (
DateTime,
@ -40,16 +37,12 @@ from pendulum import (
parse,
)
from ..log import get_logger
if TYPE_CHECKING:
from ._ledger import (
Transaction,
TransactionLedger,
)
log = get_logger(__name__)
def ppu(
clears: Iterator[Transaction],
@ -245,9 +238,6 @@ def iter_by_dt(
def dyn_parse_to_dt(
tx: tuple[str, dict[str, Any]] | Transaction,
debug: bool = False,
_invalid: list|None = None,
) -> DateTime:
# handle `.items()` inputs
@ -260,90 +250,52 @@ def iter_by_dt(
# get best parser for this record..
for k in parsers:
if (
(v := getattr(tx, k, None))
isdict and k in tx
or
(
isdict
and
(v := tx.get(k))
)
getattr(tx, k, None)
):
v = (
tx[k] if isdict
else tx.dt
)
assert v is not None, (
f'No valid value for `{k}`!?'
)
# only call parser on the value if not None from
# the `parsers` table above (when NOT using
# `.get()`), otherwise pass through the value and
# sort on it directly
if (
not isinstance(v, DateTime)
and
(parser := parsers.get(k))
and (parser := parsers.get(k))
):
ret = parser(v)
return parser(v)
else:
ret = v
return ret
return v
else:
log.debug(
f'Parser-field not found in txn\n'
f'\n'
f'parser-field: {k!r}\n'
f'txn: {tx!r}\n'
f'\n'
f'Trying next..\n'
)
continue
# TODO: move to top?
from piker.log import get_logger
log = get_logger(__name__)
# XXX: we should never really get here bc it means some kinda
# bad txn-record (field) data..
#
# -> set the `debug_mode = True` if you want to trace such
# cases from REPL ;)
else:
# XXX: we should really never get here..
# only if a ledger record has no expected sort(able)
# field will we likely hit this.. like with ze IB.
# if no sortable field just deliver epoch?
log.warning(
'No (time) sortable field for TXN:\n'
f'{tx!r}\n'
f'{tx}\n'
)
report: str = (
f'No supported time-field found in txn !?\n'
f'\n'
f'supported-time-fields: {parsers!r}\n'
f'\n'
f'txn: {tx!r}\n'
)
if debug:
with maybe_open_crash_handler(
pdb=debug,
raise_on_exit=False,
):
raise ValueError(report)
else:
log.error(report)
return from_timestamp(0)
# breakpoint()
if _invalid is not None:
_invalid.append(tx)
return from_timestamp(0.)
entry: tuple[str, dict]|Transaction
invalid: list = []
entry: tuple[str, dict] | Transaction
for entry in sorted(
records,
key=key or partial(
dyn_parse_to_dt,
_invalid=invalid,
),
key=key or dyn_parse_to_dt,
):
if entry in invalid:
log.warning(
f'Ignoring txn w invalid timestamp ??\n'
f'{pformat(entry)}\n'
)
continue
# NOTE the type sig above; either pairs or txns B)
yield entry
@ -406,7 +358,6 @@ def open_ledger_dfs(
acctname: str,
ledger: TransactionLedger | None = None,
debug_mode: bool = False,
**kwargs,
@ -421,10 +372,8 @@ def open_ledger_dfs(
can update the ledger on exit.
'''
with maybe_open_crash_handler(
pdb=debug_mode,
# raise_on_exit=False,
):
from piker.toolz import open_crash_handler
with open_crash_handler():
if not ledger:
import time
from ._ledger import open_trade_ledger

View File

@ -362,10 +362,6 @@ async def update_and_audit_pos_msg(
size=ibpos.position,
avg_price=pikerpos.ppu,
# XXX ensures matching even if multiple venue-names
# in `.bs_fqme`, likely from txn records..
bs_mktid=mkt.bs_mktid,
)
ibfmtmsg: str = pformat(ibpos._asdict())
@ -434,8 +430,7 @@ async def aggr_open_orders(
) -> None:
'''
Collect all open orders from client and fill in `order_msgs:
list`.
Collect all open orders from client and fill in `order_msgs: list`.
'''
trades: list[Trade] = client.ib.openTrades()

View File

@ -388,7 +388,6 @@ async def open_brokerd_dialog(
for ep_name in [
'open_trade_dialog', # probably final name?
'trades_dialogue', # legacy
# ^!TODO, rm this since all backends ported no ?!?
]:
trades_endpoint = getattr(
brokermod,
@ -1028,18 +1027,8 @@ async def translate_and_relay_brokerd_events(
)
if status == 'closed':
log.info(
f'Execution is complete!\n'
f'oid: {oid!r}\n'
)
status_msg = book._active.pop(oid, None)
if status_msg is None:
log.warning(
f'Order was already cleared from book ??\n'
f'oid: {oid!r}\n'
f'\n'
f'Maybe the order cancelled before submitted ??\n'
)
log.info(f'Execution for {oid} is complete!')
status_msg = book._active.pop(oid)
elif status == 'canceled':
log.cancel(f'Cancellation for {oid} is complete!')
@ -1563,18 +1552,19 @@ async def maybe_open_trade_relays(
@tractor.context
async def _emsd_main(
ctx: tractor.Context, # becomes `ems_ctx` below
ctx: tractor.Context,
fqme: str,
exec_mode: str, # ('paper', 'live')
loglevel: str|None = None,
) -> tuple[ # `ctx.started()` value!
dict[ # positions
tuple[str, str], # brokername, acctid
) -> tuple[
dict[
# brokername, acctid
tuple[str, str],
list[BrokerdPosition],
],
list[str], # accounts
dict[str, Status], # dialogs
list[str],
dict[str, Status],
]:
'''
EMS (sub)actor entrypoint providing the execution management

View File

@ -301,9 +301,6 @@ class BrokerdError(Struct):
# TODO: yeah, so we REALLY need to completely deprecate
# this and use the `.accounting.Position` msg-type instead..
# -[ ] an alternative might be to add a `Position.summary() ->
# `PositionSummary`-msg that we generate since `Position` has a lot
# of fields by default we likely don't want to send over the wire?
class BrokerdPosition(Struct):
'''
Position update event from brokerd.
@ -316,4 +313,3 @@ class BrokerdPosition(Struct):
avg_price: float
currency: str = ''
name: str = 'position'
bs_mktid: str|int|None = None

View File

@ -15,8 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Remote control tasks for sending annotations (and maybe more cmds) to
a chart from some other actor.
Remote control tasks for sending annotations (and maybe more cmds)
to a chart from some other actor.
'''
from __future__ import annotations
@ -32,7 +32,6 @@ from typing import (
)
import tractor
import trio
from tractor import trionics
from tractor import (
Portal,
@ -317,8 +316,6 @@ class AnnotCtl(Struct):
)
yield aid
finally:
# async ipc send op
with trio.CancelScope(shield=True):
await self.remove(aid)
async def redraw(

View File

@ -555,13 +555,14 @@ class OrderMode:
def on_fill(
self,
uuid: str,
price: float,
time_s: float,
pointing: str | None = None,
) -> bool:
) -> None:
'''
Fill msg handler.
@ -574,33 +575,13 @@ class OrderMode:
- update fill bar size
'''
# XXX WARNING XXX
# if a `Status(resp='error')` arrives *before* this
# fill-status, the `.dialogs` entry may have already been
# popped and thus the below will skipped.
#
# NOTE, to avoid this confusing scenario ensure that any
# errors delivered thru from the broker-backend are not just
# "noisy reporting" (like is very common from IB..) and are
# instead ONLY errors-causing-order-dialog-cancellation!
if not (dialog := self.dialogs.get(uuid)):
log.warning(
f'Order was already cleared from `.dialogs` ??\n'
f'uuid: {uuid!r}\n'
)
return False
dialog = self.dialogs[uuid]
lines = dialog.lines
chart = self.chart
if not lines:
log.warn("No line(s) for order {uuid}!?")
return False
# update line state(s)
#
# ?XXX this fails on certain types of races?
# XXX: seems to fail on certain types of races?
# assert len(lines) == 2
if lines:
flume: Flume = self.feed.flumes[chart.linked.mkt.fqme]
_, _, ratio = flume.get_ds_info()
@ -626,31 +607,28 @@ class OrderMode:
pointing=pointing,
color=lines[0].color
)
else:
log.warn("No line(s) for order {uuid}!?")
def on_cancel(
self,
uuid: str,
uuid: str
) -> bool:
) -> None:
msg: Order|None = self.client._sent_orders.pop(uuid, None)
if msg is None:
log.warning(
f'Received cancel for unsubmitted order {pformat(msg)}'
)
return False
msg: Order = self.client._sent_orders.pop(uuid, None)
# remove GUI line, show cursor.
if msg is not None:
self.lines.remove_line(uuid=uuid)
self.chart.linked.cursor.show_xhair()
# remove msg dialog (history)
dialog: Dialog|None = self.dialogs.pop(uuid, None)
dialog = self.dialogs.pop(uuid, None)
if dialog:
dialog.last_status_close()
return True
else:
log.warning(
f'Received cancel for unsubmitted order {pformat(msg)}'
)
def cancel_orders_under_cursor(self) -> list[str]:
return self.cancel_orders(
@ -1079,23 +1057,13 @@ async def process_trade_msg(
if name in (
'position',
):
mkt: MktPair = mode.chart.linked.mkt
sym: MktPair = mode.chart.linked.mkt
pp_msg_symbol = msg['symbol'].lower()
pp_msg_bsmktid = msg['bs_mktid']
fqme = mkt.fqme
broker = mkt.broker
fqme = sym.fqme
broker = sym.broker
if (
# match on any backed-specific(-unique)-ID first!
(
pp_msg_bsmktid
and
mkt.bs_mktid == pp_msg_bsmktid
)
or
# OW try against what's provided as an FQME..
pp_msg_symbol == fqme
or
pp_msg_symbol == fqme.removesuffix(f'.{broker}')
or pp_msg_symbol == fqme.removesuffix(f'.{broker}')
):
log.info(
f'Loading position for `{fqme}`:\n'
@ -1118,7 +1086,7 @@ async def process_trade_msg(
return
msg = Status(**msg)
# resp: str = msg.resp
resp = msg.resp
oid = msg.oid
dialog: Dialog = mode.dialogs.get(oid)
@ -1182,33 +1150,20 @@ async def process_trade_msg(
mode.on_submit(oid)
case Status(resp='error'):
# TODO: parse into broker-side msg, or should we
# expect it to just be **that** msg verbatim (since
# we'd presumably have only 1 `Error` msg-struct)
broker_msg: dict = msg.brokerd_msg
# XXX NOTE, this presumes the rxed "error" is
# order-dialog-cancel-causing, THUS backends much ONLY
# relay errors of this "severity"!!
log.error(
f'Order errored ??\n'
f'oid: {oid!r}\n'
f'\n'
f'{pformat(broker_msg)}\n'
f'\n'
f'=> CANCELLING ORDER DIALOG <=\n'
# from tractor.devx.pformat import ppfmt
# !TODO LOL, wtf the msg is causing
# a recursion bug!
# -[ ] get this shit on msgspec stat!
# f'{ppfmt(broker_msg)}'
)
# do all the things for a cancel:
# - drop order-msg dialog from client table
# - delete level line from view
mode.on_cancel(oid)
# TODO: parse into broker-side msg, or should we
# expect it to just be **that** msg verbatim (since
# we'd presumably have only 1 `Error` msg-struct)
broker_msg: dict = msg.brokerd_msg
log.error(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
case Status(resp='canceled'):
# delete level line from view
mode.on_cancel(oid)
@ -1223,10 +1178,10 @@ async def process_trade_msg(
# TODO: UX for a "pending" clear/live order
log.info(f'Dark order triggered for {fmtmsg}')
# TODO: do the struct-msg version, blah blah..
# req=Order(exec_mode='live', action='alert') as req,
case Status(
resp='triggered',
# TODO: do the struct-msg version, blah blah..
# req=Order(exec_mode='live', action='alert') as req,
req={
'exec_mode': 'live',
'action': 'alert',

View File

@ -12,14 +12,12 @@ from piker import config
from piker.accounting import (
Account,
calc,
open_account,
load_account,
load_account_from_ledger,
open_trade_ledger,
Position,
TransactionLedger,
open_trade_ledger,
load_account,
load_account_from_ledger,
)
import tractor
def test_root_conf_networking_section(
@ -55,17 +53,12 @@ def test_account_file_default_empty(
)
def test_paper_ledger_position_calcs(
fq_acnt: tuple[str, str],
debug_mode: bool,
):
broker: str
acnt_name: str
broker, acnt_name = fq_acnt
accounts_path: Path = (
config.repodir()
/ 'tests'
/ '_inputs' # tests-local-subdir
)
accounts_path: Path = config.repodir() / 'tests' / '_inputs'
ldr: TransactionLedger
with (
@ -84,7 +77,6 @@ def test_paper_ledger_position_calcs(
ledger=ldr,
_fp=accounts_path,
debug_mode=debug_mode,
) as (dfs, ledger),
@ -110,87 +102,3 @@ def test_paper_ledger_position_calcs(
df = dfs[xrp]
assert df['cumsize'][-1] == 0
assert pos.cumsize == 0
@pytest.mark.parametrize(
'fq_acnt',
[
('ib', 'algopaper'),
],
)
def test_ib_account_with_duplicated_mktids(
fq_acnt: tuple[str, str],
debug_mode: bool,
):
# ?TODO, once we start symcache-incremental-update-support?
# from piker.data import (
# open_symcache,
# )
#
# async def main():
# async with (
# # TODO: do this as part of `open_account()`!?
# open_symcache(
# 'ib',
# only_from_memcache=True,
# ) as symcache,
# ):
from piker.brokers.ib.ledger import (
tx_sort,
# ?TODO, once we want to pull lowlevel txns and process them?
# norm_trade_records,
# update_ledger_from_api_trades,
)
broker: str
acnt_id: str = 'algopaper'
broker, acnt_id = fq_acnt
accounts_def = config.load_accounts([broker])
assert accounts_def[f'{broker}.{acnt_id}']
ledger: TransactionLedger
acnt: Account
with (
tractor.devx.maybe_open_crash_handler(pdb=debug_mode),
open_trade_ledger(
'ib',
acnt_id,
tx_sort=tx_sort,
# TODO, eventually incrementally updated for IB..
# symcache=symcache,
symcache=None,
allow_from_sync_code=True,
) as ledger,
open_account(
'ib',
acnt_id,
write_on_exit=True,
) as acnt,
):
# per input params
symcache = ledger.symcache
assert not (
symcache.pairs
or
symcache.pairs
or
symcache.mktmaps
)
# re-compute all positions that have changed state.
# TODO: likely we should change the API to return the
# position updates from `.update_from_ledger()`?
active, closed = acnt.dump_active()
# breakpoint()
# TODO, (see above imports as well) incremental update from
# (updated) ledger?
# -[ ] pull some code from `.ib.broker` content.

View File

@ -42,7 +42,7 @@ from piker.accounting import (
unpack_fqme,
)
from piker.accounting import (
open_account,
open_pps,
Position,
)
@ -136,7 +136,7 @@ def load_and_check_pos(
) -> None:
with open_account(ppmsg.broker, ppmsg.account) as table:
with open_pps(ppmsg.broker, ppmsg.account) as table:
if ppmsg.size == 0:
assert ppmsg.symbol not in table.pps