Compare commits

..

No commits in common. "2f6e3ad03f64f5f62e0096eb7421ef3d84dd2fcf" and "b52c4092f30bd19d255c2ee153cf3393ba601268" have entirely different histories.

9 changed files with 54 additions and 185 deletions

View File

@ -36,6 +36,8 @@ from trio_typing import TaskStatus
import tractor
from ib_insync.contract import (
Contract,
# Option,
# Forex,
)
from ib_insync.order import (
Trade,
@ -362,24 +364,11 @@ async def update_and_audit_msgs(
# presume we're at least not more in the shit then we
# thought.
if diff:
reverse_split_ratio = pikersize / ibsize
split_ratio = 1/reverse_split_ratio
if split_ratio >= reverse_split_ratio:
entry = f'split_ratio = {int(split_ratio)}'
else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
raise ValueError(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
f'reverse_split_ratio: {reverse_split_ratio}\n'
f'split_ratio: {split_ratio}\n\n'
'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
'If you are expecting a (reverse) split in this '
'instrument you should probably put the following '
f'in the `pps.toml` section:\n{entry}'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
)
msg.size = ibsize
@ -543,7 +532,6 @@ async def trades_dialogue(
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
bsuid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.')
cids2pps[(acctid, bsuid)] = msg
@ -583,22 +571,14 @@ async def trades_dialogue(
trans = trans_by_acct.get(acctid)
if trans:
table.update_from_trans(trans)
table.update_from_trans(trans)
# XXX: not sure exactly why it wouldn't be in
# the updated output (maybe this is a bug?) but
# if you create a pos from TWS and then load it
# from the api trades it seems we get a key
# error from ``update[bsuid]`` ?
pp = table.pps.get(bsuid)
if not pp:
log.error(
f'The contract id for {msg} may have '
f'changed to {bsuid}\nYou may need to '
'adjust your ledger for this, skipping '
'for now.'
)
continue
pp = table.pps[bsuid]
if msg.size != pp.size:
log.error(
'Position mismatch {pp.symbol.front_fqsn()}:\n'
@ -750,11 +730,8 @@ async def emit_pp_update(
_statuses: dict[str, str] = {
'cancelled': 'canceled',
'submitted': 'open',
# XXX: just pass these through? it duplicates actual fill events other
# then the case where you the `.remaining == 0` case which is our
# 'closed'` case.
# 'filled': 'pending',
# 'pendingsubmit': 'pending',
'pendingsubmit': 'pending',
'filled': 'fill',
# TODO: see a current ``ib_insync`` issue around this:
# https://github.com/erdewit/ib_insync/issues/363
@ -828,12 +805,8 @@ async def deliver_trade_events(
# cancelling.. gawwwd
if ib_status_key == 'cancelled':
last_log = trade.log[-1]
if (
last_log.message
and 'Error' not in last_log.message
):
if last_log.message:
ib_status_key = trade.log[-2].status
print(ib_status_key)
elif ib_status_key == 'inactive':
async def sched_cancel():
@ -848,16 +821,10 @@ async def deliver_trade_events(
nurse.start_soon(sched_cancel)
status_key = (
_statuses.get(ib_status_key)
or ib_status_key.lower()
)
status_key = _statuses.get(ib_status_key) or ib_status_key
remaining = status.remaining
if (
status_key == 'filled'
and remaining == 0
):
if remaining == 0:
status_key = 'closed'
# skip duplicate filled updates - we get the deats
@ -1011,18 +978,9 @@ async def deliver_trade_events(
if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}')
# TODO: we don't want to relay data feed / lookup errors
# so we need some further filtering logic here..
# for most cases the 'status' block above should take
# care of this.
# await ems_stream.send(BrokerdStatus(
# status='error',
# reqid=err['reqid'],
# reason=err['reason'],
# time_ns=time.time_ns(),
# account=accounts_def.inverse[trade.order.account],
# broker_details={'name': 'ib'},
# ))
# TODO: what schema for this msg if we're going to make it
# portable across all backends?
# msg = BrokerdError(**err)
case 'position':

View File

@ -101,30 +101,3 @@ def percent_change(
new: float,
) -> float:
return pnl(init, new) * 100.
def diff_dict(
d1: dict,
d2: dict,
) -> dict:
d1_keys = set(d1.keys())
d2_keys = set(d2.keys())
shared_keys = d1_keys.intersection(d2_keys)
shared_deltas = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]}
added_keys = d2_keys - d1_keys
added_deltas = {o: (None, d2[o]) for o in added_keys}
deltas = {**shared_deltas, **added_deltas}
return parse_deltas(deltas)
def parse_deltas(deltas: dict) -> dict:
res = {}
for k, v in deltas.items():
if isinstance(v[0], dict):
tmp = diff_dict(v[0], v[1])
if tmp:
res[k] = tmp
else:
res[k] = v[1]
return res

View File

@ -83,13 +83,7 @@ class OrderBook:
"""Cancel an order (or alert) in the EMS.
"""
cmd = self._sent_orders.get(uuid)
if not cmd:
log.error(
f'Unknown order {uuid}!?\n'
f'Maybe there is a stale entry or line?\n'
f'You should report this as a bug!'
)
cmd = self._sent_orders[uuid]
msg = Cancel(
oid=uuid,
symbol=cmd.symbol,
@ -162,10 +156,7 @@ async def relay_order_cmds_from_sync_code(
# send msg over IPC / wire
await to_ems_stream.send(cmd)
else:
log.warning(
f'Ignoring unmatched order cmd for {sym} != {symbol_key}:'
f'\n{msg}'
)
log.warning(f'Ignoring unmatched order cmd for {sym}: {msg}')
@acm

View File

@ -667,19 +667,19 @@ async def translate_and_relay_brokerd_events(
# received this ack, in which case we relay that cancel
# signal **asap** to the backend broker
# status = book._active.get(oid)
status_msg = book._active[oid]
req = status_msg.req
status = book._active[oid]
req = status.req
if req and req.action == 'cancel':
# assign newly providerd broker backend request id
# and tell broker to cancel immediately
status_msg.reqid = reqid
status.reqid = reqid
await brokerd_trades_stream.send(req)
# 2. the order is now active and will be mirrored in
# our book -> registered as live flow
else:
# TODO: should we relay this ack state?
status_msg.resp = 'pending'
status.resp = 'pending'
# no msg to client necessary
continue
@ -729,7 +729,6 @@ async def translate_and_relay_brokerd_events(
# msg-chain/dialog.
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
old_resp = status_msg.resp
status_msg.resp = status
# retrieve existing live flow
@ -747,47 +746,16 @@ async def translate_and_relay_brokerd_events(
if status == 'closed':
log.info(f'Execution for {oid} is complete!')
# only if we already rxed a fill then probably
# this clear is fully complete? (frickin ib..)
if old_resp == 'fill':
status_msg = book._active.pop(oid)
elif status == 'canceled':
log.cancel(f'Cancellation for {oid} is complete!')
log.info(f'Cancellation for {oid} is complete!')
else: # open
# relayed from backend but probably not handled so
# just log it
log.info(f'{broker} opened order {msg}')
# BrokerdFill
case {
'name': 'fill',
'reqid': reqid, # brokerd generated order-request id
# 'symbol': sym, # paper engine doesn't have this, nbd?
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# proxy through the "fill" result(s)
msg = BrokerdFill(**brokerd_msg)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
ems_client_order_stream = router.dialogues[oid]
# wtf a fill can come after 'closed' from ib?
status_msg = book._active[oid]
# only if we already rxed a 'closed'
# this clear is fully complete? (frickin ib..)
# if status_msg.resp == 'closed':
# status_msg = book._active.pop(oid)
status_msg.resp = 'fill'
status_msg.reqid = reqid
status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
# ``Status`` containing an embedded order msg which
# should be loaded as a "pre-existing open order" from the
# brokerd backend.
@ -843,14 +811,6 @@ async def translate_and_relay_brokerd_events(
# don't fall through
continue
# brokerd error
case {
'name': 'status',
'status': 'error',
}:
log.error(f'Broker error:\n{pformat(brokerd_msg)}')
# XXX: we presume the brokerd cancels its own order
# TOO FAST ``BrokerdStatus`` that arrives
# before the ``BrokerdAck``.
case {
@ -863,20 +823,32 @@ async def translate_and_relay_brokerd_events(
'status': status,
'reqid': reqid,
}:
status_msg = book._active[oid]
log.warning(
'Unhandled broker status for dialog:\n'
f'{pformat(status_msg)}\n'
'Unhandled broker status:\n'
f'{pformat(brokerd_msg)}\n'
)
# BrokerdFill
case {
'name': 'fill',
'reqid': reqid, # brokerd generated order-request id
# 'symbol': sym, # paper engine doesn't have this, nbd?
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# proxy through the "fill" result(s)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
msg = BrokerdFill(**brokerd_msg)
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
status_msg.resp = 'fill'
status_msg.reqid = reqid
status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# XXX: ugh sometimes we don't access it?
if status_msg:
del status_msg
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?

View File

@ -234,7 +234,6 @@ class BrokerdStatus(Struct):
'canceled',
'fill',
'pending',
'error',
]
account: str

View File

@ -37,7 +37,7 @@ import time
from math import isnan
from bidict import bidict
from msgspec.msgpack import encode, decode
import msgpack
import pyqtgraph as pg
import numpy as np
import tractor
@ -774,13 +774,12 @@ async def stream_quotes(
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server
resp = await ws.send_message(
encode({'streams': list(tbks.values())})
msgpack.dumps({'streams': list(tbks.values())})
)
log.info(resp)
async def recv() -> dict[str, Any]:
return decode((await ws.get_message()), encoding='utf-8')
return msgpack.loads((await ws.get_message()), encoding='utf-8')
streams = (await recv())['streams']
log.info(f"Subscribed to {streams}")

View File

@ -18,7 +18,6 @@
Built-in (extension) types.
"""
import sys
from typing import Optional
from pprint import pformat
@ -43,13 +42,8 @@ class Struct(
}
def __repr__(self):
# only turn on pprint when we detect a python REPL
# at runtime B)
if hasattr(sys, 'ps1'):
return f'Struct({pformat(self.to_dict())})'
return super().__repr__()
def copy(
self,
update: Optional[dict] = None,

View File

@ -134,8 +134,6 @@ class Position(Struct):
# unique backend symbol id
bsuid: str
split_ratio: Optional[int] = None
# ordered record of known constituent trade messages
clears: dict[
Union[str, int, Status], # trade id
@ -161,9 +159,6 @@ class Position(Struct):
clears = d.pop('clears')
expiry = d.pop('expiry')
if self.split_ratio is None:
d.pop('split_ratio')
# TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
@ -389,22 +384,12 @@ class Position(Struct):
asize_h.append(accum_size)
ppu_h.append(ppu_h[-1])
final_ppu = ppu_h[-1] if ppu_h else 0
# handle any split info entered (for now) manually by user
if self.split_ratio is not None:
final_ppu /= self.split_ratio
return final_ppu
return ppu_h[-1] if ppu_h else 0
def calc_size(self) -> float:
size: float = 0
for tid, entry in self.clears.items():
size += entry['size']
if self.split_ratio is not None:
size = round(size * self.split_ratio)
return size
def minimize_clears(
@ -863,7 +848,6 @@ def open_pps(
size = entry['size']
# TODO: remove but, handle old field name for now
ppu = entry.get('ppu', entry.get('be_price', 0))
split_ratio = entry.get('split_ratio')
expiry = entry.get('expiry')
if expiry:
@ -873,7 +857,6 @@ def open_pps(
Symbol.from_fqsn(fqsn, info={}),
size=size,
ppu=ppu,
split_ratio=split_ratio,
expiry=expiry,
bsuid=entry['bsuid'],

View File

@ -163,6 +163,7 @@ class OrderMode:
) -> LevelLine:
level = order.price
print(f'SIZE: {order.size}')
line = order_line(
self.chart,
@ -858,7 +859,7 @@ async def process_trade_msg(
get_index = mode.chart.get_index
fmsg = pformat(msg)
log.debug(f'Received order msg:\n{fmsg}')
log.info(f'Received order msg:\n{fmsg}')
name = msg['name']
if name in (
@ -919,7 +920,6 @@ async def process_trade_msg(
):
dialog = mode.load_unknown_dialog_from_msg(msg)
mode.on_submit(oid)
# return dialog, msg
case Status(resp='error'):
# delete level line from view
@ -932,15 +932,16 @@ async def process_trade_msg(
case Status(resp='canceled'):
# delete level line from view
mode.on_cancel(oid)
req = Order(**msg.req)
log.cancel(f'Canceled {req.action}:{oid}')
req = msg.req
log.cancel(
f'Canceled order {oid}:\n{pformat(req)}'
)
case Status(
resp='triggered',
# req=Order(exec_mode='dark') # TODO:
req={'exec_mode': 'dark'},
):
# TODO: UX for a "pending" clear/live order
log.info(f'Dark order triggered for {fmsg}')
case Status(
@ -950,14 +951,13 @@ async def process_trade_msg(
):
# should only be one "fill" for an alert
# add a triangle and remove the level line
req = Order(**req)
mode.on_fill(
oid,
price=req.price,
arrow_index=get_index(time.time()),
)
mode.lines.remove_line(uuid=oid)
msg.req = req
msg.req = Order(**req)
await mode.on_exec(oid, msg)
# response to completed 'dialog' for order request
@ -966,18 +966,18 @@ async def process_trade_msg(
# req=Order() as req, # TODO
req=req,
):
# right now this is just triggering a system alert
msg.req = Order(**req)
await mode.on_exec(oid, msg)
mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually
case Status(resp='fill'):
# handle out-of-piker fills reporting?
known_order = book._sent_orders.get(oid)
if not known_order:
log.warning(f'order {oid} is unknown')
return
# continue
action = known_order.action
details = msg.brokerd_msg