Relay existing open orders from ib on startup
parent
9651ca84bf
commit
02980282cd
|
@ -42,6 +42,7 @@ from ib_insync.contract import (
|
||||||
from ib_insync.order import (
|
from ib_insync.order import (
|
||||||
Trade,
|
Trade,
|
||||||
OrderStatus,
|
OrderStatus,
|
||||||
|
Order,
|
||||||
)
|
)
|
||||||
from ib_insync.objects import (
|
from ib_insync.objects import (
|
||||||
Fill,
|
Fill,
|
||||||
|
@ -439,7 +440,6 @@ async def trades_dialogue(
|
||||||
# we might also want to delegate a specific actor for
|
# we might also want to delegate a specific actor for
|
||||||
# ledger writing / reading for speed?
|
# ledger writing / reading for speed?
|
||||||
async with (
|
async with (
|
||||||
# trio.open_nursery() as nurse,
|
|
||||||
open_client_proxies() as (proxies, aioclients),
|
open_client_proxies() as (proxies, aioclients),
|
||||||
):
|
):
|
||||||
# Open a trade ledgers stack for appending trade records over
|
# Open a trade ledgers stack for appending trade records over
|
||||||
|
@ -468,6 +468,41 @@ async def trades_dialogue(
|
||||||
|
|
||||||
client = aioclients[account]
|
client = aioclients[account]
|
||||||
|
|
||||||
|
trades: list[Trade] = client.ib.openTrades()
|
||||||
|
order_msgs = []
|
||||||
|
for trade in trades:
|
||||||
|
|
||||||
|
order = trade.order
|
||||||
|
quant = trade.order.totalQuantity
|
||||||
|
size = {
|
||||||
|
'SELL': -1,
|
||||||
|
'BUY': 1,
|
||||||
|
}[order.action] * quant
|
||||||
|
fqsn, _ = con2fqsn(trade.contract)
|
||||||
|
|
||||||
|
# TODO: maybe embed a ``BrokerdOrder`` instead
|
||||||
|
# since then we can directly load it on the client
|
||||||
|
# side in the order mode loop?
|
||||||
|
msg = BrokerdStatus(
|
||||||
|
reqid=order.orderId,
|
||||||
|
time_ns=time.time_ns(),
|
||||||
|
account=accounts_def.inverse[order.account],
|
||||||
|
status='submitted',
|
||||||
|
size=size,
|
||||||
|
price=order.lmtPrice,
|
||||||
|
filled=0,
|
||||||
|
reason='Existing live order',
|
||||||
|
|
||||||
|
# this seems to not be necessarily up to date in the
|
||||||
|
# execDetails event.. so we have to send it here I guess?
|
||||||
|
remaining=quant,
|
||||||
|
broker_details={
|
||||||
|
'name': 'ib',
|
||||||
|
'fqsn': fqsn,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
order_msgs.append(msg)
|
||||||
|
|
||||||
# process pp value reported from ib's system. we only use these
|
# process pp value reported from ib's system. we only use these
|
||||||
# to cross-check sizing since average pricing on their end uses
|
# to cross-check sizing since average pricing on their end uses
|
||||||
# the so called (bs) "FIFO" style which more or less results in
|
# the so called (bs) "FIFO" style which more or less results in
|
||||||
|
@ -523,7 +558,12 @@ async def trades_dialogue(
|
||||||
table.update_from_trans(trans)
|
table.update_from_trans(trans)
|
||||||
updated = table.update_from_trans(trans)
|
updated = table.update_from_trans(trans)
|
||||||
|
|
||||||
assert msg.size == pp.size, 'WTF'
|
if msg.size != pp.size:
|
||||||
|
log.error(
|
||||||
|
'Position mismatch {pp.symbol.front_fqsn()}:\n'
|
||||||
|
f'ib: {msg.size}\n'
|
||||||
|
f'piker: {pp.size}\n'
|
||||||
|
)
|
||||||
|
|
||||||
active_pps, closed_pps = table.dump_active()
|
active_pps, closed_pps = table.dump_active()
|
||||||
|
|
||||||
|
@ -575,6 +615,10 @@ async def trades_dialogue(
|
||||||
ctx.open_stream() as ems_stream,
|
ctx.open_stream() as ems_stream,
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
):
|
):
|
||||||
|
# relay existing open orders to ems
|
||||||
|
for msg in order_msgs:
|
||||||
|
await ems_stream.send(msg)
|
||||||
|
|
||||||
trade_event_stream = await n.start(open_trade_event_stream)
|
trade_event_stream = await n.start(open_trade_event_stream)
|
||||||
clients.append((client, trade_event_stream))
|
clients.append((client, trade_event_stream))
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue