Rework paper engine for "offline" pp loading

This will end up being super handy for testing our accounting subsystems
as well as providing unified and simple cli utils for managing ledgers
and position tracking. Allows loading the paper boi without starting
a data feed and instead just trigger ledger and pps loading without
starting the entire clearing engine.

Deatz:
- only init `PaperBoi` and start clearing loop (tasks) if a non-`None`
  fqme is provided, ow just `Context.started()` the existing pps msgs
  as loaded from the ledger.
- always update both the ledger and pp table on startup and pass
  a single instance of each obj to the `PaperBoi` for reuse (without
  opening and closing backing config files since we now have
  `.write_config()`).
- drop the global `_positions` dict, it's not needed any more if we use
  a `PaperBoi.ppt: PpTable` which persists with the engine actor's
  lifetime.
rekt_pps
Tyler Goodlet 2023-03-29 18:36:01 -04:00
parent 1560330acd
commit 2cc77c21ba
1 changed files with 94 additions and 73 deletions

View File

@ -25,8 +25,6 @@ from operator import itemgetter
import itertools import itertools
import time import time
from typing import ( from typing import (
Any,
Optional,
Callable, Callable,
) )
import uuid import uuid
@ -41,7 +39,9 @@ from ..data.types import Struct
from ..accounting._mktinfo import Symbol from ..accounting._mktinfo import Symbol
from ..accounting import ( from ..accounting import (
Position, Position,
PpTable,
Transaction, Transaction,
TransactionLedger,
open_trade_ledger, open_trade_ledger,
open_pps, open_pps,
) )
@ -73,13 +73,14 @@ class PaperBoi(Struct):
ems_trades_stream: tractor.MsgStream ems_trades_stream: tractor.MsgStream
ppt: PpTable
ledger: TransactionLedger
# map of paper "live" orders which be used # map of paper "live" orders which be used
# to simulate fills based on paper engine settings # to simulate fills based on paper engine settings
_buys: defaultdict[str, bidict] _buys: defaultdict[str, bidict]
_sells: defaultdict[str, bidict] _sells: defaultdict[str, bidict]
_reqids: bidict _reqids: bidict
_positions: dict[str, Position]
_trade_ledger: dict[str, Any]
_syms: dict[str, Symbol] = {} _syms: dict[str, Symbol] = {}
# init edge case L1 spread # init edge case L1 spread
@ -93,7 +94,7 @@ class PaperBoi(Struct):
price: float, price: float,
action: str, action: str,
size: float, size: float,
reqid: Optional[str], reqid: str | None,
) -> int: ) -> int:
''' '''
@ -261,25 +262,15 @@ class PaperBoi(Struct):
bs_mktid=key, bs_mktid=key,
) )
with (
open_trade_ledger(
self.broker,
'paper',
) as ledger,
open_pps(
brokername=self.broker,
acctid='paper',
write_on_exit=True,
) as table
):
tx = t.to_dict() tx = t.to_dict()
tx.pop('sym') tx.pop('sym')
ledger.update({oid: tx})
# Write to pps toml right now
table.update_from_trans({oid: t})
pp = table.pps[key] # update in-mem ledger and pos table
self.ledger.update({oid: tx})
self.ppt.update_from_trans({oid: t})
# transmit pp msg to ems
pp = self.ppt.pps[key]
pp_msg = BrokerdPosition( pp_msg = BrokerdPosition(
broker=self.broker, broker=self.broker,
account='paper', account='paper',
@ -291,9 +282,12 @@ class PaperBoi(Struct):
size=pp.size, size=pp.size,
avg_price=pp.ppu, avg_price=pp.ppu,
) )
await self.ems_trades_stream.send(pp_msg) await self.ems_trades_stream.send(pp_msg)
# write all updates to filesys
self.ledger.write_config()
self.ppt.write_config()
async def simulate_fills( async def simulate_fills(
quote_stream: tractor.MsgStream, # noqa quote_stream: tractor.MsgStream, # noqa
@ -518,7 +512,6 @@ _sells: defaultdict[
tuple[float, float, str, str], # order info tuple[float, float, str, str], # order info
] ]
] = defaultdict(bidict) ] = defaultdict(bidict)
_positions: dict[str, Position] = {}
@tractor.context @tractor.context
@ -526,27 +519,34 @@ async def trades_dialogue(
ctx: tractor.Context, ctx: tractor.Context,
broker: str, broker: str,
fqme: str, fqme: str | None = None, # if empty, we only boot broker mode
loglevel: str = None, loglevel: str = 'warning',
) -> None: ) -> None:
tractor.log.get_console_log(loglevel) tractor.log.get_console_log(loglevel)
async with ( ppt: PpTable
data.open_feed( ledger: TransactionLedger
[fqme], with (
loglevel=loglevel, open_pps(
) as feed, broker,
'paper',
write_on_exit=True,
) as ppt,
open_trade_ledger(
broker,
'paper',
) as ledger
): ):
with open_pps(broker, 'paper') as table: # update pos table from ledger history
# save pps in local state ppt.update_from_trans(ledger.to_trans())
_positions.update(table.pps)
pp_msgs: list[BrokerdPosition] = [] pp_msgs: list[BrokerdPosition] = []
pos: Position pos: Position
token: str # f'{symbol}.{self.broker}' token: str # f'{symbol}.{self.broker}'
for token, pos in _positions.items(): for token, pos in ppt.pps.items():
pp_msgs.append(BrokerdPosition( pp_msgs.append(BrokerdPosition(
broker=broker, broker=broker,
account='paper', account='paper',
@ -560,22 +560,38 @@ async def trades_dialogue(
['paper'], ['paper'],
)) ))
# exit early since no fqme was passed,
# normally this case is just to load
# positions "offline".
if fqme is None:
log.warning(
'Paper engine only running in position delivery mode!\n'
'NO SIMULATED CLEARING LOOP IS ACTIVE!'
)
await trio.sleep_forever()
return
async with (
data.open_feed(
[fqme],
loglevel=loglevel,
) as feed,
):
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
client = PaperBoi( client = PaperBoi(
broker, broker=broker,
ems_stream, ems_trades_stream=ems_stream,
ppt=ppt,
ledger=ledger,
_buys=_buys, _buys=_buys,
_sells=_sells, _sells=_sells,
_reqids=_reqids, _reqids=_reqids,
_positions=_positions,
# TODO: load postions from ledger file # TODO: load postions from ledger file
_trade_ledger={},
_syms={ _syms={
fqme: flume.symbol fqme: flume.symbol
for fqme, flume in feed.flumes.items() for fqme, flume in feed.flumes.items()
@ -594,8 +610,9 @@ async def trades_dialogue(
@acm @acm
async def open_paperboi( async def open_paperboi(
fqme: str, fqme: str | None = None,
loglevel: str, broker: str | None = None,
loglevel: str | None = None,
) -> Callable: ) -> Callable:
''' '''
@ -603,7 +620,11 @@ async def open_paperboi(
its context. its context.
''' '''
if not fqme:
assert broker, 'One of `broker` or `fqme` is required siss..!'
else:
broker, symbol, expiry = unpack_fqme(fqme) broker, symbol, expiry = unpack_fqme(fqme)
service_name = f'paperboi.{broker}' service_name = f'paperboi.{broker}'
async with ( async with (