Port to new PpTable.dump_active()` output, move order event task to child nursery

ib_pps_upgrade
Tyler Goodlet 2022-07-26 12:06:53 -04:00
parent db5aacdb9c
commit 279c899de5
1 changed files with 54 additions and 56 deletions

View File

@ -72,9 +72,7 @@ from piker.clearing._messages import (
from piker.data._source import Symbol from piker.data._source import Symbol
from .api import ( from .api import (
_accounts2clients, _accounts2clients,
# _adhoc_futes_set,
con2fqsn, con2fqsn,
# _adhoc_symbol_map,
log, log,
get_config, get_config,
open_client_proxies, open_client_proxies,
@ -440,7 +438,7 @@ async def trades_dialogue(
# we might also want to delegate a specific actor for # we might also want to delegate a specific actor for
# ledger writing / reading for speed? # ledger writing / reading for speed?
async with ( async with (
trio.open_nursery() as nurse, # trio.open_nursery() as nurse,
open_client_proxies() as (proxies, aioclients), open_client_proxies() as (proxies, aioclients),
): ):
# Open a trade ledgers stack for appending trade records over # Open a trade ledgers stack for appending trade records over
@ -453,6 +451,8 @@ async def trades_dialogue(
): ):
for account, proxy in proxies.items(): for account, proxy in proxies.items():
assert account in accounts_def
accounts.add(account)
acctid = account.strip('ib.') acctid = account.strip('ib.')
acctids.add(acctid) acctids.add(acctid)
@ -467,46 +467,6 @@ async def trades_dialogue(
client = aioclients[account] client = aioclients[account]
async def open_trade_event_stream(
task_status: TaskStatus[
trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED,
):
# each api client has a unique event stream
async with tractor.to_asyncio.open_channel_from(
recv_trade_updates,
client=client,
) as (first, trade_event_stream):
task_status.started(trade_event_stream)
await trio.sleep_forever()
trade_event_stream = await nurse.start(open_trade_event_stream)
clients.append((client, trade_event_stream))
assert account in accounts_def
accounts.add(account)
# update trades ledgers for all accounts from connected
# api clients which report trades for **this session**.
trades = await proxy.trades()
(
trans_by_acct,
api_ready_for_ledger_entries,
) = await update_ledger_from_api_trades(
trades,
proxy,
)
# if new trades are detected from the API, prepare
# them for the ledger file and update the pptable.
if api_ready_for_ledger_entries:
trade_entries = api_ready_for_ledger_entries[acctid]
ledger.update(trade_entries)
trans = trans_by_acct.get(acctid)
if trans:
table.update_from_trans(trans)
# process pp value reported from ib's system. we only use these # process pp value reported from ib's system. we only use these
# to cross-check sizing since average pricing on their end uses # to cross-check sizing since average pricing on their end uses
# the so called (bs) "FIFO" style which more or less results in # the so called (bs) "FIFO" style which more or less results in
@ -534,13 +494,37 @@ async def trades_dialogue(
trans = norm_trade_records(ledger) trans = norm_trade_records(ledger)
updated = table.update_from_trans(trans) updated = table.update_from_trans(trans)
pp = updated[bsuid] pp = updated[bsuid]
# update trades ledgers for all accounts from connected
# api clients which report trades for **this session**.
trades = await proxy.trades()
(
trans_by_acct,
api_to_ledger_entries,
) = await update_ledger_from_api_trades(
trades,
proxy,
)
# if new trades are detected from the API, prepare
# them for the ledger file and update the pptable.
if api_to_ledger_entries:
trade_entries = api_to_ledger_entries[acctid]
# write ledger with all new trades **AFTER**
# we've updated the `pps.toml` from the
# original ledger state! (i.e. this is
# currently done on exit)
ledger.update(trade_entries)
trans = trans_by_acct.get(acctid)
if trans:
table.update_from_trans(trans)
updated = table.update_from_trans(trans)
assert msg.size == pp.size, 'WTF' assert msg.size == pp.size, 'WTF'
# TODO: figure out why these don't match? active_pps, closed_pps = table.dump_active()
# assert pp.calc_be_price() == pp.be_price
_, closed_pps = table.dump_active('ib')
active_pps = table.pps
# load all positions from `pps.toml`, cross check with # load all positions from `pps.toml`, cross check with
# ib's positions data, and relay re-formatted pps as # ib's positions data, and relay re-formatted pps as
@ -571,15 +555,28 @@ async def trades_dialogue(
tuple(name for name in accounts_def if name in accounts), tuple(name for name in accounts_def if name in accounts),
)) ))
# write ledger with all new trades **AFTER** we've updated the # proxy wrapper for starting trade event stream
# `pps.toml` from the original ledger state! async def open_trade_event_stream(
for acctid, trades_by_id in api_ready_for_ledger_entries.items(): task_status: TaskStatus[
ledgers[acctid].update(trades_by_id) trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED,
):
# each api client has a unique event stream
async with tractor.to_asyncio.open_channel_from(
recv_trade_updates,
client=client,
) as (first, trade_event_stream):
task_status.started(trade_event_stream)
await trio.sleep_forever()
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
trade_event_stream = await n.start(open_trade_event_stream)
clients.append((client, trade_event_stream))
# start order request handler **before** local trades # start order request handler **before** local trades
# event loop # event loop
n.start_soon(handle_order_requests, ems_stream, accounts_def) n.start_soon(handle_order_requests, ems_stream, accounts_def)
@ -621,7 +618,7 @@ async def emit_pp_update(
acctid = acctid.strip('ib.') acctid = acctid.strip('ib.')
( (
records_by_acct, records_by_acct,
api_ready_for_ledger_entries, api_to_ledger_entries,
) = await update_ledger_from_api_trades( ) = await update_ledger_from_api_trades(
[trade_entry], [trade_entry],
proxy, proxy,
@ -631,11 +628,10 @@ async def emit_pp_update(
table = tables[acctid] table = tables[acctid]
table.update_from_trans(trans) table.update_from_trans(trans)
_, closed = table.dump_active('ib') active, closed = table.dump_active()
active = table.pps
# NOTE: update ledger with all new trades # NOTE: update ledger with all new trades
for acctid, trades_by_id in api_ready_for_ledger_entries.items(): for acctid, trades_by_id in api_to_ledger_entries.items():
ledger = ledgers[acctid] ledger = ledgers[acctid]
ledger.update(trades_by_id) ledger.update(trades_by_id)
@ -655,6 +651,7 @@ async def emit_pp_update(
) )
if msgs: if msgs:
msg = msgs[0] msg = msgs[0]
log.info('Emitting pp msg: {msg}')
break break
await ems_stream.send(msg) await ems_stream.send(msg)
@ -876,6 +873,7 @@ async def deliver_trade_events(
case 'position': case 'position':
cid, msg = pack_position(item) cid, msg = pack_position(item)
log.info(f'New IB position msg: {msg}')
# acctid = msg.account = accounts_def.inverse[msg.account] # acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps! # cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg) # await ems_stream.send(msg)