diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py index 62b8c115..9529991a 100644 --- a/piker/clearing/_allocate.py +++ b/piker/clearing/_allocate.py @@ -96,7 +96,7 @@ class Allocator(Struct): def next_order_info( self, - # we only need a startup size for exit calcs, we can the + # we only need a startup size for exit calcs, we can then # determine how large slots should be if the initial pp size was # larger then the current live one, and the live one is smaller # then the initial config settings. @@ -137,12 +137,14 @@ class Allocator(Struct): # an entry (adding-to or starting a pp) if ( - action == 'buy' and live_size > 0 or - action == 'sell' and live_size < 0 or live_size == 0 + or (action == 'buy' and live_size > 0) + or action == 'sell' and live_size < 0 ): - - order_size = min(slot_size, l_sub_pp) + order_size = min( + slot_size, + max(l_sub_pp, 0), + ) # an exit (removing-from or going to net-zero pp) else: @@ -242,14 +244,6 @@ class Allocator(Struct): return round(prop * self.slots) -_derivs = ( - 'future', - 'continuous_future', - 'option', - 'futures_option', -) - - def mk_allocator( symbol: Symbol, @@ -276,45 +270,9 @@ def mk_allocator( 'currency_limit': 6e3, 'slots': 6, } - defaults.update(user_def) - alloc = Allocator( + return Allocator( symbol=symbol, **defaults, ) - - asset_type = symbol.type_key - - # specific configs by asset class / type - - if asset_type in _derivs: - # since it's harder to know how currency "applies" in this case - # given leverage properties - alloc.size_unit = '# units' - - # set units limit to slots size thus making make the next - # entry step 1.0 - alloc.units_limit = alloc.slots - - else: - alloc.size_unit = 'currency' - - # if the current position is already greater then the limit - # settings, increase the limit to the current position - if alloc.size_unit == 'currency': - startup_size = startup_pp.size * startup_pp.ppu - - if startup_size > alloc.currency_limit: - alloc.currency_limit = round(startup_size, ndigits=2) - - else: - startup_size = abs(startup_pp.size) - - if startup_size > alloc.units_limit: - alloc.units_limit = startup_size - - if asset_type in _derivs: - alloc.slots = alloc.units_limit - - return alloc diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index ae54615b..473a9e95 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -499,7 +499,7 @@ async def open_brokerd_trades_dialogue( ): # XXX: really we only want one stream per `emsd` actor # to relay global `brokerd` order events unless we're - # doing to expect each backend to relay only orders + # going to expect each backend to relay only orders # affiliated with a particular ``trades_dialogue()`` # session (seems annoying for implementers). So, here # we cache the relay task and instead of running multiple @@ -612,9 +612,10 @@ async def translate_and_relay_brokerd_events( brokerd_msg: dict[str, Any] async for brokerd_msg in brokerd_trades_stream: + fmsg = pformat(brokerd_msg) log.info( f'Received broker trade event:\n' - f'{pformat(brokerd_msg)}' + f'{fmsg}' ) match brokerd_msg: @@ -666,7 +667,11 @@ async def translate_and_relay_brokerd_events( # cancelled by the ems controlling client before we # received this ack, in which case we relay that cancel # signal **asap** to the backend broker - status_msg = book._active[oid] + status_msg = book._active.get(oid) + if not status_msg: + log.warning(f'Rx Ack for closed/unknown order?: {oid}') + continue + req = status_msg.req if req and req.action == 'cancel': # assign newly providerd broker backend request id @@ -692,7 +697,7 @@ async def translate_and_relay_brokerd_events( } if status_msg := book._active.get(oid): msg = BrokerdError(**brokerd_msg) - log.error(pformat(msg)) # XXX make one when it's blank? + log.error(fmsg) # XXX make one when it's blank? # TODO: figure out how this will interact with EMS clients # for ex. on an error do we react with a dark orders @@ -726,8 +731,19 @@ async def translate_and_relay_brokerd_events( # TODO: maybe pack this into a composite type that # contains both the IPC stream as well the # msg-chain/dialog. - ems_client_order_stream = router.dialogues[oid] - status_msg = book._active[oid] + ems_client_order_stream = router.dialogues.get(oid) + status_msg = book._active.get(oid) + + if ( + not ems_client_order_stream + or not status_msg + ): + log.warning( + 'Received status for unknown dialog {oid}:\n' + '{fmsg}' + ) + continue + status_msg.resp = status # retrieve existing live flow @@ -762,12 +778,19 @@ async def translate_and_relay_brokerd_events( 'name': 'fill', 'reqid': reqid, # brokerd generated order-request id # 'symbol': sym, # paper engine doesn't have this, nbd? - } if ( - oid := book._ems2brokerd_ids.inverse.get(reqid) - ): + }: + oid = book._ems2brokerd_ids.inverse.get(reqid) + if not oid: + # TODO: maybe we could optionally check for an + # ``.oid`` in the msg since we're planning to + # maybe-kinda offer that via using ``Status`` + # in the longer run anyway? + log.warning(f'Unkown fill for {fmsg}') + continue + # proxy through the "fill" result(s) msg = BrokerdFill(**brokerd_msg) - log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') + log.info(f'Fill for {oid} cleared with:\n{fmsg}') ems_client_order_stream = router.dialogues[oid] @@ -796,7 +819,7 @@ async def translate_and_relay_brokerd_events( # registered from a previous order/status load? log.error( f'Unknown/transient status msg:\n' - f'{pformat(brokerd_msg)}\n' + f'{fmsg}\n' 'Unable to relay message to client side!?' ) @@ -841,7 +864,7 @@ async def translate_and_relay_brokerd_events( 'name': 'status', 'status': 'error', }: - log.error(f'Broker error:\n{pformat(brokerd_msg)}') + log.error(f'Broker error:\n{fmsg}') # XXX: we presume the brokerd cancels its own order # TOO FAST ``BrokerdStatus`` that arrives @@ -862,7 +885,7 @@ async def translate_and_relay_brokerd_events( status_msg = book._active[oid] msg += ( f'last status msg: {pformat(status_msg)}\n\n' - f'this msg:{pformat(brokerd_msg)}\n' + f'this msg:{fmsg}\n' ) log.warning(msg) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 2936ff59..ef18e3c5 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -18,9 +18,11 @@ Fake trading for forward testing. """ +from collections import defaultdict from contextlib import asynccontextmanager from datetime import datetime from operator import itemgetter +import itertools import time from typing import ( Any, @@ -72,8 +74,8 @@ class PaperBoi(Struct): # map of paper "live" orders which be used # to simulate fills based on paper engine settings - _buys: dict - _sells: dict + _buys: defaultdict[str, bidict] + _sells: defaultdict[str, bidict] _reqids: bidict _positions: dict[str, Position] _trade_ledger: dict[str, Any] @@ -106,7 +108,6 @@ class PaperBoi(Struct): if entry: # order is already existing, this is a modify (oid, symbol, action, old_price) = entry - assert old_price != price is_modify = True else: # register order internally @@ -167,10 +168,10 @@ class PaperBoi(Struct): if is_modify: # remove any existing order for the old price - orders[symbol].pop((oid, old_price)) + orders[symbol].pop(oid) - # buys/sells: (symbol -> (price -> order)) - orders.setdefault(symbol, {})[(oid, price)] = (size, reqid, action) + # buys/sells: {symbol -> bidict[oid, ()]} + orders[symbol][oid] = (price, size, reqid, action) return reqid @@ -183,16 +184,15 @@ class PaperBoi(Struct): oid, symbol, action, price = self._reqids[reqid] if action == 'buy': - self._buys[symbol].pop((oid, price)) + self._buys[symbol].pop(oid, None) elif action == 'sell': - self._sells[symbol].pop((oid, price)) + self._sells[symbol].pop(oid, None) # TODO: net latency model await trio.sleep(0.05) msg = BrokerdStatus( status='canceled', - # account=f'paper_{self.broker}', account='paper', reqid=reqid, time_ns=time.time_ns(), @@ -203,7 +203,7 @@ class PaperBoi(Struct): async def fake_fill( self, - symbol: str, + fqsn: str, price: float, size: float, action: str, # one of {'buy', 'sell'} @@ -257,34 +257,34 @@ class PaperBoi(Struct): await self.ems_trades_stream.send(msg) # lookup any existing position - token = f'{symbol}.{self.broker}' + key = fqsn.rstrip(f'.{self.broker}') pp = self._positions.setdefault( - token, + fqsn, Position( Symbol( - key=symbol, + key=key, broker_info={self.broker: {}}, ), size=size, ppu=price, - bsuid=symbol, + bsuid=key, ) ) t = Transaction( - fqsn=symbol, + fqsn=fqsn, tid=oid, size=size, price=price, cost=0, # TODO: cost model dt=pendulum.from_timestamp(fill_time_s), - bsuid=symbol, + bsuid=key, ) pp.add_clear(t) pp_msg = BrokerdPosition( broker=self.broker, account='paper', - symbol=symbol, + symbol=fqsn, # TODO: we need to look up the asset currency from # broker info. i guess for crypto this can be # inferred from the pair? @@ -325,10 +325,30 @@ async def simulate_fills( # dark order price filter(s) types=('ask', 'bid', 'trade', 'last') ): - # print(tick) + tick_price = tick['price'] + + buys: bidict[str, tuple] = client._buys[sym] + iter_buys = reversed(sorted( + buys.values(), + key=itemgetter(0), + )) + + def sell_on_bid(our_price): + return tick_price <= our_price + + sells: bidict[str, tuple] = client._sells[sym] + iter_sells = sorted( + sells.values(), + key=itemgetter(0) + ) + + def buy_on_ask(our_price): + return tick_price >= our_price + match tick: case { 'price': tick_price, + # 'type': ('ask' | 'trade' | 'last'), 'type': 'ask', }: client.last_ask = ( @@ -336,48 +356,66 @@ async def simulate_fills( tick.get('size', client.last_ask[1]), ) - orders = client._buys.get(sym, {}) - book_sequence = reversed( - sorted(orders.keys(), key=itemgetter(1))) - - def pred(our_price): - return tick_price <= our_price + iter_entries = zip( + iter_buys, + itertools.repeat(sell_on_bid) + ) case { 'price': tick_price, + # 'type': ('bid' | 'trade' | 'last'), 'type': 'bid', }: client.last_bid = ( tick_price, tick.get('size', client.last_bid[1]), ) - orders = client._sells.get(sym, {}) - book_sequence = sorted( - orders.keys(), - key=itemgetter(1) - ) - def pred(our_price): - return tick_price >= our_price + iter_entries = zip( + iter_sells, + itertools.repeat(buy_on_ask) + ) case { 'price': tick_price, 'type': ('trade' | 'last'), }: - # TODO: simulate actual book queues and our orders - # place in it, might require full L2 data? - continue + # in the clearing price / last price case we + # want to iterate both sides of our book for + # clears since we don't know which direction the + # price is going to move (especially with HFT) + # and thus we simply interleave both sides (buys + # and sells) until one side clears and then + # break until the next tick? + def interleave(): + for pair in zip( + iter_buys, + iter_sells, + ): + for order_info, pred in zip( + pair, + itertools.cycle([sell_on_bid, buy_on_ask]), + ): + yield order_info, pred - # iterate book prices descending - for oid, our_price in book_sequence: - if pred(our_price): + iter_entries = interleave() - # retreive order info - (size, reqid, action) = orders.pop((oid, our_price)) + # iterate all potentially clearable book prices + # in FIFO order per side. + for order_info, pred in iter_entries: + (our_price, size, reqid, action) = order_info + + clearable = pred(our_price) + if clearable: + # pop and retreive order info + oid = { + 'buy': buys, + 'sell': sells + }[action].inverse.pop(order_info) # clearing price would have filled entirely await client.fake_fill( - symbol=sym, + fqsn=sym, # todo slippage to determine fill price price=tick_price, size=size, @@ -385,9 +423,6 @@ async def simulate_fills( reqid=reqid, oid=oid, ) - else: - # prices are iterated in sorted order so we're done - break async def handle_order_requests( @@ -403,15 +438,21 @@ async def handle_order_requests( case {'action': ('buy' | 'sell')}: order = BrokerdOrder(**request_msg) account = order.account + + # error on bad inputs + reason = None if account != 'paper': - log.error( - 'This is a paper account,' - ' only a `paper` selection is valid' - ) + reason = f'No account found:`{account}` (paper only)?' + + elif order.size == 0: + reason = 'Invalid size: 0' + + if reason: + log.error(reason) await ems_order_stream.send(BrokerdError( oid=order.oid, symbol=order.symbol, - reason=f'Paper only. No account found: `{account}` ?', + reason=reason, )) continue @@ -428,7 +469,7 @@ async def handle_order_requests( # call our client api to submit the order reqid = await client.submit_limit( oid=order.oid, - symbol=order.symbol, + symbol=f'{order.symbol}.{client.broker}', price=order.price, action=order.action, size=order.size, @@ -451,20 +492,20 @@ async def handle_order_requests( _reqids: bidict[str, tuple] = {} -_buys: dict[ - str, - dict[ - tuple[str, float], - tuple[float, str, str], +_buys: defaultdict[ + str, # symbol + bidict[ + str, # oid + tuple[float, float, str, str], # order info ] -] = {} -_sells: dict[ - str, - dict[ - tuple[str, float], - tuple[float, str, str], +] = defaultdict(bidict) +_sells: defaultdict[ + str, # symbol + bidict[ + str, # oid + tuple[float, float, str, str], # order info ] -] = {} +] = defaultdict(bidict) _positions: dict[str, Position] = {} @@ -501,7 +542,6 @@ async def trades_dialogue( # TODO: load paper positions per broker from .toml config file # and pass as symbol to position data mapping: ``dict[str, dict]`` - # await ctx.started(all_positions) await ctx.started((pp_msgs, ['paper'])) async with ( diff --git a/piker/ui/_position.py b/piker/ui/_position.py index e02187da..9e4c5ff4 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -166,12 +166,29 @@ class SettingsPane: key: str, value: str, + ) -> None: + ''' + Try to apply some input setting (by the user), revert to previous setting if it fails + display new value if applied. + + ''' + self.apply_setting(key, value) + self.update_status_ui(pp=self.order_mode.current_pp) + + def apply_setting( + self, + + key: str, + value: str, + ) -> bool: ''' Called on any order pane edit field value change. ''' mode = self.order_mode + tracker = mode.current_pp + alloc = tracker.alloc # an account switch request if key == 'account': @@ -207,69 +224,85 @@ class SettingsPane: # load the new account's allocator alloc = tracker.alloc - else: - tracker = mode.current_pp - alloc = tracker.alloc - - size_unit = alloc.size_unit - # WRITE any settings to current pp's allocator - try: - if key == 'size_unit': - # implicit re-write of value if input - # is the "text name" of the units. - # yah yah, i know this is badd.. - alloc.size_unit = value - else: + if key == 'size_unit': + # implicit re-write of value if input + # is the "text name" of the units. + # yah yah, i know this is badd.. + alloc.size_unit = value + + elif key != 'account': # numeric fields entry + try: value = puterize(value) - if key == 'limit': - pp = mode.current_pp.live_pp + except ValueError as err: + log.error(err.args[0]) + return False - if size_unit == 'currency': - dsize = pp.dsize - if dsize > value: - log.error( - f'limit must > then current pp: {dsize}' - ) - raise ValueError + if key == 'limit': + if value <= 0: + log.error('limit must be > 0') + return False - alloc.currency_limit = value + pp = mode.current_pp.live_pp - else: - size = pp.size - if size > value: - log.error( - f'limit must > then current pp: {size}' - ) - raise ValueError + if alloc.size_unit == 'currency': + dsize = pp.dsize + if dsize > value: + log.error( + f'limit must > then current pp: {dsize}' + ) + raise ValueError - alloc.units_limit = value - - elif key == 'slots': - if value <= 0: - raise ValueError('slots must be > 0') - alloc.slots = int(value) + alloc.currency_limit = value else: - log.error(f'Unknown setting {key}') - raise ValueError + size = pp.size + if size > value: + log.error( + f'limit must > then current pp: {size}' + ) + raise ValueError + alloc.units_limit = value + + elif key == 'slots': + if value <= 0: + # raise ValueError('slots must be > 0') + log.error('limit must be > 0') + return False + + alloc.slots = int(value) + + else: + log.error(f'Unknown setting {key}') + raise ValueError + + # don't log account "change" case since it'll be submitted + # on every mouse interaction. log.info(f'settings change: {key}: {value}') - except ValueError: - log.error(f'Invalid value for `{key}`: {value}') + # TODO: maybe return a diff of settings so if we can an error we + # can have general input handling code to report it through the + # UI in some way? + return True + + def update_status_ui( + self, + pp: PositionTracker, + + ) -> None: + + alloc = pp.alloc + slots = alloc.slots + used = alloc.slots_used(pp.live_pp) # READ out settings and update the status UI / settings widgets - suffix = {'currency': ' $', 'units': ' u'}[size_unit] + suffix = {'currency': ' $', 'units': ' u'}[alloc.size_unit] limit = alloc.limit() - # TODO: a reverse look up from the position to the equivalent - # account(s), if none then look to user config for default? - self.update_status_ui(pp=tracker) - step_size, currency_per_slot = alloc.step_sizes() - if size_unit == 'currency': + if alloc.size_unit == 'currency': step_size = currency_per_slot self.step_label.format( @@ -287,23 +320,7 @@ class SettingsPane: self.form.fields['limit'].setText(str(limit)) # update of level marker size label based on any new settings - tracker.update_from_pp() - - # TODO: maybe return a diff of settings so if we can an error we - # can have general input handling code to report it through the - # UI in some way? - return True - - def update_status_ui( - self, - - pp: PositionTracker, - - ) -> None: - - alloc = pp.alloc - slots = alloc.slots - used = alloc.slots_used(pp.live_pp) + pp.update_from_pp() # calculate proportion of position size limit # that exists and display in fill bar @@ -441,6 +458,14 @@ def position_line( return line +_derivs = ( + 'future', + 'continuous_future', + 'option', + 'futures_option', +) + + class PositionTracker: ''' Track and display real-time positions for a single symbol @@ -547,14 +572,54 @@ class PositionTracker: def update_from_pp( self, position: Optional[Position] = None, + set_as_startup: bool = False, ) -> None: - '''Update graphics and data from average price and size passed in our - EMS ``BrokerdPosition`` msg. + ''' + Update graphics and data from average price and size passed in + our EMS ``BrokerdPosition`` msg. ''' # live pp updates pp = position or self.live_pp + if set_as_startup: + startup_pp = pp + else: + startup_pp = self.startup_pp + alloc = self.alloc + + # update allocator settings + asset_type = pp.symbol.type_key + + # specific configs by asset class / type + if asset_type in _derivs: + # since it's harder to know how currency "applies" in this case + # given leverage properties + alloc.size_unit = '# units' + + # set units limit to slots size thus making make the next + # entry step 1.0 + alloc.units_limit = alloc.slots + + else: + alloc.size_unit = 'currency' + + # if the current position is already greater then the limit + # settings, increase the limit to the current position + if alloc.size_unit == 'currency': + startup_size = self.startup_pp.size * startup_pp.ppu + + if startup_size > alloc.currency_limit: + alloc.currency_limit = round(startup_size, ndigits=2) + + else: + startup_size = abs(startup_pp.size) + + if startup_size > alloc.units_limit: + alloc.units_limit = startup_size + + if asset_type in _derivs: + alloc.slots = alloc.units_limit self.update_line( pp.ppu, @@ -564,7 +629,7 @@ class PositionTracker: # label updates self.size_label.fields['slots_used'] = round( - self.alloc.slots_used(pp), ndigits=1) + alloc.slots_used(pp), ndigits=1) self.size_label.render() if pp.size == 0: diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index cbe1bf9f..ee484bf3 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -639,22 +639,6 @@ async def open_order_mode( iter(accounts.keys()) ) if accounts else 'paper' - # Pack position messages by account, should only be one-to-one. - # NOTE: requires the backend exactly specifies - # the expected symbol key in its positions msg. - pps_by_account = {} - for (broker, acctid), msgs in position_msgs.items(): - for msg in msgs: - - sym = msg['symbol'] - if ( - (sym == symkey) or ( - # mega-UGH, i think we need to fix the FQSN - # stuff sooner then later.. - sym == symkey.removesuffix(f'.{broker}')) - ): - pps_by_account[acctid] = msg - # update pp trackers with data relayed from ``brokerd``. for account_name in accounts: @@ -667,10 +651,6 @@ async def open_order_mode( # XXX: BLEH, do we care about this on the client side? bsuid=symbol, ) - msg = pps_by_account.get(account_name) - if msg: - log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') - startup_pp.update_from_msg(msg) # allocator config alloc = mk_allocator( @@ -766,7 +746,6 @@ async def open_order_mode( # to order sync pane handler for key in ('account', 'size_unit',): w = form.fields[key] - w.currentTextChanged.connect( partial( order_pane.on_selection_change, @@ -789,6 +768,18 @@ async def open_order_mode( # Begin order-response streaming done() + # Pack position messages by account, should only be one-to-one. + # NOTE: requires the backend exactly specifies + # the expected symbol key in its positions msg. + for (broker, acctid), msgs in position_msgs.items(): + for msg in msgs: + log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') + await process_trade_msg( + mode, + book, + msg, + ) + # start async input handling for chart's view async with ( @@ -876,8 +867,7 @@ async def process_trade_msg( log.info(f'{fqsn} matched pp msg: {fmsg}') tracker = mode.trackers[msg['account']] tracker.live_pp.update_from_msg(msg) - # update order pane widgets - tracker.update_from_pp() + tracker.update_from_pp(set_as_startup=True) # status/pane UI mode.pane.update_status_ui(tracker) if tracker.live_pp.size: