From 2cc77c21baf1ff4551b69e43e9bfe354a267192c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 29 Mar 2023 18:36:01 -0400 Subject: [PATCH] Rework paper engine for "offline" pp loading This will end up being super handy for testing our accounting subsystems as well as providing unified and simple cli utils for managing ledgers and position tracking. Allows loading the paper boi without starting a data feed and instead just trigger ledger and pps loading without starting the entire clearing engine. Deatz: - only init `PaperBoi` and start clearing loop (tasks) if a non-`None` fqme is provided, ow just `Context.started()` the existing pps msgs as loaded from the ledger. - always update both the ledger and pp table on startup and pass a single instance of each obj to the `PaperBoi` for reuse (without opening and closing backing config files since we now have `.write_config()`). - drop the global `_positions` dict, it's not needed any more if we use a `PaperBoi.ppt: PpTable` which persists with the engine actor's lifetime. --- piker/clearing/_paper_engine.py | 167 ++++++++++++++++++-------------- 1 file changed, 94 insertions(+), 73 deletions(-) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index a8edeb8b..07593d79 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -25,8 +25,6 @@ from operator import itemgetter import itertools import time from typing import ( - Any, - Optional, Callable, ) import uuid @@ -41,7 +39,9 @@ from ..data.types import Struct from ..accounting._mktinfo import Symbol from ..accounting import ( Position, + PpTable, Transaction, + TransactionLedger, open_trade_ledger, open_pps, ) @@ -73,13 +73,14 @@ class PaperBoi(Struct): ems_trades_stream: tractor.MsgStream + ppt: PpTable + ledger: TransactionLedger + # map of paper "live" orders which be used # to simulate fills based on paper engine settings _buys: defaultdict[str, bidict] _sells: defaultdict[str, bidict] _reqids: bidict - _positions: dict[str, Position] - _trade_ledger: dict[str, Any] _syms: dict[str, Symbol] = {} # init edge case L1 spread @@ -93,7 +94,7 @@ class PaperBoi(Struct): price: float, action: str, size: float, - reqid: Optional[str], + reqid: str | None, ) -> int: ''' @@ -261,38 +262,31 @@ class PaperBoi(Struct): bs_mktid=key, ) - with ( - open_trade_ledger( - self.broker, - 'paper', - ) as ledger, + tx = t.to_dict() + tx.pop('sym') - open_pps( - brokername=self.broker, - acctid='paper', - write_on_exit=True, - ) as table - ): - tx = t.to_dict() - tx.pop('sym') - ledger.update({oid: tx}) - # Write to pps toml right now - table.update_from_trans({oid: t}) + # update in-mem ledger and pos table + self.ledger.update({oid: tx}) + self.ppt.update_from_trans({oid: t}) - pp = table.pps[key] - pp_msg = BrokerdPosition( - broker=self.broker, - account='paper', - symbol=fqme, - # TODO: we need to look up the asset currency from - # broker info. i guess for crypto this can be - # inferred from the pair? - currency=key, - size=pp.size, - avg_price=pp.ppu, - ) + # transmit pp msg to ems + pp = self.ppt.pps[key] + pp_msg = BrokerdPosition( + broker=self.broker, + account='paper', + symbol=fqme, + # TODO: we need to look up the asset currency from + # broker info. i guess for crypto this can be + # inferred from the pair? + currency=key, + size=pp.size, + avg_price=pp.ppu, + ) + await self.ems_trades_stream.send(pp_msg) - await self.ems_trades_stream.send(pp_msg) + # write all updates to filesys + self.ledger.write_config() + self.ppt.write_config() async def simulate_fills( @@ -518,7 +512,6 @@ _sells: defaultdict[ tuple[float, float, str, str], # order info ] ] = defaultdict(bidict) -_positions: dict[str, Position] = {} @tractor.context @@ -526,27 +519,34 @@ async def trades_dialogue( ctx: tractor.Context, broker: str, - fqme: str, - loglevel: str = None, + fqme: str | None = None, # if empty, we only boot broker mode + loglevel: str = 'warning', ) -> None: tractor.log.get_console_log(loglevel) - async with ( - data.open_feed( - [fqme], - loglevel=loglevel, - ) as feed, + ppt: PpTable + ledger: TransactionLedger + with ( + open_pps( + broker, + 'paper', + write_on_exit=True, + ) as ppt, + + open_trade_ledger( + broker, + 'paper', + ) as ledger ): - with open_pps(broker, 'paper') as table: - # save pps in local state - _positions.update(table.pps) + # update pos table from ledger history + ppt.update_from_trans(ledger.to_trans()) pp_msgs: list[BrokerdPosition] = [] pos: Position token: str # f'{symbol}.{self.broker}' - for token, pos in _positions.items(): + for token, pos in ppt.pps.items(): pp_msgs.append(BrokerdPosition( broker=broker, account='paper', @@ -560,42 +560,59 @@ async def trades_dialogue( ['paper'], )) + # exit early since no fqme was passed, + # normally this case is just to load + # positions "offline". + if fqme is None: + log.warning( + 'Paper engine only running in position delivery mode!\n' + 'NO SIMULATED CLEARING LOOP IS ACTIVE!' + ) + await trio.sleep_forever() + return + async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, + data.open_feed( + [fqme], + loglevel=loglevel, + ) as feed, ): - client = PaperBoi( - broker, - ems_stream, - _buys=_buys, - _sells=_sells, + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + client = PaperBoi( + broker=broker, + ems_trades_stream=ems_stream, + ppt=ppt, + ledger=ledger, - _reqids=_reqids, + _buys=_buys, + _sells=_sells, + _reqids=_reqids, - _positions=_positions, + # TODO: load postions from ledger file + _syms={ + fqme: flume.symbol + for fqme, flume in feed.flumes.items() + } + ) - # TODO: load postions from ledger file - _trade_ledger={}, - _syms={ - fqme: flume.symbol - for fqme, flume in feed.flumes.items() - } - ) + n.start_soon( + handle_order_requests, + client, + ems_stream, + ) - n.start_soon( - handle_order_requests, - client, - ems_stream, - ) - - # paper engine simulator clearing task - await simulate_fills(feed.streams[broker], client) + # paper engine simulator clearing task + await simulate_fills(feed.streams[broker], client) @acm async def open_paperboi( - fqme: str, - loglevel: str, + fqme: str | None = None, + broker: str | None = None, + loglevel: str | None = None, ) -> Callable: ''' @@ -603,7 +620,11 @@ async def open_paperboi( its context. ''' - broker, symbol, expiry = unpack_fqme(fqme) + if not fqme: + assert broker, 'One of `broker` or `fqme` is required siss..!' + else: + broker, symbol, expiry = unpack_fqme(fqme) + service_name = f'paperboi.{broker}' async with (