Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 2f6e3ad03f Add dict differ helpers from SO answer 2022-08-11 16:18:05 -04:00
Tyler Goodlet b75683879a Only pprint our struct when we detect a py REPL 2022-08-11 15:56:28 -04:00
Tyler Goodlet db8a3dd1b7 Move fill case-block earlier, log broker errors 2022-08-11 14:26:34 -04:00
Tyler Goodlet 2d92ed2052 Drop `msgpack` from `marketstore` module 2022-08-11 14:21:01 -04:00
Tyler Goodlet 0756cb0289 Load boxed `.req` values as `Order`s in mode loop 2022-08-11 14:20:23 -04:00
Tyler Goodlet 66f7dd9020 'Only send `'closed'` on Filled events, lowercase all statues' 2022-08-11 14:18:53 -04:00
Tyler Goodlet 9782107153 First try mega-basic stock (reverse) split support with `ib` and `pps.toml` 2022-08-10 18:19:44 -04:00
Tyler Goodlet 1f43f660fe Passthrough filled and pendingsubmit cases 2022-08-10 18:03:25 -04:00
Tyler Goodlet d3b7d0e247 Include both symbols in error msg when a mismatch 2022-08-10 17:59:27 -04:00
Tyler Goodlet 700dbf0e2b Handle 'closed' vs. 'fill` race case..
`ib` is super good not being reliable with order event sequence order
and duplication of fill info. This adds some guards to try and avoid
popping the last status status too early if we end up receiving
a `'closed'` before the expected `'fill`' event(s). Further delete the
`status_msg` ref on each iteration to avoid stale reference lookups in
the relay task/loop.
2022-08-10 17:17:47 -04:00
9 changed files with 185 additions and 54 deletions

View File

@ -36,8 +36,6 @@ from trio_typing import TaskStatus
import tractor
from ib_insync.contract import (
Contract,
# Option,
# Forex,
)
from ib_insync.order import (
Trade,
@ -364,11 +362,24 @@ async def update_and_audit_msgs(
# presume we're at least not more in the shit then we
# thought.
if diff:
reverse_split_ratio = pikersize / ibsize
split_ratio = 1/reverse_split_ratio
if split_ratio >= reverse_split_ratio:
entry = f'split_ratio = {int(split_ratio)}'
else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
raise ValueError(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
f'reverse_split_ratio: {reverse_split_ratio}\n'
f'split_ratio: {split_ratio}\n\n'
'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
'If you are expecting a (reverse) split in this '
'instrument you should probably put the following '
f'in the `pps.toml` section:\n{entry}'
)
msg.size = ibsize
@ -532,6 +543,7 @@ async def trades_dialogue(
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
bsuid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.')
cids2pps[(acctid, bsuid)] = msg
@ -571,14 +583,22 @@ async def trades_dialogue(
trans = trans_by_acct.get(acctid)
if trans:
table.update_from_trans(trans)
table.update_from_trans(trans)
# XXX: not sure exactly why it wouldn't be in
# the updated output (maybe this is a bug?) but
# if you create a pos from TWS and then load it
# from the api trades it seems we get a key
# error from ``update[bsuid]`` ?
pp = table.pps[bsuid]
pp = table.pps.get(bsuid)
if not pp:
log.error(
f'The contract id for {msg} may have '
f'changed to {bsuid}\nYou may need to '
'adjust your ledger for this, skipping '
'for now.'
)
continue
if msg.size != pp.size:
log.error(
'Position mismatch {pp.symbol.front_fqsn()}:\n'
@ -730,8 +750,11 @@ async def emit_pp_update(
_statuses: dict[str, str] = {
'cancelled': 'canceled',
'submitted': 'open',
'pendingsubmit': 'pending',
'filled': 'fill',
# XXX: just pass these through? it duplicates actual fill events other
# then the case where you the `.remaining == 0` case which is our
# 'closed'` case.
# 'filled': 'pending',
# 'pendingsubmit': 'pending',
# TODO: see a current ``ib_insync`` issue around this:
# https://github.com/erdewit/ib_insync/issues/363
@ -805,8 +828,12 @@ async def deliver_trade_events(
# cancelling.. gawwwd
if ib_status_key == 'cancelled':
last_log = trade.log[-1]
if last_log.message:
if (
last_log.message
and 'Error' not in last_log.message
):
ib_status_key = trade.log[-2].status
print(ib_status_key)
elif ib_status_key == 'inactive':
async def sched_cancel():
@ -821,10 +848,16 @@ async def deliver_trade_events(
nurse.start_soon(sched_cancel)
status_key = _statuses.get(ib_status_key) or ib_status_key
status_key = (
_statuses.get(ib_status_key)
or ib_status_key.lower()
)
remaining = status.remaining
if remaining == 0:
if (
status_key == 'filled'
and remaining == 0
):
status_key = 'closed'
# skip duplicate filled updates - we get the deats
@ -978,9 +1011,18 @@ async def deliver_trade_events(
if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}')
# TODO: what schema for this msg if we're going to make it
# portable across all backends?
# msg = BrokerdError(**err)
# TODO: we don't want to relay data feed / lookup errors
# so we need some further filtering logic here..
# for most cases the 'status' block above should take
# care of this.
# await ems_stream.send(BrokerdStatus(
# status='error',
# reqid=err['reqid'],
# reason=err['reason'],
# time_ns=time.time_ns(),
# account=accounts_def.inverse[trade.order.account],
# broker_details={'name': 'ib'},
# ))
case 'position':

View File

@ -101,3 +101,30 @@ def percent_change(
new: float,
) -> float:
return pnl(init, new) * 100.
def diff_dict(
d1: dict,
d2: dict,
) -> dict:
d1_keys = set(d1.keys())
d2_keys = set(d2.keys())
shared_keys = d1_keys.intersection(d2_keys)
shared_deltas = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]}
added_keys = d2_keys - d1_keys
added_deltas = {o: (None, d2[o]) for o in added_keys}
deltas = {**shared_deltas, **added_deltas}
return parse_deltas(deltas)
def parse_deltas(deltas: dict) -> dict:
res = {}
for k, v in deltas.items():
if isinstance(v[0], dict):
tmp = diff_dict(v[0], v[1])
if tmp:
res[k] = tmp
else:
res[k] = v[1]
return res

View File

@ -83,7 +83,13 @@ class OrderBook:
"""Cancel an order (or alert) in the EMS.
"""
cmd = self._sent_orders[uuid]
cmd = self._sent_orders.get(uuid)
if not cmd:
log.error(
f'Unknown order {uuid}!?\n'
f'Maybe there is a stale entry or line?\n'
f'You should report this as a bug!'
)
msg = Cancel(
oid=uuid,
symbol=cmd.symbol,
@ -156,7 +162,10 @@ async def relay_order_cmds_from_sync_code(
# send msg over IPC / wire
await to_ems_stream.send(cmd)
else:
log.warning(f'Ignoring unmatched order cmd for {sym}: {msg}')
log.warning(
f'Ignoring unmatched order cmd for {sym} != {symbol_key}:'
f'\n{msg}'
)
@acm

View File

@ -667,19 +667,19 @@ async def translate_and_relay_brokerd_events(
# received this ack, in which case we relay that cancel
# signal **asap** to the backend broker
# status = book._active.get(oid)
status = book._active[oid]
req = status.req
status_msg = book._active[oid]
req = status_msg.req
if req and req.action == 'cancel':
# assign newly providerd broker backend request id
# and tell broker to cancel immediately
status.reqid = reqid
status_msg.reqid = reqid
await brokerd_trades_stream.send(req)
# 2. the order is now active and will be mirrored in
# our book -> registered as live flow
else:
# TODO: should we relay this ack state?
status.resp = 'pending'
status_msg.resp = 'pending'
# no msg to client necessary
continue
@ -729,6 +729,7 @@ async def translate_and_relay_brokerd_events(
# msg-chain/dialog.
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
old_resp = status_msg.resp
status_msg.resp = status
# retrieve existing live flow
@ -746,16 +747,47 @@ async def translate_and_relay_brokerd_events(
if status == 'closed':
log.info(f'Execution for {oid} is complete!')
# only if we already rxed a fill then probably
# this clear is fully complete? (frickin ib..)
if old_resp == 'fill':
status_msg = book._active.pop(oid)
elif status == 'canceled':
log.info(f'Cancellation for {oid} is complete!')
log.cancel(f'Cancellation for {oid} is complete!')
else: # open
# relayed from backend but probably not handled so
# just log it
log.info(f'{broker} opened order {msg}')
# BrokerdFill
case {
'name': 'fill',
'reqid': reqid, # brokerd generated order-request id
# 'symbol': sym, # paper engine doesn't have this, nbd?
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# proxy through the "fill" result(s)
msg = BrokerdFill(**brokerd_msg)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
ems_client_order_stream = router.dialogues[oid]
# wtf a fill can come after 'closed' from ib?
status_msg = book._active[oid]
# only if we already rxed a 'closed'
# this clear is fully complete? (frickin ib..)
# if status_msg.resp == 'closed':
# status_msg = book._active.pop(oid)
status_msg.resp = 'fill'
status_msg.reqid = reqid
status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
# ``Status`` containing an embedded order msg which
# should be loaded as a "pre-existing open order" from the
# brokerd backend.
@ -811,6 +843,14 @@ async def translate_and_relay_brokerd_events(
# don't fall through
continue
# brokerd error
case {
'name': 'status',
'status': 'error',
}:
log.error(f'Broker error:\n{pformat(brokerd_msg)}')
# XXX: we presume the brokerd cancels its own order
# TOO FAST ``BrokerdStatus`` that arrives
# before the ``BrokerdAck``.
case {
@ -823,32 +863,20 @@ async def translate_and_relay_brokerd_events(
'status': status,
'reqid': reqid,
}:
status_msg = book._active[oid]
log.warning(
'Unhandled broker status:\n'
'Unhandled broker status for dialog:\n'
f'{pformat(status_msg)}\n'
f'{pformat(brokerd_msg)}\n'
)
# BrokerdFill
case {
'name': 'fill',
'reqid': reqid, # brokerd generated order-request id
# 'symbol': sym, # paper engine doesn't have this, nbd?
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# proxy through the "fill" result(s)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
msg = BrokerdFill(**brokerd_msg)
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
status_msg.resp = 'fill'
status_msg.reqid = reqid
status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# XXX: ugh sometimes we don't access it?
if status_msg:
del status_msg
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?

View File

@ -234,6 +234,7 @@ class BrokerdStatus(Struct):
'canceled',
'fill',
'pending',
'error',
]
account: str

View File

@ -37,7 +37,7 @@ import time
from math import isnan
from bidict import bidict
import msgpack
from msgspec.msgpack import encode, decode
import pyqtgraph as pg
import numpy as np
import tractor
@ -774,12 +774,13 @@ async def stream_quotes(
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server
resp = await ws.send_message(
msgpack.dumps({'streams': list(tbks.values())})
encode({'streams': list(tbks.values())})
)
log.info(resp)
async def recv() -> dict[str, Any]:
return msgpack.loads((await ws.get_message()), encoding='utf-8')
return decode((await ws.get_message()), encoding='utf-8')
streams = (await recv())['streams']
log.info(f"Subscribed to {streams}")

View File

@ -18,6 +18,7 @@
Built-in (extension) types.
"""
import sys
from typing import Optional
from pprint import pformat
@ -42,8 +43,13 @@ class Struct(
}
def __repr__(self):
# only turn on pprint when we detect a python REPL
# at runtime B)
if hasattr(sys, 'ps1'):
return f'Struct({pformat(self.to_dict())})'
return super().__repr__()
def copy(
self,
update: Optional[dict] = None,

View File

@ -134,6 +134,8 @@ class Position(Struct):
# unique backend symbol id
bsuid: str
split_ratio: Optional[int] = None
# ordered record of known constituent trade messages
clears: dict[
Union[str, int, Status], # trade id
@ -159,6 +161,9 @@ class Position(Struct):
clears = d.pop('clears')
expiry = d.pop('expiry')
if self.split_ratio is None:
d.pop('split_ratio')
# TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
@ -384,12 +389,22 @@ class Position(Struct):
asize_h.append(accum_size)
ppu_h.append(ppu_h[-1])
return ppu_h[-1] if ppu_h else 0
final_ppu = ppu_h[-1] if ppu_h else 0
# handle any split info entered (for now) manually by user
if self.split_ratio is not None:
final_ppu /= self.split_ratio
return final_ppu
def calc_size(self) -> float:
size: float = 0
for tid, entry in self.clears.items():
size += entry['size']
if self.split_ratio is not None:
size = round(size * self.split_ratio)
return size
def minimize_clears(
@ -848,6 +863,7 @@ def open_pps(
size = entry['size']
# TODO: remove but, handle old field name for now
ppu = entry.get('ppu', entry.get('be_price', 0))
split_ratio = entry.get('split_ratio')
expiry = entry.get('expiry')
if expiry:
@ -857,6 +873,7 @@ def open_pps(
Symbol.from_fqsn(fqsn, info={}),
size=size,
ppu=ppu,
split_ratio=split_ratio,
expiry=expiry,
bsuid=entry['bsuid'],

View File

@ -163,7 +163,6 @@ class OrderMode:
) -> LevelLine:
level = order.price
print(f'SIZE: {order.size}')
line = order_line(
self.chart,
@ -859,7 +858,7 @@ async def process_trade_msg(
get_index = mode.chart.get_index
fmsg = pformat(msg)
log.info(f'Received order msg:\n{fmsg}')
log.debug(f'Received order msg:\n{fmsg}')
name = msg['name']
if name in (
@ -920,6 +919,7 @@ async def process_trade_msg(
):
dialog = mode.load_unknown_dialog_from_msg(msg)
mode.on_submit(oid)
# return dialog, msg
case Status(resp='error'):
# delete level line from view
@ -932,16 +932,15 @@ async def process_trade_msg(
case Status(resp='canceled'):
# delete level line from view
mode.on_cancel(oid)
req = msg.req
log.cancel(
f'Canceled order {oid}:\n{pformat(req)}'
)
req = Order(**msg.req)
log.cancel(f'Canceled {req.action}:{oid}')
case Status(
resp='triggered',
# req=Order(exec_mode='dark') # TODO:
req={'exec_mode': 'dark'},
):
# TODO: UX for a "pending" clear/live order
log.info(f'Dark order triggered for {fmsg}')
case Status(
@ -951,13 +950,14 @@ async def process_trade_msg(
):
# should only be one "fill" for an alert
# add a triangle and remove the level line
req = Order(**req)
mode.on_fill(
oid,
price=req.price,
arrow_index=get_index(time.time()),
)
mode.lines.remove_line(uuid=oid)
msg.req = Order(**req)
msg.req = req
await mode.on_exec(oid, msg)
# response to completed 'dialog' for order request
@ -966,18 +966,18 @@ async def process_trade_msg(
# req=Order() as req, # TODO
req=req,
):
# right now this is just triggering a system alert
msg.req = Order(**req)
await mode.on_exec(oid, msg)
mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually
case Status(resp='fill'):
# handle out-of-piker fills reporting?
known_order = book._sent_orders.get(oid)
if not known_order:
log.warning(f'order {oid} is unknown')
return
# continue
action = known_order.action
details = msg.brokerd_msg