Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 2f6e3ad03f Add dict differ helpers from SO answer 2022-08-11 16:18:05 -04:00
Tyler Goodlet b75683879a Only pprint our struct when we detect a py REPL 2022-08-11 15:56:28 -04:00
Tyler Goodlet db8a3dd1b7 Move fill case-block earlier, log broker errors 2022-08-11 14:26:34 -04:00
Tyler Goodlet 2d92ed2052 Drop `msgpack` from `marketstore` module 2022-08-11 14:21:01 -04:00
Tyler Goodlet 0756cb0289 Load boxed `.req` values as `Order`s in mode loop 2022-08-11 14:20:23 -04:00
Tyler Goodlet 66f7dd9020 'Only send `'closed'` on Filled events, lowercase all statues' 2022-08-11 14:18:53 -04:00
Tyler Goodlet 9782107153 First try mega-basic stock (reverse) split support with `ib` and `pps.toml` 2022-08-10 18:19:44 -04:00
Tyler Goodlet 1f43f660fe Passthrough filled and pendingsubmit cases 2022-08-10 18:03:25 -04:00
Tyler Goodlet d3b7d0e247 Include both symbols in error msg when a mismatch 2022-08-10 17:59:27 -04:00
Tyler Goodlet 700dbf0e2b Handle 'closed' vs. 'fill` race case..
`ib` is super good not being reliable with order event sequence order
and duplication of fill info. This adds some guards to try and avoid
popping the last status status too early if we end up receiving
a `'closed'` before the expected `'fill`' event(s). Further delete the
`status_msg` ref on each iteration to avoid stale reference lookups in
the relay task/loop.
2022-08-10 17:17:47 -04:00
9 changed files with 185 additions and 54 deletions

View File

@ -36,8 +36,6 @@ from trio_typing import TaskStatus
import tractor import tractor
from ib_insync.contract import ( from ib_insync.contract import (
Contract, Contract,
# Option,
# Forex,
) )
from ib_insync.order import ( from ib_insync.order import (
Trade, Trade,
@ -364,11 +362,24 @@ async def update_and_audit_msgs(
# presume we're at least not more in the shit then we # presume we're at least not more in the shit then we
# thought. # thought.
if diff: if diff:
reverse_split_ratio = pikersize / ibsize
split_ratio = 1/reverse_split_ratio
if split_ratio >= reverse_split_ratio:
entry = f'split_ratio = {int(split_ratio)}'
else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
raise ValueError( raise ValueError(
f'POSITION MISMATCH ib <-> piker ledger:\n' f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n' f'ib: {ibppmsg}\n'
f'piker: {msg}\n' f'piker: {msg}\n'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' f'reverse_split_ratio: {reverse_split_ratio}\n'
f'split_ratio: {split_ratio}\n\n'
'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
'If you are expecting a (reverse) split in this '
'instrument you should probably put the following '
f'in the `pps.toml` section:\n{entry}'
) )
msg.size = ibsize msg.size = ibsize
@ -532,6 +543,7 @@ async def trades_dialogue(
# sure know which positions to update from the ledger if # sure know which positions to update from the ledger if
# any are missing from the ``pps.toml`` # any are missing from the ``pps.toml``
bsuid, msg = pack_position(pos) bsuid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account] acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.') acctid = acctid.strip('ib.')
cids2pps[(acctid, bsuid)] = msg cids2pps[(acctid, bsuid)] = msg
@ -571,14 +583,22 @@ async def trades_dialogue(
trans = trans_by_acct.get(acctid) trans = trans_by_acct.get(acctid)
if trans: if trans:
table.update_from_trans(trans) table.update_from_trans(trans)
table.update_from_trans(trans)
# XXX: not sure exactly why it wouldn't be in # XXX: not sure exactly why it wouldn't be in
# the updated output (maybe this is a bug?) but # the updated output (maybe this is a bug?) but
# if you create a pos from TWS and then load it # if you create a pos from TWS and then load it
# from the api trades it seems we get a key # from the api trades it seems we get a key
# error from ``update[bsuid]`` ? # error from ``update[bsuid]`` ?
pp = table.pps[bsuid] pp = table.pps.get(bsuid)
if not pp:
log.error(
f'The contract id for {msg} may have '
f'changed to {bsuid}\nYou may need to '
'adjust your ledger for this, skipping '
'for now.'
)
continue
if msg.size != pp.size: if msg.size != pp.size:
log.error( log.error(
'Position mismatch {pp.symbol.front_fqsn()}:\n' 'Position mismatch {pp.symbol.front_fqsn()}:\n'
@ -730,8 +750,11 @@ async def emit_pp_update(
_statuses: dict[str, str] = { _statuses: dict[str, str] = {
'cancelled': 'canceled', 'cancelled': 'canceled',
'submitted': 'open', 'submitted': 'open',
'pendingsubmit': 'pending', # XXX: just pass these through? it duplicates actual fill events other
'filled': 'fill', # then the case where you the `.remaining == 0` case which is our
# 'closed'` case.
# 'filled': 'pending',
# 'pendingsubmit': 'pending',
# TODO: see a current ``ib_insync`` issue around this: # TODO: see a current ``ib_insync`` issue around this:
# https://github.com/erdewit/ib_insync/issues/363 # https://github.com/erdewit/ib_insync/issues/363
@ -805,8 +828,12 @@ async def deliver_trade_events(
# cancelling.. gawwwd # cancelling.. gawwwd
if ib_status_key == 'cancelled': if ib_status_key == 'cancelled':
last_log = trade.log[-1] last_log = trade.log[-1]
if last_log.message: if (
last_log.message
and 'Error' not in last_log.message
):
ib_status_key = trade.log[-2].status ib_status_key = trade.log[-2].status
print(ib_status_key)
elif ib_status_key == 'inactive': elif ib_status_key == 'inactive':
async def sched_cancel(): async def sched_cancel():
@ -821,10 +848,16 @@ async def deliver_trade_events(
nurse.start_soon(sched_cancel) nurse.start_soon(sched_cancel)
status_key = _statuses.get(ib_status_key) or ib_status_key status_key = (
_statuses.get(ib_status_key)
or ib_status_key.lower()
)
remaining = status.remaining remaining = status.remaining
if remaining == 0: if (
status_key == 'filled'
and remaining == 0
):
status_key = 'closed' status_key = 'closed'
# skip duplicate filled updates - we get the deats # skip duplicate filled updates - we get the deats
@ -978,9 +1011,18 @@ async def deliver_trade_events(
if err['reqid'] == -1: if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}') log.error(f'TWS external order error:\n{pformat(err)}')
# TODO: what schema for this msg if we're going to make it # TODO: we don't want to relay data feed / lookup errors
# portable across all backends? # so we need some further filtering logic here..
# msg = BrokerdError(**err) # for most cases the 'status' block above should take
# care of this.
# await ems_stream.send(BrokerdStatus(
# status='error',
# reqid=err['reqid'],
# reason=err['reason'],
# time_ns=time.time_ns(),
# account=accounts_def.inverse[trade.order.account],
# broker_details={'name': 'ib'},
# ))
case 'position': case 'position':

View File

@ -101,3 +101,30 @@ def percent_change(
new: float, new: float,
) -> float: ) -> float:
return pnl(init, new) * 100. return pnl(init, new) * 100.
def diff_dict(
d1: dict,
d2: dict,
) -> dict:
d1_keys = set(d1.keys())
d2_keys = set(d2.keys())
shared_keys = d1_keys.intersection(d2_keys)
shared_deltas = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]}
added_keys = d2_keys - d1_keys
added_deltas = {o: (None, d2[o]) for o in added_keys}
deltas = {**shared_deltas, **added_deltas}
return parse_deltas(deltas)
def parse_deltas(deltas: dict) -> dict:
res = {}
for k, v in deltas.items():
if isinstance(v[0], dict):
tmp = diff_dict(v[0], v[1])
if tmp:
res[k] = tmp
else:
res[k] = v[1]
return res

View File

@ -83,7 +83,13 @@ class OrderBook:
"""Cancel an order (or alert) in the EMS. """Cancel an order (or alert) in the EMS.
""" """
cmd = self._sent_orders[uuid] cmd = self._sent_orders.get(uuid)
if not cmd:
log.error(
f'Unknown order {uuid}!?\n'
f'Maybe there is a stale entry or line?\n'
f'You should report this as a bug!'
)
msg = Cancel( msg = Cancel(
oid=uuid, oid=uuid,
symbol=cmd.symbol, symbol=cmd.symbol,
@ -156,7 +162,10 @@ async def relay_order_cmds_from_sync_code(
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)
else: else:
log.warning(f'Ignoring unmatched order cmd for {sym}: {msg}') log.warning(
f'Ignoring unmatched order cmd for {sym} != {symbol_key}:'
f'\n{msg}'
)
@acm @acm

View File

@ -667,19 +667,19 @@ async def translate_and_relay_brokerd_events(
# received this ack, in which case we relay that cancel # received this ack, in which case we relay that cancel
# signal **asap** to the backend broker # signal **asap** to the backend broker
# status = book._active.get(oid) # status = book._active.get(oid)
status = book._active[oid] status_msg = book._active[oid]
req = status.req req = status_msg.req
if req and req.action == 'cancel': if req and req.action == 'cancel':
# assign newly providerd broker backend request id # assign newly providerd broker backend request id
# and tell broker to cancel immediately # and tell broker to cancel immediately
status.reqid = reqid status_msg.reqid = reqid
await brokerd_trades_stream.send(req) await brokerd_trades_stream.send(req)
# 2. the order is now active and will be mirrored in # 2. the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
else: else:
# TODO: should we relay this ack state? # TODO: should we relay this ack state?
status.resp = 'pending' status_msg.resp = 'pending'
# no msg to client necessary # no msg to client necessary
continue continue
@ -729,6 +729,7 @@ async def translate_and_relay_brokerd_events(
# msg-chain/dialog. # msg-chain/dialog.
ems_client_order_stream = router.dialogues[oid] ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid] status_msg = book._active[oid]
old_resp = status_msg.resp
status_msg.resp = status status_msg.resp = status
# retrieve existing live flow # retrieve existing live flow
@ -746,16 +747,47 @@ async def translate_and_relay_brokerd_events(
if status == 'closed': if status == 'closed':
log.info(f'Execution for {oid} is complete!') log.info(f'Execution for {oid} is complete!')
status_msg = book._active.pop(oid)
# only if we already rxed a fill then probably
# this clear is fully complete? (frickin ib..)
if old_resp == 'fill':
status_msg = book._active.pop(oid)
elif status == 'canceled': elif status == 'canceled':
log.info(f'Cancellation for {oid} is complete!') log.cancel(f'Cancellation for {oid} is complete!')
else: # open else: # open
# relayed from backend but probably not handled so # relayed from backend but probably not handled so
# just log it # just log it
log.info(f'{broker} opened order {msg}') log.info(f'{broker} opened order {msg}')
# BrokerdFill
case {
'name': 'fill',
'reqid': reqid, # brokerd generated order-request id
# 'symbol': sym, # paper engine doesn't have this, nbd?
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# proxy through the "fill" result(s)
msg = BrokerdFill(**brokerd_msg)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
ems_client_order_stream = router.dialogues[oid]
# wtf a fill can come after 'closed' from ib?
status_msg = book._active[oid]
# only if we already rxed a 'closed'
# this clear is fully complete? (frickin ib..)
# if status_msg.resp == 'closed':
# status_msg = book._active.pop(oid)
status_msg.resp = 'fill'
status_msg.reqid = reqid
status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
# ``Status`` containing an embedded order msg which # ``Status`` containing an embedded order msg which
# should be loaded as a "pre-existing open order" from the # should be loaded as a "pre-existing open order" from the
# brokerd backend. # brokerd backend.
@ -811,6 +843,14 @@ async def translate_and_relay_brokerd_events(
# don't fall through # don't fall through
continue continue
# brokerd error
case {
'name': 'status',
'status': 'error',
}:
log.error(f'Broker error:\n{pformat(brokerd_msg)}')
# XXX: we presume the brokerd cancels its own order
# TOO FAST ``BrokerdStatus`` that arrives # TOO FAST ``BrokerdStatus`` that arrives
# before the ``BrokerdAck``. # before the ``BrokerdAck``.
case { case {
@ -823,32 +863,20 @@ async def translate_and_relay_brokerd_events(
'status': status, 'status': status,
'reqid': reqid, 'reqid': reqid,
}: }:
status_msg = book._active[oid]
log.warning( log.warning(
'Unhandled broker status:\n' 'Unhandled broker status for dialog:\n'
f'{pformat(status_msg)}\n'
f'{pformat(brokerd_msg)}\n' f'{pformat(brokerd_msg)}\n'
) )
# BrokerdFill
case {
'name': 'fill',
'reqid': reqid, # brokerd generated order-request id
# 'symbol': sym, # paper engine doesn't have this, nbd?
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# proxy through the "fill" result(s)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
msg = BrokerdFill(**brokerd_msg)
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
status_msg.resp = 'fill'
status_msg.reqid = reqid
status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
case _: case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid') raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# XXX: ugh sometimes we don't access it?
if status_msg:
del status_msg
# TODO: do we want this to keep things cleaned up? # TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the # it might require a special status from brokerd to affirm the
# flow is complete? # flow is complete?

View File

@ -234,6 +234,7 @@ class BrokerdStatus(Struct):
'canceled', 'canceled',
'fill', 'fill',
'pending', 'pending',
'error',
] ]
account: str account: str

View File

@ -37,7 +37,7 @@ import time
from math import isnan from math import isnan
from bidict import bidict from bidict import bidict
import msgpack from msgspec.msgpack import encode, decode
import pyqtgraph as pg import pyqtgraph as pg
import numpy as np import numpy as np
import tractor import tractor
@ -774,12 +774,13 @@ async def stream_quotes(
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server # send subs topics to server
resp = await ws.send_message( resp = await ws.send_message(
msgpack.dumps({'streams': list(tbks.values())})
encode({'streams': list(tbks.values())})
) )
log.info(resp) log.info(resp)
async def recv() -> dict[str, Any]: async def recv() -> dict[str, Any]:
return msgpack.loads((await ws.get_message()), encoding='utf-8') return decode((await ws.get_message()), encoding='utf-8')
streams = (await recv())['streams'] streams = (await recv())['streams']
log.info(f"Subscribed to {streams}") log.info(f"Subscribed to {streams}")

View File

@ -18,6 +18,7 @@
Built-in (extension) types. Built-in (extension) types.
""" """
import sys
from typing import Optional from typing import Optional
from pprint import pformat from pprint import pformat
@ -42,7 +43,12 @@ class Struct(
} }
def __repr__(self): def __repr__(self):
return f'Struct({pformat(self.to_dict())})' # only turn on pprint when we detect a python REPL
# at runtime B)
if hasattr(sys, 'ps1'):
return f'Struct({pformat(self.to_dict())})'
return super().__repr__()
def copy( def copy(
self, self,

View File

@ -134,6 +134,8 @@ class Position(Struct):
# unique backend symbol id # unique backend symbol id
bsuid: str bsuid: str
split_ratio: Optional[int] = None
# ordered record of known constituent trade messages # ordered record of known constituent trade messages
clears: dict[ clears: dict[
Union[str, int, Status], # trade id Union[str, int, Status], # trade id
@ -159,6 +161,9 @@ class Position(Struct):
clears = d.pop('clears') clears = d.pop('clears')
expiry = d.pop('expiry') expiry = d.pop('expiry')
if self.split_ratio is None:
d.pop('split_ratio')
# TODO: we need to figure out how to have one top level # TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing # listing venue here even when the backend isn't providing
# it via the trades ledger.. # it via the trades ledger..
@ -384,12 +389,22 @@ class Position(Struct):
asize_h.append(accum_size) asize_h.append(accum_size)
ppu_h.append(ppu_h[-1]) ppu_h.append(ppu_h[-1])
return ppu_h[-1] if ppu_h else 0 final_ppu = ppu_h[-1] if ppu_h else 0
# handle any split info entered (for now) manually by user
if self.split_ratio is not None:
final_ppu /= self.split_ratio
return final_ppu
def calc_size(self) -> float: def calc_size(self) -> float:
size: float = 0 size: float = 0
for tid, entry in self.clears.items(): for tid, entry in self.clears.items():
size += entry['size'] size += entry['size']
if self.split_ratio is not None:
size = round(size * self.split_ratio)
return size return size
def minimize_clears( def minimize_clears(
@ -848,6 +863,7 @@ def open_pps(
size = entry['size'] size = entry['size']
# TODO: remove but, handle old field name for now # TODO: remove but, handle old field name for now
ppu = entry.get('ppu', entry.get('be_price', 0)) ppu = entry.get('ppu', entry.get('be_price', 0))
split_ratio = entry.get('split_ratio')
expiry = entry.get('expiry') expiry = entry.get('expiry')
if expiry: if expiry:
@ -857,6 +873,7 @@ def open_pps(
Symbol.from_fqsn(fqsn, info={}), Symbol.from_fqsn(fqsn, info={}),
size=size, size=size,
ppu=ppu, ppu=ppu,
split_ratio=split_ratio,
expiry=expiry, expiry=expiry,
bsuid=entry['bsuid'], bsuid=entry['bsuid'],

View File

@ -163,7 +163,6 @@ class OrderMode:
) -> LevelLine: ) -> LevelLine:
level = order.price level = order.price
print(f'SIZE: {order.size}')
line = order_line( line = order_line(
self.chart, self.chart,
@ -859,7 +858,7 @@ async def process_trade_msg(
get_index = mode.chart.get_index get_index = mode.chart.get_index
fmsg = pformat(msg) fmsg = pformat(msg)
log.info(f'Received order msg:\n{fmsg}') log.debug(f'Received order msg:\n{fmsg}')
name = msg['name'] name = msg['name']
if name in ( if name in (
@ -920,6 +919,7 @@ async def process_trade_msg(
): ):
dialog = mode.load_unknown_dialog_from_msg(msg) dialog = mode.load_unknown_dialog_from_msg(msg)
mode.on_submit(oid) mode.on_submit(oid)
# return dialog, msg
case Status(resp='error'): case Status(resp='error'):
# delete level line from view # delete level line from view
@ -932,16 +932,15 @@ async def process_trade_msg(
case Status(resp='canceled'): case Status(resp='canceled'):
# delete level line from view # delete level line from view
mode.on_cancel(oid) mode.on_cancel(oid)
req = msg.req req = Order(**msg.req)
log.cancel( log.cancel(f'Canceled {req.action}:{oid}')
f'Canceled order {oid}:\n{pformat(req)}'
)
case Status( case Status(
resp='triggered', resp='triggered',
# req=Order(exec_mode='dark') # TODO: # req=Order(exec_mode='dark') # TODO:
req={'exec_mode': 'dark'}, req={'exec_mode': 'dark'},
): ):
# TODO: UX for a "pending" clear/live order
log.info(f'Dark order triggered for {fmsg}') log.info(f'Dark order triggered for {fmsg}')
case Status( case Status(
@ -951,13 +950,14 @@ async def process_trade_msg(
): ):
# should only be one "fill" for an alert # should only be one "fill" for an alert
# add a triangle and remove the level line # add a triangle and remove the level line
req = Order(**req)
mode.on_fill( mode.on_fill(
oid, oid,
price=req.price, price=req.price,
arrow_index=get_index(time.time()), arrow_index=get_index(time.time()),
) )
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
msg.req = Order(**req) msg.req = req
await mode.on_exec(oid, msg) await mode.on_exec(oid, msg)
# response to completed 'dialog' for order request # response to completed 'dialog' for order request
@ -966,18 +966,18 @@ async def process_trade_msg(
# req=Order() as req, # TODO # req=Order() as req, # TODO
req=req, req=req,
): ):
# right now this is just triggering a system alert
msg.req = Order(**req) msg.req = Order(**req)
await mode.on_exec(oid, msg) await mode.on_exec(oid, msg)
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually # each clearing tick is responded individually
case Status(resp='fill'): case Status(resp='fill'):
# handle out-of-piker fills reporting?
known_order = book._sent_orders.get(oid) known_order = book._sent_orders.get(oid)
if not known_order: if not known_order:
log.warning(f'order {oid} is unknown') log.warning(f'order {oid} is unknown')
return return
# continue
action = known_order.action action = known_order.action
details = msg.brokerd_msg details = msg.brokerd_msg