Compare commits
10 Commits
b52c4092f3
...
2f6e3ad03f
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 2f6e3ad03f | |
Tyler Goodlet | b75683879a | |
Tyler Goodlet | db8a3dd1b7 | |
Tyler Goodlet | 2d92ed2052 | |
Tyler Goodlet | 0756cb0289 | |
Tyler Goodlet | 66f7dd9020 | |
Tyler Goodlet | 9782107153 | |
Tyler Goodlet | 1f43f660fe | |
Tyler Goodlet | d3b7d0e247 | |
Tyler Goodlet | 700dbf0e2b |
|
@ -36,8 +36,6 @@ from trio_typing import TaskStatus
|
|||
import tractor
|
||||
from ib_insync.contract import (
|
||||
Contract,
|
||||
# Option,
|
||||
# Forex,
|
||||
)
|
||||
from ib_insync.order import (
|
||||
Trade,
|
||||
|
@ -364,11 +362,24 @@ async def update_and_audit_msgs(
|
|||
# presume we're at least not more in the shit then we
|
||||
# thought.
|
||||
if diff:
|
||||
reverse_split_ratio = pikersize / ibsize
|
||||
split_ratio = 1/reverse_split_ratio
|
||||
|
||||
if split_ratio >= reverse_split_ratio:
|
||||
entry = f'split_ratio = {int(split_ratio)}'
|
||||
else:
|
||||
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
|
||||
|
||||
raise ValueError(
|
||||
f'POSITION MISMATCH ib <-> piker ledger:\n'
|
||||
f'ib: {ibppmsg}\n'
|
||||
f'piker: {msg}\n'
|
||||
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
|
||||
f'reverse_split_ratio: {reverse_split_ratio}\n'
|
||||
f'split_ratio: {split_ratio}\n\n'
|
||||
'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
|
||||
'If you are expecting a (reverse) split in this '
|
||||
'instrument you should probably put the following '
|
||||
f'in the `pps.toml` section:\n{entry}'
|
||||
)
|
||||
msg.size = ibsize
|
||||
|
||||
|
@ -532,6 +543,7 @@ async def trades_dialogue(
|
|||
# sure know which positions to update from the ledger if
|
||||
# any are missing from the ``pps.toml``
|
||||
bsuid, msg = pack_position(pos)
|
||||
|
||||
acctid = msg.account = accounts_def.inverse[msg.account]
|
||||
acctid = acctid.strip('ib.')
|
||||
cids2pps[(acctid, bsuid)] = msg
|
||||
|
@ -571,14 +583,22 @@ async def trades_dialogue(
|
|||
trans = trans_by_acct.get(acctid)
|
||||
if trans:
|
||||
table.update_from_trans(trans)
|
||||
table.update_from_trans(trans)
|
||||
|
||||
# XXX: not sure exactly why it wouldn't be in
|
||||
# the updated output (maybe this is a bug?) but
|
||||
# if you create a pos from TWS and then load it
|
||||
# from the api trades it seems we get a key
|
||||
# error from ``update[bsuid]`` ?
|
||||
pp = table.pps[bsuid]
|
||||
pp = table.pps.get(bsuid)
|
||||
if not pp:
|
||||
log.error(
|
||||
f'The contract id for {msg} may have '
|
||||
f'changed to {bsuid}\nYou may need to '
|
||||
'adjust your ledger for this, skipping '
|
||||
'for now.'
|
||||
)
|
||||
continue
|
||||
|
||||
if msg.size != pp.size:
|
||||
log.error(
|
||||
'Position mismatch {pp.symbol.front_fqsn()}:\n'
|
||||
|
@ -730,8 +750,11 @@ async def emit_pp_update(
|
|||
_statuses: dict[str, str] = {
|
||||
'cancelled': 'canceled',
|
||||
'submitted': 'open',
|
||||
'pendingsubmit': 'pending',
|
||||
'filled': 'fill',
|
||||
# XXX: just pass these through? it duplicates actual fill events other
|
||||
# then the case where you the `.remaining == 0` case which is our
|
||||
# 'closed'` case.
|
||||
# 'filled': 'pending',
|
||||
# 'pendingsubmit': 'pending',
|
||||
|
||||
# TODO: see a current ``ib_insync`` issue around this:
|
||||
# https://github.com/erdewit/ib_insync/issues/363
|
||||
|
@ -805,8 +828,12 @@ async def deliver_trade_events(
|
|||
# cancelling.. gawwwd
|
||||
if ib_status_key == 'cancelled':
|
||||
last_log = trade.log[-1]
|
||||
if last_log.message:
|
||||
if (
|
||||
last_log.message
|
||||
and 'Error' not in last_log.message
|
||||
):
|
||||
ib_status_key = trade.log[-2].status
|
||||
print(ib_status_key)
|
||||
|
||||
elif ib_status_key == 'inactive':
|
||||
async def sched_cancel():
|
||||
|
@ -821,10 +848,16 @@ async def deliver_trade_events(
|
|||
|
||||
nurse.start_soon(sched_cancel)
|
||||
|
||||
status_key = _statuses.get(ib_status_key) or ib_status_key
|
||||
status_key = (
|
||||
_statuses.get(ib_status_key)
|
||||
or ib_status_key.lower()
|
||||
)
|
||||
|
||||
remaining = status.remaining
|
||||
if remaining == 0:
|
||||
if (
|
||||
status_key == 'filled'
|
||||
and remaining == 0
|
||||
):
|
||||
status_key = 'closed'
|
||||
|
||||
# skip duplicate filled updates - we get the deats
|
||||
|
@ -978,9 +1011,18 @@ async def deliver_trade_events(
|
|||
if err['reqid'] == -1:
|
||||
log.error(f'TWS external order error:\n{pformat(err)}')
|
||||
|
||||
# TODO: what schema for this msg if we're going to make it
|
||||
# portable across all backends?
|
||||
# msg = BrokerdError(**err)
|
||||
# TODO: we don't want to relay data feed / lookup errors
|
||||
# so we need some further filtering logic here..
|
||||
# for most cases the 'status' block above should take
|
||||
# care of this.
|
||||
# await ems_stream.send(BrokerdStatus(
|
||||
# status='error',
|
||||
# reqid=err['reqid'],
|
||||
# reason=err['reason'],
|
||||
# time_ns=time.time_ns(),
|
||||
# account=accounts_def.inverse[trade.order.account],
|
||||
# broker_details={'name': 'ib'},
|
||||
# ))
|
||||
|
||||
case 'position':
|
||||
|
||||
|
|
|
@ -101,3 +101,30 @@ def percent_change(
|
|||
new: float,
|
||||
) -> float:
|
||||
return pnl(init, new) * 100.
|
||||
|
||||
|
||||
def diff_dict(
|
||||
d1: dict,
|
||||
d2: dict,
|
||||
|
||||
) -> dict:
|
||||
d1_keys = set(d1.keys())
|
||||
d2_keys = set(d2.keys())
|
||||
shared_keys = d1_keys.intersection(d2_keys)
|
||||
shared_deltas = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]}
|
||||
added_keys = d2_keys - d1_keys
|
||||
added_deltas = {o: (None, d2[o]) for o in added_keys}
|
||||
deltas = {**shared_deltas, **added_deltas}
|
||||
return parse_deltas(deltas)
|
||||
|
||||
|
||||
def parse_deltas(deltas: dict) -> dict:
|
||||
res = {}
|
||||
for k, v in deltas.items():
|
||||
if isinstance(v[0], dict):
|
||||
tmp = diff_dict(v[0], v[1])
|
||||
if tmp:
|
||||
res[k] = tmp
|
||||
else:
|
||||
res[k] = v[1]
|
||||
return res
|
||||
|
|
|
@ -83,7 +83,13 @@ class OrderBook:
|
|||
"""Cancel an order (or alert) in the EMS.
|
||||
|
||||
"""
|
||||
cmd = self._sent_orders[uuid]
|
||||
cmd = self._sent_orders.get(uuid)
|
||||
if not cmd:
|
||||
log.error(
|
||||
f'Unknown order {uuid}!?\n'
|
||||
f'Maybe there is a stale entry or line?\n'
|
||||
f'You should report this as a bug!'
|
||||
)
|
||||
msg = Cancel(
|
||||
oid=uuid,
|
||||
symbol=cmd.symbol,
|
||||
|
@ -156,7 +162,10 @@ async def relay_order_cmds_from_sync_code(
|
|||
# send msg over IPC / wire
|
||||
await to_ems_stream.send(cmd)
|
||||
else:
|
||||
log.warning(f'Ignoring unmatched order cmd for {sym}: {msg}')
|
||||
log.warning(
|
||||
f'Ignoring unmatched order cmd for {sym} != {symbol_key}:'
|
||||
f'\n{msg}'
|
||||
)
|
||||
|
||||
|
||||
@acm
|
||||
|
|
|
@ -667,19 +667,19 @@ async def translate_and_relay_brokerd_events(
|
|||
# received this ack, in which case we relay that cancel
|
||||
# signal **asap** to the backend broker
|
||||
# status = book._active.get(oid)
|
||||
status = book._active[oid]
|
||||
req = status.req
|
||||
status_msg = book._active[oid]
|
||||
req = status_msg.req
|
||||
if req and req.action == 'cancel':
|
||||
# assign newly providerd broker backend request id
|
||||
# and tell broker to cancel immediately
|
||||
status.reqid = reqid
|
||||
status_msg.reqid = reqid
|
||||
await brokerd_trades_stream.send(req)
|
||||
|
||||
# 2. the order is now active and will be mirrored in
|
||||
# our book -> registered as live flow
|
||||
else:
|
||||
# TODO: should we relay this ack state?
|
||||
status.resp = 'pending'
|
||||
status_msg.resp = 'pending'
|
||||
|
||||
# no msg to client necessary
|
||||
continue
|
||||
|
@ -729,6 +729,7 @@ async def translate_and_relay_brokerd_events(
|
|||
# msg-chain/dialog.
|
||||
ems_client_order_stream = router.dialogues[oid]
|
||||
status_msg = book._active[oid]
|
||||
old_resp = status_msg.resp
|
||||
status_msg.resp = status
|
||||
|
||||
# retrieve existing live flow
|
||||
|
@ -746,16 +747,47 @@ async def translate_and_relay_brokerd_events(
|
|||
|
||||
if status == 'closed':
|
||||
log.info(f'Execution for {oid} is complete!')
|
||||
status_msg = book._active.pop(oid)
|
||||
|
||||
# only if we already rxed a fill then probably
|
||||
# this clear is fully complete? (frickin ib..)
|
||||
if old_resp == 'fill':
|
||||
status_msg = book._active.pop(oid)
|
||||
|
||||
elif status == 'canceled':
|
||||
log.info(f'Cancellation for {oid} is complete!')
|
||||
log.cancel(f'Cancellation for {oid} is complete!')
|
||||
|
||||
else: # open
|
||||
# relayed from backend but probably not handled so
|
||||
# just log it
|
||||
log.info(f'{broker} opened order {msg}')
|
||||
|
||||
# BrokerdFill
|
||||
case {
|
||||
'name': 'fill',
|
||||
'reqid': reqid, # brokerd generated order-request id
|
||||
# 'symbol': sym, # paper engine doesn't have this, nbd?
|
||||
} if (
|
||||
oid := book._ems2brokerd_ids.inverse.get(reqid)
|
||||
):
|
||||
# proxy through the "fill" result(s)
|
||||
msg = BrokerdFill(**brokerd_msg)
|
||||
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
|
||||
|
||||
ems_client_order_stream = router.dialogues[oid]
|
||||
|
||||
# wtf a fill can come after 'closed' from ib?
|
||||
status_msg = book._active[oid]
|
||||
|
||||
# only if we already rxed a 'closed'
|
||||
# this clear is fully complete? (frickin ib..)
|
||||
# if status_msg.resp == 'closed':
|
||||
# status_msg = book._active.pop(oid)
|
||||
|
||||
status_msg.resp = 'fill'
|
||||
status_msg.reqid = reqid
|
||||
status_msg.brokerd_msg = msg
|
||||
await ems_client_order_stream.send(status_msg)
|
||||
|
||||
# ``Status`` containing an embedded order msg which
|
||||
# should be loaded as a "pre-existing open order" from the
|
||||
# brokerd backend.
|
||||
|
@ -811,6 +843,14 @@ async def translate_and_relay_brokerd_events(
|
|||
# don't fall through
|
||||
continue
|
||||
|
||||
# brokerd error
|
||||
case {
|
||||
'name': 'status',
|
||||
'status': 'error',
|
||||
}:
|
||||
log.error(f'Broker error:\n{pformat(brokerd_msg)}')
|
||||
# XXX: we presume the brokerd cancels its own order
|
||||
|
||||
# TOO FAST ``BrokerdStatus`` that arrives
|
||||
# before the ``BrokerdAck``.
|
||||
case {
|
||||
|
@ -823,32 +863,20 @@ async def translate_and_relay_brokerd_events(
|
|||
'status': status,
|
||||
'reqid': reqid,
|
||||
}:
|
||||
status_msg = book._active[oid]
|
||||
log.warning(
|
||||
'Unhandled broker status:\n'
|
||||
'Unhandled broker status for dialog:\n'
|
||||
f'{pformat(status_msg)}\n'
|
||||
f'{pformat(brokerd_msg)}\n'
|
||||
)
|
||||
|
||||
# BrokerdFill
|
||||
case {
|
||||
'name': 'fill',
|
||||
'reqid': reqid, # brokerd generated order-request id
|
||||
# 'symbol': sym, # paper engine doesn't have this, nbd?
|
||||
} if (
|
||||
oid := book._ems2brokerd_ids.inverse.get(reqid)
|
||||
):
|
||||
# proxy through the "fill" result(s)
|
||||
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
|
||||
msg = BrokerdFill(**brokerd_msg)
|
||||
ems_client_order_stream = router.dialogues[oid]
|
||||
status_msg = book._active[oid]
|
||||
status_msg.resp = 'fill'
|
||||
status_msg.reqid = reqid
|
||||
status_msg.brokerd_msg = msg
|
||||
await ems_client_order_stream.send(status_msg)
|
||||
|
||||
case _:
|
||||
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
|
||||
|
||||
# XXX: ugh sometimes we don't access it?
|
||||
if status_msg:
|
||||
del status_msg
|
||||
|
||||
# TODO: do we want this to keep things cleaned up?
|
||||
# it might require a special status from brokerd to affirm the
|
||||
# flow is complete?
|
||||
|
|
|
@ -234,6 +234,7 @@ class BrokerdStatus(Struct):
|
|||
'canceled',
|
||||
'fill',
|
||||
'pending',
|
||||
'error',
|
||||
]
|
||||
|
||||
account: str
|
||||
|
|
|
@ -37,7 +37,7 @@ import time
|
|||
from math import isnan
|
||||
|
||||
from bidict import bidict
|
||||
import msgpack
|
||||
from msgspec.msgpack import encode, decode
|
||||
import pyqtgraph as pg
|
||||
import numpy as np
|
||||
import tractor
|
||||
|
@ -774,12 +774,13 @@ async def stream_quotes(
|
|||
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
|
||||
# send subs topics to server
|
||||
resp = await ws.send_message(
|
||||
msgpack.dumps({'streams': list(tbks.values())})
|
||||
|
||||
encode({'streams': list(tbks.values())})
|
||||
)
|
||||
log.info(resp)
|
||||
|
||||
async def recv() -> dict[str, Any]:
|
||||
return msgpack.loads((await ws.get_message()), encoding='utf-8')
|
||||
return decode((await ws.get_message()), encoding='utf-8')
|
||||
|
||||
streams = (await recv())['streams']
|
||||
log.info(f"Subscribed to {streams}")
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
Built-in (extension) types.
|
||||
|
||||
"""
|
||||
import sys
|
||||
from typing import Optional
|
||||
from pprint import pformat
|
||||
|
||||
|
@ -42,7 +43,12 @@ class Struct(
|
|||
}
|
||||
|
||||
def __repr__(self):
|
||||
return f'Struct({pformat(self.to_dict())})'
|
||||
# only turn on pprint when we detect a python REPL
|
||||
# at runtime B)
|
||||
if hasattr(sys, 'ps1'):
|
||||
return f'Struct({pformat(self.to_dict())})'
|
||||
|
||||
return super().__repr__()
|
||||
|
||||
def copy(
|
||||
self,
|
||||
|
|
19
piker/pp.py
19
piker/pp.py
|
@ -134,6 +134,8 @@ class Position(Struct):
|
|||
# unique backend symbol id
|
||||
bsuid: str
|
||||
|
||||
split_ratio: Optional[int] = None
|
||||
|
||||
# ordered record of known constituent trade messages
|
||||
clears: dict[
|
||||
Union[str, int, Status], # trade id
|
||||
|
@ -159,6 +161,9 @@ class Position(Struct):
|
|||
clears = d.pop('clears')
|
||||
expiry = d.pop('expiry')
|
||||
|
||||
if self.split_ratio is None:
|
||||
d.pop('split_ratio')
|
||||
|
||||
# TODO: we need to figure out how to have one top level
|
||||
# listing venue here even when the backend isn't providing
|
||||
# it via the trades ledger..
|
||||
|
@ -384,12 +389,22 @@ class Position(Struct):
|
|||
asize_h.append(accum_size)
|
||||
ppu_h.append(ppu_h[-1])
|
||||
|
||||
return ppu_h[-1] if ppu_h else 0
|
||||
final_ppu = ppu_h[-1] if ppu_h else 0
|
||||
|
||||
# handle any split info entered (for now) manually by user
|
||||
if self.split_ratio is not None:
|
||||
final_ppu /= self.split_ratio
|
||||
|
||||
return final_ppu
|
||||
|
||||
def calc_size(self) -> float:
|
||||
size: float = 0
|
||||
for tid, entry in self.clears.items():
|
||||
size += entry['size']
|
||||
|
||||
if self.split_ratio is not None:
|
||||
size = round(size * self.split_ratio)
|
||||
|
||||
return size
|
||||
|
||||
def minimize_clears(
|
||||
|
@ -848,6 +863,7 @@ def open_pps(
|
|||
size = entry['size']
|
||||
# TODO: remove but, handle old field name for now
|
||||
ppu = entry.get('ppu', entry.get('be_price', 0))
|
||||
split_ratio = entry.get('split_ratio')
|
||||
|
||||
expiry = entry.get('expiry')
|
||||
if expiry:
|
||||
|
@ -857,6 +873,7 @@ def open_pps(
|
|||
Symbol.from_fqsn(fqsn, info={}),
|
||||
size=size,
|
||||
ppu=ppu,
|
||||
split_ratio=split_ratio,
|
||||
expiry=expiry,
|
||||
bsuid=entry['bsuid'],
|
||||
|
||||
|
|
|
@ -163,7 +163,6 @@ class OrderMode:
|
|||
) -> LevelLine:
|
||||
|
||||
level = order.price
|
||||
print(f'SIZE: {order.size}')
|
||||
line = order_line(
|
||||
|
||||
self.chart,
|
||||
|
@ -859,7 +858,7 @@ async def process_trade_msg(
|
|||
|
||||
get_index = mode.chart.get_index
|
||||
fmsg = pformat(msg)
|
||||
log.info(f'Received order msg:\n{fmsg}')
|
||||
log.debug(f'Received order msg:\n{fmsg}')
|
||||
name = msg['name']
|
||||
|
||||
if name in (
|
||||
|
@ -920,6 +919,7 @@ async def process_trade_msg(
|
|||
):
|
||||
dialog = mode.load_unknown_dialog_from_msg(msg)
|
||||
mode.on_submit(oid)
|
||||
# return dialog, msg
|
||||
|
||||
case Status(resp='error'):
|
||||
# delete level line from view
|
||||
|
@ -932,16 +932,15 @@ async def process_trade_msg(
|
|||
case Status(resp='canceled'):
|
||||
# delete level line from view
|
||||
mode.on_cancel(oid)
|
||||
req = msg.req
|
||||
log.cancel(
|
||||
f'Canceled order {oid}:\n{pformat(req)}'
|
||||
)
|
||||
req = Order(**msg.req)
|
||||
log.cancel(f'Canceled {req.action}:{oid}')
|
||||
|
||||
case Status(
|
||||
resp='triggered',
|
||||
# req=Order(exec_mode='dark') # TODO:
|
||||
req={'exec_mode': 'dark'},
|
||||
):
|
||||
# TODO: UX for a "pending" clear/live order
|
||||
log.info(f'Dark order triggered for {fmsg}')
|
||||
|
||||
case Status(
|
||||
|
@ -951,13 +950,14 @@ async def process_trade_msg(
|
|||
):
|
||||
# should only be one "fill" for an alert
|
||||
# add a triangle and remove the level line
|
||||
req = Order(**req)
|
||||
mode.on_fill(
|
||||
oid,
|
||||
price=req.price,
|
||||
arrow_index=get_index(time.time()),
|
||||
)
|
||||
mode.lines.remove_line(uuid=oid)
|
||||
msg.req = Order(**req)
|
||||
msg.req = req
|
||||
await mode.on_exec(oid, msg)
|
||||
|
||||
# response to completed 'dialog' for order request
|
||||
|
@ -966,18 +966,18 @@ async def process_trade_msg(
|
|||
# req=Order() as req, # TODO
|
||||
req=req,
|
||||
):
|
||||
# right now this is just triggering a system alert
|
||||
msg.req = Order(**req)
|
||||
await mode.on_exec(oid, msg)
|
||||
mode.lines.remove_line(uuid=oid)
|
||||
|
||||
# each clearing tick is responded individually
|
||||
case Status(resp='fill'):
|
||||
|
||||
# handle out-of-piker fills reporting?
|
||||
known_order = book._sent_orders.get(oid)
|
||||
if not known_order:
|
||||
log.warning(f'order {oid} is unknown')
|
||||
return
|
||||
# continue
|
||||
|
||||
action = known_order.action
|
||||
details = msg.brokerd_msg
|
||||
|
|
Loading…
Reference in New Issue