diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index a8edeb8b..07593d79 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -25,8 +25,6 @@ from operator import itemgetter import itertools import time from typing import ( - Any, - Optional, Callable, ) import uuid @@ -41,7 +39,9 @@ from ..data.types import Struct from ..accounting._mktinfo import Symbol from ..accounting import ( Position, + PpTable, Transaction, + TransactionLedger, open_trade_ledger, open_pps, ) @@ -73,13 +73,14 @@ class PaperBoi(Struct): ems_trades_stream: tractor.MsgStream + ppt: PpTable + ledger: TransactionLedger + # map of paper "live" orders which be used # to simulate fills based on paper engine settings _buys: defaultdict[str, bidict] _sells: defaultdict[str, bidict] _reqids: bidict - _positions: dict[str, Position] - _trade_ledger: dict[str, Any] _syms: dict[str, Symbol] = {} # init edge case L1 spread @@ -93,7 +94,7 @@ class PaperBoi(Struct): price: float, action: str, size: float, - reqid: Optional[str], + reqid: str | None, ) -> int: ''' @@ -261,38 +262,31 @@ class PaperBoi(Struct): bs_mktid=key, ) - with ( - open_trade_ledger( - self.broker, - 'paper', - ) as ledger, + tx = t.to_dict() + tx.pop('sym') - open_pps( - brokername=self.broker, - acctid='paper', - write_on_exit=True, - ) as table - ): - tx = t.to_dict() - tx.pop('sym') - ledger.update({oid: tx}) - # Write to pps toml right now - table.update_from_trans({oid: t}) + # update in-mem ledger and pos table + self.ledger.update({oid: tx}) + self.ppt.update_from_trans({oid: t}) - pp = table.pps[key] - pp_msg = BrokerdPosition( - broker=self.broker, - account='paper', - symbol=fqme, - # TODO: we need to look up the asset currency from - # broker info. i guess for crypto this can be - # inferred from the pair? - currency=key, - size=pp.size, - avg_price=pp.ppu, - ) + # transmit pp msg to ems + pp = self.ppt.pps[key] + pp_msg = BrokerdPosition( + broker=self.broker, + account='paper', + symbol=fqme, + # TODO: we need to look up the asset currency from + # broker info. i guess for crypto this can be + # inferred from the pair? + currency=key, + size=pp.size, + avg_price=pp.ppu, + ) + await self.ems_trades_stream.send(pp_msg) - await self.ems_trades_stream.send(pp_msg) + # write all updates to filesys + self.ledger.write_config() + self.ppt.write_config() async def simulate_fills( @@ -518,7 +512,6 @@ _sells: defaultdict[ tuple[float, float, str, str], # order info ] ] = defaultdict(bidict) -_positions: dict[str, Position] = {} @tractor.context @@ -526,27 +519,34 @@ async def trades_dialogue( ctx: tractor.Context, broker: str, - fqme: str, - loglevel: str = None, + fqme: str | None = None, # if empty, we only boot broker mode + loglevel: str = 'warning', ) -> None: tractor.log.get_console_log(loglevel) - async with ( - data.open_feed( - [fqme], - loglevel=loglevel, - ) as feed, + ppt: PpTable + ledger: TransactionLedger + with ( + open_pps( + broker, + 'paper', + write_on_exit=True, + ) as ppt, + + open_trade_ledger( + broker, + 'paper', + ) as ledger ): - with open_pps(broker, 'paper') as table: - # save pps in local state - _positions.update(table.pps) + # update pos table from ledger history + ppt.update_from_trans(ledger.to_trans()) pp_msgs: list[BrokerdPosition] = [] pos: Position token: str # f'{symbol}.{self.broker}' - for token, pos in _positions.items(): + for token, pos in ppt.pps.items(): pp_msgs.append(BrokerdPosition( broker=broker, account='paper', @@ -560,42 +560,59 @@ async def trades_dialogue( ['paper'], )) + # exit early since no fqme was passed, + # normally this case is just to load + # positions "offline". + if fqme is None: + log.warning( + 'Paper engine only running in position delivery mode!\n' + 'NO SIMULATED CLEARING LOOP IS ACTIVE!' + ) + await trio.sleep_forever() + return + async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, + data.open_feed( + [fqme], + loglevel=loglevel, + ) as feed, ): - client = PaperBoi( - broker, - ems_stream, - _buys=_buys, - _sells=_sells, + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + client = PaperBoi( + broker=broker, + ems_trades_stream=ems_stream, + ppt=ppt, + ledger=ledger, - _reqids=_reqids, + _buys=_buys, + _sells=_sells, + _reqids=_reqids, - _positions=_positions, + # TODO: load postions from ledger file + _syms={ + fqme: flume.symbol + for fqme, flume in feed.flumes.items() + } + ) - # TODO: load postions from ledger file - _trade_ledger={}, - _syms={ - fqme: flume.symbol - for fqme, flume in feed.flumes.items() - } - ) + n.start_soon( + handle_order_requests, + client, + ems_stream, + ) - n.start_soon( - handle_order_requests, - client, - ems_stream, - ) - - # paper engine simulator clearing task - await simulate_fills(feed.streams[broker], client) + # paper engine simulator clearing task + await simulate_fills(feed.streams[broker], client) @acm async def open_paperboi( - fqme: str, - loglevel: str, + fqme: str | None = None, + broker: str | None = None, + loglevel: str | None = None, ) -> Callable: ''' @@ -603,7 +620,11 @@ async def open_paperboi( its context. ''' - broker, symbol, expiry = unpack_fqme(fqme) + if not fqme: + assert broker, 'One of `broker` or `fqme` is required siss..!' + else: + broker, symbol, expiry = unpack_fqme(fqme) + service_name = f'paperboi.{broker}' async with (