Merge pull request #386 from pikers/paper_tolerance

Paper race tolerance
size_in_shm_token
goodboy 2022-08-29 13:28:38 -04:00 committed by GitHub
commit 77a687bced
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 290 additions and 214 deletions

View File

@ -96,7 +96,7 @@ class Allocator(Struct):
def next_order_info(
self,
# we only need a startup size for exit calcs, we can the
# we only need a startup size for exit calcs, we can then
# determine how large slots should be if the initial pp size was
# larger then the current live one, and the live one is smaller
# then the initial config settings.
@ -137,12 +137,14 @@ class Allocator(Struct):
# an entry (adding-to or starting a pp)
if (
action == 'buy' and live_size > 0 or
action == 'sell' and live_size < 0 or
live_size == 0
or (action == 'buy' and live_size > 0)
or action == 'sell' and live_size < 0
):
order_size = min(slot_size, l_sub_pp)
order_size = min(
slot_size,
max(l_sub_pp, 0),
)
# an exit (removing-from or going to net-zero pp)
else:
@ -242,14 +244,6 @@ class Allocator(Struct):
return round(prop * self.slots)
_derivs = (
'future',
'continuous_future',
'option',
'futures_option',
)
def mk_allocator(
symbol: Symbol,
@ -276,45 +270,9 @@ def mk_allocator(
'currency_limit': 6e3,
'slots': 6,
}
defaults.update(user_def)
alloc = Allocator(
return Allocator(
symbol=symbol,
**defaults,
)
asset_type = symbol.type_key
# specific configs by asset class / type
if asset_type in _derivs:
# since it's harder to know how currency "applies" in this case
# given leverage properties
alloc.size_unit = '# units'
# set units limit to slots size thus making make the next
# entry step 1.0
alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit
# settings, increase the limit to the current position
if alloc.size_unit == 'currency':
startup_size = startup_pp.size * startup_pp.ppu
if startup_size > alloc.currency_limit:
alloc.currency_limit = round(startup_size, ndigits=2)
else:
startup_size = abs(startup_pp.size)
if startup_size > alloc.units_limit:
alloc.units_limit = startup_size
if asset_type in _derivs:
alloc.slots = alloc.units_limit
return alloc

View File

@ -499,7 +499,7 @@ async def open_brokerd_trades_dialogue(
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# going to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
@ -612,9 +612,10 @@ async def translate_and_relay_brokerd_events(
brokerd_msg: dict[str, Any]
async for brokerd_msg in brokerd_trades_stream:
fmsg = pformat(brokerd_msg)
log.info(
f'Received broker trade event:\n'
f'{pformat(brokerd_msg)}'
f'{fmsg}'
)
match brokerd_msg:
@ -666,7 +667,11 @@ async def translate_and_relay_brokerd_events(
# cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel
# signal **asap** to the backend broker
status_msg = book._active[oid]
status_msg = book._active.get(oid)
if not status_msg:
log.warning(f'Rx Ack for closed/unknown order?: {oid}')
continue
req = status_msg.req
if req and req.action == 'cancel':
# assign newly providerd broker backend request id
@ -692,7 +697,7 @@ async def translate_and_relay_brokerd_events(
} if status_msg := book._active.get(oid):
msg = BrokerdError(**brokerd_msg)
log.error(pformat(msg)) # XXX make one when it's blank?
log.error(fmsg) # XXX make one when it's blank?
# TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders
@ -726,8 +731,19 @@ async def translate_and_relay_brokerd_events(
# TODO: maybe pack this into a composite type that
# contains both the IPC stream as well the
# msg-chain/dialog.
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
ems_client_order_stream = router.dialogues.get(oid)
status_msg = book._active.get(oid)
if (
not ems_client_order_stream
or not status_msg
):
log.warning(
'Received status for unknown dialog {oid}:\n'
'{fmsg}'
)
continue
status_msg.resp = status
# retrieve existing live flow
@ -762,12 +778,19 @@ async def translate_and_relay_brokerd_events(
'name': 'fill',
'reqid': reqid, # brokerd generated order-request id
# 'symbol': sym, # paper engine doesn't have this, nbd?
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
}:
oid = book._ems2brokerd_ids.inverse.get(reqid)
if not oid:
# TODO: maybe we could optionally check for an
# ``.oid`` in the msg since we're planning to
# maybe-kinda offer that via using ``Status``
# in the longer run anyway?
log.warning(f'Unkown fill for {fmsg}')
continue
# proxy through the "fill" result(s)
msg = BrokerdFill(**brokerd_msg)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
log.info(f'Fill for {oid} cleared with:\n{fmsg}')
ems_client_order_stream = router.dialogues[oid]
@ -796,7 +819,7 @@ async def translate_and_relay_brokerd_events(
# registered from a previous order/status load?
log.error(
f'Unknown/transient status msg:\n'
f'{pformat(brokerd_msg)}\n'
f'{fmsg}\n'
'Unable to relay message to client side!?'
)
@ -841,7 +864,7 @@ async def translate_and_relay_brokerd_events(
'name': 'status',
'status': 'error',
}:
log.error(f'Broker error:\n{pformat(brokerd_msg)}')
log.error(f'Broker error:\n{fmsg}')
# XXX: we presume the brokerd cancels its own order
# TOO FAST ``BrokerdStatus`` that arrives
@ -862,7 +885,7 @@ async def translate_and_relay_brokerd_events(
status_msg = book._active[oid]
msg += (
f'last status msg: {pformat(status_msg)}\n\n'
f'this msg:{pformat(brokerd_msg)}\n'
f'this msg:{fmsg}\n'
)
log.warning(msg)

View File

@ -18,9 +18,11 @@
Fake trading for forward testing.
"""
from collections import defaultdict
from contextlib import asynccontextmanager
from datetime import datetime
from operator import itemgetter
import itertools
import time
from typing import (
Any,
@ -72,8 +74,8 @@ class PaperBoi(Struct):
# map of paper "live" orders which be used
# to simulate fills based on paper engine settings
_buys: dict
_sells: dict
_buys: defaultdict[str, bidict]
_sells: defaultdict[str, bidict]
_reqids: bidict
_positions: dict[str, Position]
_trade_ledger: dict[str, Any]
@ -106,7 +108,6 @@ class PaperBoi(Struct):
if entry:
# order is already existing, this is a modify
(oid, symbol, action, old_price) = entry
assert old_price != price
is_modify = True
else:
# register order internally
@ -167,10 +168,10 @@ class PaperBoi(Struct):
if is_modify:
# remove any existing order for the old price
orders[symbol].pop((oid, old_price))
orders[symbol].pop(oid)
# buys/sells: (symbol -> (price -> order))
orders.setdefault(symbol, {})[(oid, price)] = (size, reqid, action)
# buys/sells: {symbol -> bidict[oid, (<price data>)]}
orders[symbol][oid] = (price, size, reqid, action)
return reqid
@ -183,16 +184,15 @@ class PaperBoi(Struct):
oid, symbol, action, price = self._reqids[reqid]
if action == 'buy':
self._buys[symbol].pop((oid, price))
self._buys[symbol].pop(oid, None)
elif action == 'sell':
self._sells[symbol].pop((oid, price))
self._sells[symbol].pop(oid, None)
# TODO: net latency model
await trio.sleep(0.05)
msg = BrokerdStatus(
status='canceled',
# account=f'paper_{self.broker}',
account='paper',
reqid=reqid,
time_ns=time.time_ns(),
@ -203,7 +203,7 @@ class PaperBoi(Struct):
async def fake_fill(
self,
symbol: str,
fqsn: str,
price: float,
size: float,
action: str, # one of {'buy', 'sell'}
@ -257,34 +257,34 @@ class PaperBoi(Struct):
await self.ems_trades_stream.send(msg)
# lookup any existing position
token = f'{symbol}.{self.broker}'
key = fqsn.rstrip(f'.{self.broker}')
pp = self._positions.setdefault(
token,
fqsn,
Position(
Symbol(
key=symbol,
key=key,
broker_info={self.broker: {}},
),
size=size,
ppu=price,
bsuid=symbol,
bsuid=key,
)
)
t = Transaction(
fqsn=symbol,
fqsn=fqsn,
tid=oid,
size=size,
price=price,
cost=0, # TODO: cost model
dt=pendulum.from_timestamp(fill_time_s),
bsuid=symbol,
bsuid=key,
)
pp.add_clear(t)
pp_msg = BrokerdPosition(
broker=self.broker,
account='paper',
symbol=symbol,
symbol=fqsn,
# TODO: we need to look up the asset currency from
# broker info. i guess for crypto this can be
# inferred from the pair?
@ -325,10 +325,30 @@ async def simulate_fills(
# dark order price filter(s)
types=('ask', 'bid', 'trade', 'last')
):
# print(tick)
tick_price = tick['price']
buys: bidict[str, tuple] = client._buys[sym]
iter_buys = reversed(sorted(
buys.values(),
key=itemgetter(0),
))
def sell_on_bid(our_price):
return tick_price <= our_price
sells: bidict[str, tuple] = client._sells[sym]
iter_sells = sorted(
sells.values(),
key=itemgetter(0)
)
def buy_on_ask(our_price):
return tick_price >= our_price
match tick:
case {
'price': tick_price,
# 'type': ('ask' | 'trade' | 'last'),
'type': 'ask',
}:
client.last_ask = (
@ -336,48 +356,66 @@ async def simulate_fills(
tick.get('size', client.last_ask[1]),
)
orders = client._buys.get(sym, {})
book_sequence = reversed(
sorted(orders.keys(), key=itemgetter(1)))
def pred(our_price):
return tick_price <= our_price
iter_entries = zip(
iter_buys,
itertools.repeat(sell_on_bid)
)
case {
'price': tick_price,
# 'type': ('bid' | 'trade' | 'last'),
'type': 'bid',
}:
client.last_bid = (
tick_price,
tick.get('size', client.last_bid[1]),
)
orders = client._sells.get(sym, {})
book_sequence = sorted(
orders.keys(),
key=itemgetter(1)
)
def pred(our_price):
return tick_price >= our_price
iter_entries = zip(
iter_sells,
itertools.repeat(buy_on_ask)
)
case {
'price': tick_price,
'type': ('trade' | 'last'),
}:
# TODO: simulate actual book queues and our orders
# place in it, might require full L2 data?
continue
# in the clearing price / last price case we
# want to iterate both sides of our book for
# clears since we don't know which direction the
# price is going to move (especially with HFT)
# and thus we simply interleave both sides (buys
# and sells) until one side clears and then
# break until the next tick?
def interleave():
for pair in zip(
iter_buys,
iter_sells,
):
for order_info, pred in zip(
pair,
itertools.cycle([sell_on_bid, buy_on_ask]),
):
yield order_info, pred
# iterate book prices descending
for oid, our_price in book_sequence:
if pred(our_price):
iter_entries = interleave()
# retreive order info
(size, reqid, action) = orders.pop((oid, our_price))
# iterate all potentially clearable book prices
# in FIFO order per side.
for order_info, pred in iter_entries:
(our_price, size, reqid, action) = order_info
clearable = pred(our_price)
if clearable:
# pop and retreive order info
oid = {
'buy': buys,
'sell': sells
}[action].inverse.pop(order_info)
# clearing price would have filled entirely
await client.fake_fill(
symbol=sym,
fqsn=sym,
# todo slippage to determine fill price
price=tick_price,
size=size,
@ -385,9 +423,6 @@ async def simulate_fills(
reqid=reqid,
oid=oid,
)
else:
# prices are iterated in sorted order so we're done
break
async def handle_order_requests(
@ -403,15 +438,21 @@ async def handle_order_requests(
case {'action': ('buy' | 'sell')}:
order = BrokerdOrder(**request_msg)
account = order.account
# error on bad inputs
reason = None
if account != 'paper':
log.error(
'This is a paper account,'
' only a `paper` selection is valid'
)
reason = f'No account found:`{account}` (paper only)?'
elif order.size == 0:
reason = 'Invalid size: 0'
if reason:
log.error(reason)
await ems_order_stream.send(BrokerdError(
oid=order.oid,
symbol=order.symbol,
reason=f'Paper only. No account found: `{account}` ?',
reason=reason,
))
continue
@ -428,7 +469,7 @@ async def handle_order_requests(
# call our client api to submit the order
reqid = await client.submit_limit(
oid=order.oid,
symbol=order.symbol,
symbol=f'{order.symbol}.{client.broker}',
price=order.price,
action=order.action,
size=order.size,
@ -451,20 +492,20 @@ async def handle_order_requests(
_reqids: bidict[str, tuple] = {}
_buys: dict[
str,
dict[
tuple[str, float],
tuple[float, str, str],
_buys: defaultdict[
str, # symbol
bidict[
str, # oid
tuple[float, float, str, str], # order info
]
] = {}
_sells: dict[
str,
dict[
tuple[str, float],
tuple[float, str, str],
] = defaultdict(bidict)
_sells: defaultdict[
str, # symbol
bidict[
str, # oid
tuple[float, float, str, str], # order info
]
] = {}
] = defaultdict(bidict)
_positions: dict[str, Position] = {}
@ -501,7 +542,6 @@ async def trades_dialogue(
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions)
await ctx.started((pp_msgs, ['paper']))
async with (

View File

@ -166,12 +166,29 @@ class SettingsPane:
key: str,
value: str,
) -> None:
'''
Try to apply some input setting (by the user), revert to previous setting if it fails
display new value if applied.
'''
self.apply_setting(key, value)
self.update_status_ui(pp=self.order_mode.current_pp)
def apply_setting(
self,
key: str,
value: str,
) -> bool:
'''
Called on any order pane edit field value change.
'''
mode = self.order_mode
tracker = mode.current_pp
alloc = tracker.alloc
# an account switch request
if key == 'account':
@ -207,25 +224,28 @@ class SettingsPane:
# load the new account's allocator
alloc = tracker.alloc
else:
tracker = mode.current_pp
alloc = tracker.alloc
size_unit = alloc.size_unit
# WRITE any settings to current pp's allocator
try:
if key == 'size_unit':
# implicit re-write of value if input
# is the "text name" of the units.
# yah yah, i know this is badd..
alloc.size_unit = value
else:
elif key != 'account': # numeric fields entry
try:
value = puterize(value)
except ValueError as err:
log.error(err.args[0])
return False
if key == 'limit':
if value <= 0:
log.error('limit must be > 0')
return False
pp = mode.current_pp.live_pp
if size_unit == 'currency':
if alloc.size_unit == 'currency':
dsize = pp.dsize
if dsize > value:
log.error(
@ -247,29 +267,42 @@ class SettingsPane:
elif key == 'slots':
if value <= 0:
raise ValueError('slots must be > 0')
# raise ValueError('slots must be > 0')
log.error('limit must be > 0')
return False
alloc.slots = int(value)
else:
log.error(f'Unknown setting {key}')
raise ValueError
# don't log account "change" case since it'll be submitted
# on every mouse interaction.
log.info(f'settings change: {key}: {value}')
except ValueError:
log.error(f'Invalid value for `{key}`: {value}')
# TODO: maybe return a diff of settings so if we can an error we
# can have general input handling code to report it through the
# UI in some way?
return True
def update_status_ui(
self,
pp: PositionTracker,
) -> None:
alloc = pp.alloc
slots = alloc.slots
used = alloc.slots_used(pp.live_pp)
# READ out settings and update the status UI / settings widgets
suffix = {'currency': ' $', 'units': ' u'}[size_unit]
suffix = {'currency': ' $', 'units': ' u'}[alloc.size_unit]
limit = alloc.limit()
# TODO: a reverse look up from the position to the equivalent
# account(s), if none then look to user config for default?
self.update_status_ui(pp=tracker)
step_size, currency_per_slot = alloc.step_sizes()
if size_unit == 'currency':
if alloc.size_unit == 'currency':
step_size = currency_per_slot
self.step_label.format(
@ -287,23 +320,7 @@ class SettingsPane:
self.form.fields['limit'].setText(str(limit))
# update of level marker size label based on any new settings
tracker.update_from_pp()
# TODO: maybe return a diff of settings so if we can an error we
# can have general input handling code to report it through the
# UI in some way?
return True
def update_status_ui(
self,
pp: PositionTracker,
) -> None:
alloc = pp.alloc
slots = alloc.slots
used = alloc.slots_used(pp.live_pp)
pp.update_from_pp()
# calculate proportion of position size limit
# that exists and display in fill bar
@ -441,6 +458,14 @@ def position_line(
return line
_derivs = (
'future',
'continuous_future',
'option',
'futures_option',
)
class PositionTracker:
'''
Track and display real-time positions for a single symbol
@ -547,14 +572,54 @@ class PositionTracker:
def update_from_pp(
self,
position: Optional[Position] = None,
set_as_startup: bool = False,
) -> None:
'''Update graphics and data from average price and size passed in our
EMS ``BrokerdPosition`` msg.
'''
Update graphics and data from average price and size passed in
our EMS ``BrokerdPosition`` msg.
'''
# live pp updates
pp = position or self.live_pp
if set_as_startup:
startup_pp = pp
else:
startup_pp = self.startup_pp
alloc = self.alloc
# update allocator settings
asset_type = pp.symbol.type_key
# specific configs by asset class / type
if asset_type in _derivs:
# since it's harder to know how currency "applies" in this case
# given leverage properties
alloc.size_unit = '# units'
# set units limit to slots size thus making make the next
# entry step 1.0
alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit
# settings, increase the limit to the current position
if alloc.size_unit == 'currency':
startup_size = self.startup_pp.size * startup_pp.ppu
if startup_size > alloc.currency_limit:
alloc.currency_limit = round(startup_size, ndigits=2)
else:
startup_size = abs(startup_pp.size)
if startup_size > alloc.units_limit:
alloc.units_limit = startup_size
if asset_type in _derivs:
alloc.slots = alloc.units_limit
self.update_line(
pp.ppu,
@ -564,7 +629,7 @@ class PositionTracker:
# label updates
self.size_label.fields['slots_used'] = round(
self.alloc.slots_used(pp), ndigits=1)
alloc.slots_used(pp), ndigits=1)
self.size_label.render()
if pp.size == 0:

View File

@ -639,22 +639,6 @@ async def open_order_mode(
iter(accounts.keys())
) if accounts else 'paper'
# Pack position messages by account, should only be one-to-one.
# NOTE: requires the backend exactly specifies
# the expected symbol key in its positions msg.
pps_by_account = {}
for (broker, acctid), msgs in position_msgs.items():
for msg in msgs:
sym = msg['symbol']
if (
(sym == symkey) or (
# mega-UGH, i think we need to fix the FQSN
# stuff sooner then later..
sym == symkey.removesuffix(f'.{broker}'))
):
pps_by_account[acctid] = msg
# update pp trackers with data relayed from ``brokerd``.
for account_name in accounts:
@ -667,10 +651,6 @@ async def open_order_mode(
# XXX: BLEH, do we care about this on the client side?
bsuid=symbol,
)
msg = pps_by_account.get(account_name)
if msg:
log.info(f'Loading pp for {symkey}:\n{pformat(msg)}')
startup_pp.update_from_msg(msg)
# allocator config
alloc = mk_allocator(
@ -766,7 +746,6 @@ async def open_order_mode(
# to order sync pane handler
for key in ('account', 'size_unit',):
w = form.fields[key]
w.currentTextChanged.connect(
partial(
order_pane.on_selection_change,
@ -789,6 +768,18 @@ async def open_order_mode(
# Begin order-response streaming
done()
# Pack position messages by account, should only be one-to-one.
# NOTE: requires the backend exactly specifies
# the expected symbol key in its positions msg.
for (broker, acctid), msgs in position_msgs.items():
for msg in msgs:
log.info(f'Loading pp for {symkey}:\n{pformat(msg)}')
await process_trade_msg(
mode,
book,
msg,
)
# start async input handling for chart's view
async with (
@ -876,8 +867,7 @@ async def process_trade_msg(
log.info(f'{fqsn} matched pp msg: {fmsg}')
tracker = mode.trackers[msg['account']]
tracker.live_pp.update_from_msg(msg)
# update order pane widgets
tracker.update_from_pp()
tracker.update_from_pp(set_as_startup=True) # status/pane UI
mode.pane.update_status_ui(tracker)
if tracker.live_pp.size: