diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index acf055dc..6efdf7b1 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -72,9 +72,7 @@ from piker.clearing._messages import ( from piker.data._source import Symbol from .api import ( _accounts2clients, - # _adhoc_futes_set, con2fqsn, - # _adhoc_symbol_map, log, get_config, open_client_proxies, @@ -440,7 +438,7 @@ async def trades_dialogue( # we might also want to delegate a specific actor for # ledger writing / reading for speed? async with ( - trio.open_nursery() as nurse, + # trio.open_nursery() as nurse, open_client_proxies() as (proxies, aioclients), ): # Open a trade ledgers stack for appending trade records over @@ -453,6 +451,8 @@ async def trades_dialogue( ): for account, proxy in proxies.items(): + assert account in accounts_def + accounts.add(account) acctid = account.strip('ib.') acctids.add(acctid) @@ -467,46 +467,6 @@ async def trades_dialogue( client = aioclients[account] - async def open_trade_event_stream( - task_status: TaskStatus[ - trio.abc.ReceiveChannel - ] = trio.TASK_STATUS_IGNORED, - ): - # each api client has a unique event stream - async with tractor.to_asyncio.open_channel_from( - recv_trade_updates, - client=client, - ) as (first, trade_event_stream): - - task_status.started(trade_event_stream) - await trio.sleep_forever() - - trade_event_stream = await nurse.start(open_trade_event_stream) - clients.append((client, trade_event_stream)) - - assert account in accounts_def - accounts.add(account) - - # update trades ledgers for all accounts from connected - # api clients which report trades for **this session**. - trades = await proxy.trades() - ( - trans_by_acct, - api_ready_for_ledger_entries, - ) = await update_ledger_from_api_trades( - trades, - proxy, - ) - - # if new trades are detected from the API, prepare - # them for the ledger file and update the pptable. - if api_ready_for_ledger_entries: - trade_entries = api_ready_for_ledger_entries[acctid] - ledger.update(trade_entries) - trans = trans_by_acct.get(acctid) - if trans: - table.update_from_trans(trans) - # process pp value reported from ib's system. we only use these # to cross-check sizing since average pricing on their end uses # the so called (bs) "FIFO" style which more or less results in @@ -534,13 +494,37 @@ async def trades_dialogue( trans = norm_trade_records(ledger) updated = table.update_from_trans(trans) pp = updated[bsuid] + + # update trades ledgers for all accounts from connected + # api clients which report trades for **this session**. + trades = await proxy.trades() + ( + trans_by_acct, + api_to_ledger_entries, + ) = await update_ledger_from_api_trades( + trades, + proxy, + ) + + # if new trades are detected from the API, prepare + # them for the ledger file and update the pptable. + if api_to_ledger_entries: + trade_entries = api_to_ledger_entries[acctid] + + # write ledger with all new trades **AFTER** + # we've updated the `pps.toml` from the + # original ledger state! (i.e. this is + # currently done on exit) + ledger.update(trade_entries) + + trans = trans_by_acct.get(acctid) + if trans: + table.update_from_trans(trans) + updated = table.update_from_trans(trans) + assert msg.size == pp.size, 'WTF' - # TODO: figure out why these don't match? - # assert pp.calc_be_price() == pp.be_price - - _, closed_pps = table.dump_active('ib') - active_pps = table.pps + active_pps, closed_pps = table.dump_active() # load all positions from `pps.toml`, cross check with # ib's positions data, and relay re-formatted pps as @@ -571,15 +555,28 @@ async def trades_dialogue( tuple(name for name in accounts_def if name in accounts), )) - # write ledger with all new trades **AFTER** we've updated the - # `pps.toml` from the original ledger state! - for acctid, trades_by_id in api_ready_for_ledger_entries.items(): - ledgers[acctid].update(trades_by_id) + # proxy wrapper for starting trade event stream + async def open_trade_event_stream( + task_status: TaskStatus[ + trio.abc.ReceiveChannel + ] = trio.TASK_STATUS_IGNORED, + ): + # each api client has a unique event stream + async with tractor.to_asyncio.open_channel_from( + recv_trade_updates, + client=client, + ) as (first, trade_event_stream): + + task_status.started(trade_event_stream) + await trio.sleep_forever() async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): + trade_event_stream = await n.start(open_trade_event_stream) + clients.append((client, trade_event_stream)) + # start order request handler **before** local trades # event loop n.start_soon(handle_order_requests, ems_stream, accounts_def) @@ -621,7 +618,7 @@ async def emit_pp_update( acctid = acctid.strip('ib.') ( records_by_acct, - api_ready_for_ledger_entries, + api_to_ledger_entries, ) = await update_ledger_from_api_trades( [trade_entry], proxy, @@ -631,11 +628,10 @@ async def emit_pp_update( table = tables[acctid] table.update_from_trans(trans) - _, closed = table.dump_active('ib') - active = table.pps + active, closed = table.dump_active() # NOTE: update ledger with all new trades - for acctid, trades_by_id in api_ready_for_ledger_entries.items(): + for acctid, trades_by_id in api_to_ledger_entries.items(): ledger = ledgers[acctid] ledger.update(trades_by_id) @@ -655,6 +651,7 @@ async def emit_pp_update( ) if msgs: msg = msgs[0] + log.info('Emitting pp msg: {msg}') break await ems_stream.send(msg) @@ -876,6 +873,7 @@ async def deliver_trade_events( case 'position': cid, msg = pack_position(item) + log.info(f'New IB position msg: {msg}') # acctid = msg.account = accounts_def.inverse[msg.account] # cuck ib and it's shitty fifo sys for pps! # await ems_stream.send(msg)