Port to new `PpTable.dump_active()` output, move order event task to child nursery
parent db5aacdb9c
commit 279c899de5
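Two things change here: `PpTable.dump_active()` now returns the `(active, closed)` position-table pair directly (no broker-name argument and no separate read of `table.pps`), and the per-client trade event stream task is started from the same child nursery that runs the order request handler instead of the outer nursery. A condensed before/after, pulled from the hunks below (illustrative fragments, not standalone code):

    # before
    _, closed_pps = table.dump_active('ib')
    active_pps = table.pps
    trade_event_stream = await nurse.start(open_trade_event_stream)  # parent nursery

    # after
    active_pps, closed_pps = table.dump_active()
    async with (
        ctx.open_stream() as ems_stream,
        trio.open_nursery() as n,
    ):
        # order event stream and request handler share the child nursery
        trade_event_stream = await n.start(open_trade_event_stream)
        n.start_soon(handle_order_requests, ems_stream, accounts_def)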
@@ -72,9 +72,7 @@ from piker.clearing._messages import (
 from piker.data._source import Symbol
 from .api import (
     _accounts2clients,
-    # _adhoc_futes_set,
     con2fqsn,
-    # _adhoc_symbol_map,
     log,
     get_config,
     open_client_proxies,
@@ -440,7 +438,7 @@ async def trades_dialogue(
     # we might also want to delegate a specific actor for
     # ledger writing / reading for speed?
     async with (
-        trio.open_nursery() as nurse,
+        # trio.open_nursery() as nurse,
         open_client_proxies() as (proxies, aioclients),
     ):
         # Open a trade ledgers stack for appending trade records over
@@ -453,6 +451,8 @@ async def trades_dialogue(
         ):
             for account, proxy in proxies.items():

+                assert account in accounts_def
+                accounts.add(account)
                 acctid = account.strip('ib.')
                 acctids.add(acctid)

@@ -467,46 +467,6 @@ async def trades_dialogue(

                 client = aioclients[account]

-                async def open_trade_event_stream(
-                    task_status: TaskStatus[
-                        trio.abc.ReceiveChannel
-                    ] = trio.TASK_STATUS_IGNORED,
-                ):
-                    # each api client has a unique event stream
-                    async with tractor.to_asyncio.open_channel_from(
-                        recv_trade_updates,
-                        client=client,
-                    ) as (first, trade_event_stream):
-
-                        task_status.started(trade_event_stream)
-                        await trio.sleep_forever()
-
-                trade_event_stream = await nurse.start(open_trade_event_stream)
-                clients.append((client, trade_event_stream))
-
-                assert account in accounts_def
-                accounts.add(account)
-
-                # update trades ledgers for all accounts from connected
-                # api clients which report trades for **this session**.
-                trades = await proxy.trades()
-                (
-                    trans_by_acct,
-                    api_ready_for_ledger_entries,
-                ) = await update_ledger_from_api_trades(
-                    trades,
-                    proxy,
-                )
-
-                # if new trades are detected from the API, prepare
-                # them for the ledger file and update the pptable.
-                if api_ready_for_ledger_entries:
-                    trade_entries = api_ready_for_ledger_entries[acctid]
-                    ledger.update(trade_entries)
-                    trans = trans_by_acct.get(acctid)
-                    if trans:
-                        table.update_from_trans(trans)
-
                 # process pp value reported from ib's system. we only use these
                 # to cross-check sizing since average pricing on their end uses
                 # the so called (bs) "FIFO" style which more or less results in
@@ -534,13 +494,37 @@ async def trades_dialogue(
                     trans = norm_trade_records(ledger)
                     updated = table.update_from_trans(trans)
                     pp = updated[bsuid]

+                    # update trades ledgers for all accounts from connected
+                    # api clients which report trades for **this session**.
+                    trades = await proxy.trades()
+                    (
+                        trans_by_acct,
+                        api_to_ledger_entries,
+                    ) = await update_ledger_from_api_trades(
+                        trades,
+                        proxy,
+                    )
+
+                    # if new trades are detected from the API, prepare
+                    # them for the ledger file and update the pptable.
+                    if api_to_ledger_entries:
+                        trade_entries = api_to_ledger_entries[acctid]
+
+                        # write ledger with all new trades **AFTER**
+                        # we've updated the `pps.toml` from the
+                        # original ledger state! (i.e. this is
+                        # currently done on exit)
+                        ledger.update(trade_entries)
+
+                        trans = trans_by_acct.get(acctid)
+                        if trans:
+                            table.update_from_trans(trans)
+                            updated = table.update_from_trans(trans)
+
                     assert msg.size == pp.size, 'WTF'

-                # TODO: figure out why these don't match?
-                # assert pp.calc_be_price() == pp.be_price
-                _, closed_pps = table.dump_active('ib')
-                active_pps = table.pps
+                active_pps, closed_pps = table.dump_active()

                 # load all positions from `pps.toml`, cross check with
                 # ib's positions data, and relay re-formatted pps as
@@ -571,15 +555,28 @@ async def trades_dialogue(
                 tuple(name for name in accounts_def if name in accounts),
             ))

-            # write ledger with all new trades **AFTER** we've updated the
-            # `pps.toml` from the original ledger state!
-            for acctid, trades_by_id in api_ready_for_ledger_entries.items():
-                ledgers[acctid].update(trades_by_id)
-
+            # proxy wrapper for starting trade event stream
+            async def open_trade_event_stream(
+                task_status: TaskStatus[
+                    trio.abc.ReceiveChannel
+                ] = trio.TASK_STATUS_IGNORED,
+            ):
+                # each api client has a unique event stream
+                async with tractor.to_asyncio.open_channel_from(
+                    recv_trade_updates,
+                    client=client,
+                ) as (first, trade_event_stream):
+
+                    task_status.started(trade_event_stream)
+                    await trio.sleep_forever()
+
             async with (
                 ctx.open_stream() as ems_stream,
                 trio.open_nursery() as n,
             ):
+                trade_event_stream = await n.start(open_trade_event_stream)
+                clients.append((client, trade_event_stream))
+
                 # start order request handler **before** local trades
                 # event loop
                 n.start_soon(handle_order_requests, ems_stream, accounts_def)
@@ -621,7 +618,7 @@ async def emit_pp_update(
     acctid = acctid.strip('ib.')
     (
         records_by_acct,
-        api_ready_for_ledger_entries,
+        api_to_ledger_entries,
     ) = await update_ledger_from_api_trades(
         [trade_entry],
         proxy,
@@ -631,11 +628,10 @@ async def emit_pp_update(

     table = tables[acctid]
     table.update_from_trans(trans)
-    _, closed = table.dump_active('ib')
-    active = table.pps
+    active, closed = table.dump_active()

     # NOTE: update ledger with all new trades
-    for acctid, trades_by_id in api_ready_for_ledger_entries.items():
+    for acctid, trades_by_id in api_to_ledger_entries.items():
         ledger = ledgers[acctid]
         ledger.update(trades_by_id)

@@ -655,6 +651,7 @@ async def emit_pp_update(
         )
         if msgs:
             msg = msgs[0]
+            log.info('Emitting pp msg: {msg}')
             break

     await ems_stream.send(msg)
@@ -876,6 +873,7 @@ async def deliver_trade_events(
         case 'position':

             cid, msg = pack_position(item)
+            log.info(f'New IB position msg: {msg}')
             # acctid = msg.account = accounts_def.inverse[msg.account]
             # cuck ib and it's shitty fifo sys for pps!
             # await ems_stream.send(msg)