Merge pull request #383 from pikers/doin_the_splits

Doin the splits
goodboy 2022-08-18 11:50:46 -04:00 committed by GitHub
commit 8bef67642e
3 changed files with 245 additions and 107 deletions


@@ -1127,6 +1127,12 @@ async def load_aio_clients(
                     # careful.
                     timeout=connect_timeout,
                 )
+                # create and cache client
+                client = Client(ib)
+
+                # update all actor-global caches
+                log.info(f"Caching client for {sockaddr}")
+                _client_cache[sockaddr] = client
                 break

             except (
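For context, the connect-then-cache flow this hunk consolidates can be sketched standalone as below; the host/ports, the bare `ib_insync.IB` client (piker wraps it in its own `Client`), and the error handling are illustrative assumptions, not piker's exact code.

    import asyncio
    from ib_insync import IB

    _client_cache: dict[tuple[str, int], IB] = {}

    async def connect_and_cache(
        host: str = '127.0.0.1',
        ports: tuple[int, ...] = (4002, 7497),  # gw/tws paper defaults
        connect_timeout: float = 0.5,
    ) -> IB:
        for port in ports:
            ib = IB()
            try:
                await ib.connectAsync(host, port, timeout=connect_timeout)
            except (ConnectionRefusedError, asyncio.TimeoutError):
                continue

            # cache the client keyed by socket address, mirroring
            # the ``_client_cache[sockaddr] = client`` line above
            _client_cache[(host, port)] = ib
            return ib

        raise ConnectionError(f'No ib gateway found @ {host}:{ports}')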
@@ -1150,21 +1156,9 @@ async def load_aio_clients(
                 log.warning(
                     f'Failed to connect on {port} for {i} time, retrying...')

-        # create and cache client
-        client = Client(ib)
-
         # Pre-collect all accounts available for this
         # connection and map account names to this client
         # instance.
-        pps = ib.positions()
-        if pps:
-            for pp in pps:
-                accounts_found[
-                    accounts_def.inverse[pp.account]
-                ] = client
-
-        # if there are accounts without positions we should still
-        # register them for this client
         for value in ib.accountValues():
             acct_number = value.account
@@ -1185,10 +1179,6 @@ async def load_aio_clients(
             f'{pformat(accounts_found)}'
         )

-        # update all actor-global caches
-        log.info(f"Caching client for {sockaddr}")
-        _client_cache[sockaddr] = client
-
         # XXX: why aren't we just updating this directy above
         # instead of using the intermediary `accounts_found`?
         _accounts2clients.update(accounts_found)
@@ -1245,7 +1235,6 @@ async def open_client_proxies() -> tuple[
 ]:
     async with (
         tractor.trionics.maybe_open_context(
-            # acm_func=open_client_proxies,
             acm_func=tractor.to_asyncio.open_channel_from,
             kwargs={'target': load_clients_for_trio},

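The `acm_func` swap above relies on `tractor.trionics.maybe_open_context` to share one allocated resource across callers. Below is a hand-rolled simplification of that caching pattern, not tractor's actual implementation (which also coordinates cross-task teardown):

    from contextlib import asynccontextmanager
    from typing import Any, AsyncIterator, Callable

    _cache: dict[str, Any] = {}

    @asynccontextmanager
    async def maybe_open_context(
        acm_func: Callable,  # an async context manager factory
        kwargs: dict,
        key: str = 'default',
    ) -> AsyncIterator[tuple[bool, Any]]:
        if key in _cache:
            # another task already allocated the resource:
            # hand back the cached value and flag the hit.
            yield True, _cache[key]
            return

        async with acm_func(**kwargs) as value:
            _cache[key] = value
            try:
                yield False, value
            finally:
                _cache.pop(key, None)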

@@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS.

 """
 from __future__ import annotations
+from bisect import insort
 from contextlib import ExitStack
 from dataclasses import asdict
 from functools import partial
@@ -36,8 +37,6 @@ from trio_typing import TaskStatus
 import tractor
 from ib_insync.contract import (
     Contract,
-    # Option,
-    # Forex,
 )
 from ib_insync.order import (
     Trade,
@@ -357,11 +356,24 @@ async def update_and_audit_msgs(
             # presume we're at least not more in the shit then we
             # thought.
             if diff:
+                reverse_split_ratio = pikersize / ibsize
+                split_ratio = 1/reverse_split_ratio
+
+                if split_ratio >= reverse_split_ratio:
+                    entry = f'split_ratio = {int(split_ratio)}'
+                else:
+                    entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
+
                 raise ValueError(
                     f'POSITION MISMATCH ib <-> piker ledger:\n'
                     f'ib: {ibppmsg}\n'
                     f'piker: {msg}\n'
-                    'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
+                    f'reverse_split_ratio: {reverse_split_ratio}\n'
+                    f'split_ratio: {split_ratio}\n\n'
+                    'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
+                    'If you are expecting a (reverse) split in this '
+                    'instrument you should probably put the following '
+                    f'in the `pps.toml` section:\n{entry}'
                 )
             msg.size = ibsize
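The suggested `split_ratio` entry follows from plain ratio arithmetic; e.g. with hypothetical sizes where the local ledger predates a 4:1 forward split:

    # hypothetical: ledger size predates a 4:1 forward split
    pikersize = 10.0  # size per the local ledger
    ibsize = 40.0     # size per ib's position report

    reverse_split_ratio = pikersize / ibsize  # 0.25
    split_ratio = 1 / reverse_split_ratio     # 4.0

    # mirrors the suggestion logic above: a forward split yields
    # ``split_ratio = 4``, a reverse split ``split_ratio = 1/4``
    if split_ratio >= reverse_split_ratio:
        entry = f'split_ratio = {int(split_ratio)}'
    else:
        entry = f'split_ratio = 1/{int(reverse_split_ratio)}'

    assert entry == 'split_ratio = 4'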
@@ -394,11 +406,12 @@ async def update_and_audit_msgs(
                 avg_price=p.ppu,
             )
             if validate and p.size:
-                raise ValueError(
-                    f'UNEXPECTED POSITION ib <-> piker ledger:\n'
+                # raise ValueError(
+                log.error(
+                    f'UNEXPECTED POSITION says ib:\n'
                     f'piker: {msg}\n'
                     'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n'
-                    'MAYBE THEY LIQUIDATED YOU BRO!??!'
+                    'THEY LIQUIDATED YOU OR YOUR MISSING LEDGER RECORDS!?'
                 )
             msgs.append(msg)
@@ -423,7 +436,6 @@ async def trades_dialogue(
     # deliver positions to subscriber before anything else
     all_positions = []
     accounts = set()
-    clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
     acctids = set()
     cids2pps: dict[str, BrokerdPosition] = {}
@@ -450,8 +462,8 @@ async def trades_dialogue(
     with (
         ExitStack() as lstack,
     ):
-
+        # load ledgers and pps for all detected client-proxies
         for account, proxy in proxies.items():
             assert account in accounts_def
             accounts.add(account)
             acctid = account.strip('ib.')
@@ -466,6 +478,7 @@ async def trades_dialogue(
                 open_pps('ib', acctid)
             )

+        for account, proxy in proxies.items():
             client = aioclients[account]

             # process pp value reported from ib's system. we only use these
@@ -473,28 +486,29 @@ async def trades_dialogue(
             # the so called (bs) "FIFO" style which more or less results in
             # a price that's not useful for traders who want to not lose
             # money.. xb
-            # for client in aioclients.values():
             for pos in client.positions():

                 # collect all ib-pp reported positions so that we can be
                 # sure know which positions to update from the ledger if
                 # any are missing from the ``pps.toml``
                 bsuid, msg = pack_position(pos)
                 acctid = msg.account = accounts_def.inverse[msg.account]
                 acctid = acctid.strip('ib.')
                 cids2pps[(acctid, bsuid)] = msg
                 assert msg.account in accounts, (
                     f'Position for unknown account: {msg.account}')

+                ledger = ledgers[acctid]
                 table = tables[acctid]
                 pp = table.pps.get(bsuid)
                 if (
                     not pp
                     or pp.size != msg.size
                 ):
                     trans = norm_trade_records(ledger)
-                    updated = table.update_from_trans(trans)
-                    pp = updated[bsuid]
+                    table.update_from_trans(trans)

             # update trades ledgers for all accounts from connected
             # api clients which report trades for **this session**.
@@ -521,9 +535,34 @@ async def trades_dialogue(
                     trans = trans_by_acct.get(acctid)
                     if trans:
                         table.update_from_trans(trans)
-                        updated = table.update_from_trans(trans)
-                        assert msg.size == pp.size, 'WTF'
+
+                # XXX: not sure exactly why it wouldn't be in
+                # the updated output (maybe this is a bug?) but
+                # if you create a pos from TWS and then load it
+                # from the api trades it seems we get a key
+                # error from ``update[bsuid]`` ?
+                pp = table.pps.get(bsuid)
+                if not pp:
+                    log.error(
+                        f'The contract id for {msg} may have '
+                        f'changed to {bsuid}\nYou may need to '
+                        'adjust your ledger for this, skipping '
+                        'for now.'
+                    )
+                    continue
+
+                # XXX: not sure exactly why it wouldn't be in
+                # the updated output (maybe this is a bug?) but
+                # if you create a pos from TWS and then load it
+                # from the api trades it seems we get a key
+                # error from ``update[bsuid]`` ?
+                pp = table.pps[bsuid]
+                if msg.size != pp.size:
+                    log.error(
+                        'Position mismatch {pp.symbol.front_fqsn()}:\n'
+                        f'ib: {msg.size}\n'
+                        f'piker: {pp.size}\n'
+                    )

             active_pps, closed_pps = table.dump_active()
@@ -558,6 +597,7 @@ async def trades_dialogue(
     # proxy wrapper for starting trade event stream
     async def open_trade_event_stream(
+        client: Client,
         task_status: TaskStatus[
             trio.abc.ReceiveChannel
         ] = trio.TASK_STATUS_IGNORED,
@@ -575,18 +615,25 @@ async def trades_dialogue(
         ctx.open_stream() as ems_stream,
         trio.open_nursery() as n,
     ):
-        trade_event_stream = await n.start(open_trade_event_stream)
-        clients.append((client, trade_event_stream))
+        for client in set(aioclients.values()):
+            trade_event_stream = await n.start(
+                open_trade_event_stream,
+                client,
+            )

-        # start order request handler **before** local trades
-        # event loop
-        n.start_soon(handle_order_requests, ems_stream, accounts_def)
+            # start order request handler **before** local trades
+            # event loop
+            n.start_soon(
+                handle_order_requests,
+                ems_stream,
+                accounts_def,
+            )

-        # allocate event relay tasks for each client connection
-        for client, stream in clients:
+            # allocate event relay tasks for each client connection
             n.start_soon(
                 deliver_trade_events,
-                stream,
+                trade_event_stream,
                 ems_stream,
                 accounts_def,
                 cids2pps,
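The restructure above spawns one relay group per api client, and the `await n.start(...)` vs `n.start_soon(...)` split matters because `start()` blocks until the child hands back a value via `task_status.started()`. A minimal trio sketch of that handoff (all names hypothetical):

    import trio

    async def open_stream(
        client_id: int,
        task_status=trio.TASK_STATUS_IGNORED,
    ):
        send, recv = trio.open_memory_channel(100)
        # hand the receive side back to the starter *before* looping
        task_status.started(recv)
        async with send:
            for i in range(3):
                await send.send((client_id, i))

    async def consume(recv):
        async for msg in recv:
            print(msg)

    async def main():
        async with trio.open_nursery() as n:
            for client_id in (1, 2):
                # ``start()`` waits for ``task_status.started()``
                recv = await n.start(open_stream, client_id)
                n.start_soon(consume, recv)

    trio.run(main)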
@@ -661,6 +708,22 @@ async def emit_pp_update(
     await ems_stream.send(msg)


+_statuses: dict[str, str] = {
+    'cancelled': 'canceled',
+    'submitted': 'open',
+
+    # XXX: just pass these through? it duplicates actual fill events other
+    # then the case where you the `.remaining == 0` case which is our
+    # 'closed'` case.
+    # 'filled': 'pending',
+    # 'pendingsubmit': 'pending',
+
+    # TODO: see a current ``ib_insync`` issue around this:
+    # https://github.com/erdewit/ib_insync/issues/363
+    'inactive': 'pending',
+}
+

 async def deliver_trade_events(
     trade_event_stream: trio.MemoryReceiveChannel,
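The new module-level `_statuses` table normalizes ib's order-state names into piker's EMS vocabulary; use reduces to a dict lookup (the sample status value is hypothetical):

    # abbreviated copy of the ``_statuses`` table above
    _statuses = {'cancelled': 'canceled', 'submitted': 'open', 'inactive': 'pending'}

    status = 'Cancelled'  # hypothetical ib ``orderStatus.status`` value
    ems_status = _statuses.get(status.lower(), status.lower())
    assert ems_status == 'canceled'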
@@ -914,11 +977,13 @@ def norm_trade_records(
     ledger into our standard record format.

     '''
-    records: dict[str, Transaction] = {}
+    records: list[Transaction] = []
     for tid, record in ledger.items():

         conid = record.get('conId') or record['conid']
-        comms = record.get('commission') or -1*record['ibCommission']
+        comms = record.get('commission')
+        if comms is None:
+            comms = -1*record['ibCommission']
         price = record.get('price') or record['tradePrice']

         # the api doesn't do the -/+ on the quantity for you but flex
@@ -991,19 +1056,22 @@ def norm_trade_records(
         # should already have entries if the pps are still open, in
         # which case, we can pull the fqsn from that table (see
         # `trades_dialogue()` above).
-        records[tid] = Transaction(
-            fqsn=fqsn,
-            tid=tid,
-            size=size,
-            price=price,
-            cost=comms,
-            dt=dt,
-            expiry=expiry,
-            bsuid=conid,
+        insort(
+            records,
+            Transaction(
+                fqsn=fqsn,
+                tid=tid,
+                size=size,
+                price=price,
+                cost=comms,
+                dt=dt,
+                expiry=expiry,
+                bsuid=conid,
+            ),
+            key=lambda t: t.dt
         )

-    return records
+    return {r.tid: r for r in records}


 def trades_to_ledger_entries(
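Note that the `key=` parameter to `bisect.insort` used above only exists on Python 3.10+; on older interpreters the chronological insert can be emulated with a parallel keys list (`Txn` below is a stand-in for `Transaction`):

    from bisect import bisect_right
    from dataclasses import dataclass
    from datetime import datetime

    @dataclass
    class Txn:  # stand-in for ``Transaction``
        tid: str
        dt: datetime

    records: list[Txn] = []
    _keys: list[datetime] = []

    def insert_chronological(t: Txn) -> None:
        # pre-3.10 equivalent of ``insort(records, t, key=lambda t: t.dt)``
        i = bisect_right(_keys, t.dt)
        _keys.insert(i, t.dt)
        records.insert(i, t)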


@@ -134,11 +134,14 @@ class Position(Struct):
     # unique backend symbol id
     bsuid: str

+    split_ratio: Optional[int] = None
+
     # ordered record of known constituent trade messages
     clears: dict[
         Union[str, int, Status],  # trade id
         dict[str, Any],  # transaction history summaries
     ] = {}
+    first_clear_dt: Optional[datetime] = None

     expiry: Optional[datetime] = None
@@ -159,6 +162,12 @@ class Position(Struct):
         clears = d.pop('clears')
         expiry = d.pop('expiry')

+        if self.split_ratio is None:
+            d.pop('split_ratio')
+
+        # should be obvious from clears/event table
+        d.pop('first_clear_dt')
+
         # TODO: we need to figure out how to have one top level
         # listing venue here even when the backend isn't providing
         # it via the trades ledger..
@@ -166,16 +175,14 @@ class Position(Struct):
         s = d.pop('symbol')
         fqsn = s.front_fqsn()

-        size = d.pop('size')
-        ppu = d.pop('ppu')
-        d['size'], d['ppu'] = self.audit_sizing(size, ppu)
-
         if self.expiry is None:
             d.pop('expiry', None)
         elif expiry:
             d['expiry'] = str(expiry)

         toml_clears_list = []
+
+        # reverse sort so latest clears are at top of section?
         for tid, data in sorted(
             list(clears.items()),
@@ -184,7 +191,8 @@ class Position(Struct):
         ):
             inline_table = toml.TomlDecoder().get_empty_inline_table()
-            inline_table['dt'] = data['dt']
+            # serialize datetime to parsable `str`
+            inline_table['dt'] = str(data['dt'])

             # insert optional clear fields in column order
             for k in ['ppu', 'accum_size']:
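Casting the clear's `dt` to `str` works because pendulum datetimes stringify to ISO 8601, which `pendulum.parse()` (used when reloading in `open_pps()` below) round-trips losslessly:

    import pendulum

    dt = pendulum.datetime(2022, 8, 18, 11, 50, tz='America/New_York')
    s = str(dt)  # ISO 8601: '2022-08-18T11:50:00-04:00'
    assert pendulum.parse(s) == dt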
@@ -203,35 +211,51 @@ class Position(Struct):
         return fqsn, d

-    def audit_sizing(
-        self,
-        size: Optional[float] = None,
-        ppu: Optional[float] = None,
-
-    ) -> tuple[float, float]:
+    def ensure_state(self) -> None:
         '''
-        Audit either the `.size` and `.ppu` values or equvialent
-        passed in values against the clears table calculations and
-        return the calc-ed values if they differ and log warnings to
-        console.
+        Audit either the `.size` and `.ppu` local instance vars against
+        the clears table calculations and return the calc-ed values if
+        they differ and log warnings to console.

         '''
-        size = size or self.size
-        ppu = ppu or self.ppu
+        clears = list(self.clears.values())
+        self.first_clear_dt = min(list(entry['dt'] for entry in clears))
+        last_clear = clears[-1]
+
         csize = self.calc_size()
-        cppu = self.calc_ppu()
+        accum = last_clear['accum_size']
+        if not self.expired():
+            if (
+                csize != accum
+                and csize != round(accum * self.split_ratio or 1)
+            ):
+                raise ValueError(f'Size mismatch: {csize}')
+        else:
+            assert csize == 0, 'Contract is expired but non-zero size?'

-        if size != csize:
-            log.warning(f'size != calculated size: {size} != {csize}')
-            size = csize
-
-        if ppu != cppu:
+        if self.size != csize:
             log.warning(
-                f'ppu != calculated ppu: {ppu} != {cppu}'
+                'Position state mismatch:\n'
+                f'{self.size} => {csize}'
             )
+            self.size = csize

-        ppu = cppu
-        return size, ppu
+        cppu = self.calc_ppu()
+        ppu = last_clear['ppu']
+        if (
+            cppu != ppu
+            and self.split_ratio is not None
+
+            # handle any split info entered (for now) manually by user
+            and cppu != (ppu / self.split_ratio)
+        ):
+            raise ValueError(f'PPU mismatch: {cppu}')
+
+        if self.ppu != cppu:
+            log.warning(
+                'Position state mismatch:\n'
+                f'{self.ppu} => {cppu}'
+            )
+            self.ppu = cppu

     def update_from_msg(
         self,
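With a user-entered split hint the recalc-ed values legitimately diverge from the per-clear cached entries, which is what the two escape-hatch conditions in `ensure_state()` permit; reduced to hypothetical numbers:

    # hypothetical: clears recorded pre-split, ``split_ratio = 4``
    split_ratio = 4

    accum = 10.0  # last clear's cached ``accum_size``
    csize = 40.0  # ``calc_size()`` output (split-adjusted)
    assert csize == round(accum * split_ratio)

    ppu = 40.0    # last clear's cached ``ppu``
    cppu = 10.0   # ``calc_ppu()`` output (split-adjusted)
    assert cppu == ppu / split_ratio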
@@ -384,12 +408,40 @@ class Position(Struct):
             asize_h.append(accum_size)
             ppu_h.append(ppu_h[-1])

-        return ppu_h[-1] if ppu_h else 0
+        final_ppu = ppu_h[-1] if ppu_h else 0
+
+        # handle any split info entered (for now) manually by user
+        if self.split_ratio is not None:
+            final_ppu /= self.split_ratio
+
+        return final_ppu
+
+    def expired(self) -> bool:
+        '''
+        Predicate which checks if the contract/instrument is past its expiry.
+
+        '''
+        return bool(self.expiry) and self.expiry < now()

     def calc_size(self) -> float:
+        '''
+        Calculate the unit size of this position in the destination
+        asset using the clears/trade event table; zero if expired.
+
+        '''
         size: float = 0
+
+        # time-expired pps (normally derivatives) are "closed"
+        # and have a zero size.
+        if self.expired():
+            return 0
+
         for tid, entry in self.clears.items():
             size += entry['size']

+        if self.split_ratio is not None:
+            size = round(size * self.split_ratio)
+
         return size

     def minimize_clears(
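The `expired()` short-circuit in `calc_size()` means a derivatives position past `expiry` always nets to zero regardless of its clears; assuming `now()` is pendulum-based, the predicate reduces to:

    import pendulum

    expiry = pendulum.parse('2022-06-17T20:00:00+00:00')  # hypothetical
    expired = bool(expiry) and expiry < pendulum.now()
    assert expired  # past expiry -> position size treated as 0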
@@ -434,20 +486,24 @@ class Position(Struct):
             'cost': t.cost,
             'price': t.price,
             'size': t.size,
-            'dt': str(t.dt),
+            'dt': t.dt,
         }

         # TODO: compute these incrementally instead
         # of re-looping through each time resulting in O(n**2)
-        # behaviour..
-        # compute these **after** adding the entry
-        # in order to make the recurrence relation math work
-        # inside ``.calc_size()``.
+        # behaviour..?
+
+        # NOTE: we compute these **after** adding the entry in order to
+        # make the recurrence relation math work inside
+        # ``.calc_size()``.
         self.size = clear['accum_size'] = self.calc_size()
         self.ppu = clear['ppu'] = self.calc_ppu()

         return clear

+    def sugest_split(self) -> float:
+        ...
+

 class PpTable(Struct):
@@ -484,16 +540,22 @@ class PpTable(Struct):
                     expiry=t.expiry,
                 )
             )
+            clears = pp.clears
+            if clears:
+                first_clear_dt = pp.first_clear_dt

             # don't do updates for ledger records we already have
             # included in the current pps state.
-            if t.tid in pp.clears:
+            if (
+                t.tid in clears
+                or first_clear_dt and t.dt < first_clear_dt
+            ):
                 # NOTE: likely you'll see repeats of the same
                 # ``Transaction`` passed in here if/when you are restarting
                 # a ``brokerd.ib`` where the API will re-report trades from
                 # the current session, so we need to make sure we don't
                 # "double count" these in pp calculations.
                 continue

             # update clearing table
             pp.add_clear(t)
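The broadened skip-predicate drops a transaction either when its `tid` is already in the clears table or when it predates the position's first known clear (i.e. the `pps.toml` state already accounts for it); isolated as a function:

    from datetime import datetime
    from typing import Optional

    def should_skip(
        tid: str,
        dt: datetime,
        clears: dict,
        first_clear_dt: Optional[datetime],
    ) -> bool:
        # mirrors the ``continue`` predicate above
        return bool(
            tid in clears
            or (first_clear_dt and dt < first_clear_dt)
        )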
@@ -501,7 +563,7 @@ class PpTable(Struct):

         # minimize clears tables and update sizing.
         for bsuid, pp in updated.items():
-            pp.size, pp.ppu = pp.audit_sizing()
+            pp.ensure_state()

         return updated
@@ -534,7 +596,7 @@ class PpTable(Struct):
             # if bsuid == qqqbsuid:
             #     breakpoint()

-            pp.size, pp.ppu = pp.audit_sizing()
+            pp.ensure_state()

             if (
                 # "net-zero" is a "closed" position
@@ -574,6 +636,7 @@ class PpTable(Struct):
             # keep the minimal amount of clears that make up this
             # position since the last net-zero state.
             pos.minimize_clears()
+            pos.ensure_state()

             # serialize to pre-toml form
             fqsn, asdict = pos.to_pretoml()
@@ -835,19 +898,35 @@ def open_pps(
         # index clears entries in "object" form by tid in a top
         # level dict instead of a list (as is presented in our
         # ``pps.toml``).
-        pp = pp_objs.get(bsuid)
-        if pp:
-            clears = pp.clears
-        else:
-            clears = {}
+        clears = pp_objs.setdefault(bsuid, {})

+        # TODO: should be make a ``Struct`` for clear/event entries?
+        # convert "clear events table" from the toml config (list of
+        # a dicts) and load it into object form for use in position
+        # processing of new clear events.
+        trans: list[Transaction] = []
         for clears_table in clears_list:
             tid = clears_table.pop('tid')
+            dtstr = clears_table['dt']
+            dt = pendulum.parse(dtstr)
+            clears_table['dt'] = dt
+            trans.append(Transaction(
+                fqsn=bsuid,
+                bsuid=bsuid,
+                tid=tid,
+                size=clears_table['size'],
+                price=clears_table['price'],
+                cost=clears_table['cost'],
+                dt=dt,
+            ))
             clears[tid] = clears_table

         size = entry['size']

         # TODO: remove but, handle old field name for now
         ppu = entry.get('ppu', entry.get('be_price', 0))
+        split_ratio = entry.get('split_ratio')

         expiry = entry.get('expiry')
         if expiry:
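For reference, the `pps.toml` shape being decoded here looks roughly like the following; the section path, fqsn, and all field values are hypothetical:

    import pendulum
    import toml

    SECTION = '''
    [ib.algopaper."mnq.globex.20221216"]
    size = 4.0
    ppu = 3360.25
    split_ratio = 4
    bsuid = 515416577
    clears = [
        { dt = "2022-08-18T11:50:00-04:00", size = 1.0, price = 13441.0, cost = 0.35, tid = "0000e1a7.1" },
    ]
    '''

    entry = toml.loads(SECTION)['ib']['algopaper']['mnq.globex.20221216']
    for clears_table in entry['clears']:
        tid = clears_table.pop('tid')
        clears_table['dt'] = pendulum.parse(clears_table['dt'])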
@@ -857,19 +936,21 @@ def open_pps(
             Symbol.from_fqsn(fqsn, info={}),
             size=size,
             ppu=ppu,
+            split_ratio=split_ratio,
             expiry=expiry,
             bsuid=entry['bsuid'],
-
-            # XXX: super critical, we need to be sure to include
-            # all pps.toml clears to avoid reusing clears that were
-            # already included in the current incremental update
-            # state, since today's records may have already been
-            # processed!
-            clears=clears,
         )

+        # XXX: super critical, we need to be sure to include
+        # all pps.toml clears to avoid reusing clears that were
+        # already included in the current incremental update
+        # state, since today's records may have already been
+        # processed!
+        for t in trans:
+            pp.add_clear(t)
+
         # audit entries loaded from toml
-        pp.size, pp.ppu = pp.audit_sizing()
+        pp.ensure_state()

     try:
         yield table