Rework paper engine for "offline" pp loading

This will end up being super handy for testing our accounting subsystems
as well as providing unified and simple cli utils for managing ledgers
and position tracking. Allows loading the paper boi without starting
a data feed and instead just trigger ledger and pps loading without
starting the entire clearing engine.

Deatz:
- only init `PaperBoi` and start clearing loop (tasks) if a non-`None`
  fqme is provided, ow just `Context.started()` the existing pps msgs
  as loaded from the ledger.
- always update both the ledger and pp table on startup and pass
  a single instance of each obj to the `PaperBoi` for reuse (without
  opening and closing backing config files since we now have
  `.write_config()`).
- drop the global `_positions` dict, it's not needed any more if we use
  a `PaperBoi.ppt: PpTable` which persists with the engine actor's
  lifetime.
pre_overruns_ctxcancelled
Tyler Goodlet 2023-03-29 18:36:01 -04:00
parent f46a5337d5
commit fb90c2049f
1 changed files with 94 additions and 73 deletions

View File

@ -25,8 +25,6 @@ from operator import itemgetter
import itertools import itertools
import time import time
from typing import ( from typing import (
Any,
Optional,
Callable, Callable,
) )
import uuid import uuid
@ -41,7 +39,9 @@ from ..data.types import Struct
from ..accounting._mktinfo import Symbol from ..accounting._mktinfo import Symbol
from ..accounting import ( from ..accounting import (
Position, Position,
PpTable,
Transaction, Transaction,
TransactionLedger,
open_trade_ledger, open_trade_ledger,
open_pps, open_pps,
) )
@ -73,13 +73,14 @@ class PaperBoi(Struct):
ems_trades_stream: tractor.MsgStream ems_trades_stream: tractor.MsgStream
ppt: PpTable
ledger: TransactionLedger
# map of paper "live" orders which be used # map of paper "live" orders which be used
# to simulate fills based on paper engine settings # to simulate fills based on paper engine settings
_buys: defaultdict[str, bidict] _buys: defaultdict[str, bidict]
_sells: defaultdict[str, bidict] _sells: defaultdict[str, bidict]
_reqids: bidict _reqids: bidict
_positions: dict[str, Position]
_trade_ledger: dict[str, Any]
_syms: dict[str, Symbol] = {} _syms: dict[str, Symbol] = {}
# init edge case L1 spread # init edge case L1 spread
@ -93,7 +94,7 @@ class PaperBoi(Struct):
price: float, price: float,
action: str, action: str,
size: float, size: float,
reqid: Optional[str], reqid: str | None,
) -> int: ) -> int:
''' '''
@ -261,38 +262,31 @@ class PaperBoi(Struct):
bs_mktid=key, bs_mktid=key,
) )
with ( tx = t.to_dict()
open_trade_ledger( tx.pop('sym')
self.broker,
'paper',
) as ledger,
open_pps( # update in-mem ledger and pos table
brokername=self.broker, self.ledger.update({oid: tx})
acctid='paper', self.ppt.update_from_trans({oid: t})
write_on_exit=True,
) as table
):
tx = t.to_dict()
tx.pop('sym')
ledger.update({oid: tx})
# Write to pps toml right now
table.update_from_trans({oid: t})
pp = table.pps[key] # transmit pp msg to ems
pp_msg = BrokerdPosition( pp = self.ppt.pps[key]
broker=self.broker, pp_msg = BrokerdPosition(
account='paper', broker=self.broker,
symbol=fqme, account='paper',
# TODO: we need to look up the asset currency from symbol=fqme,
# broker info. i guess for crypto this can be # TODO: we need to look up the asset currency from
# inferred from the pair? # broker info. i guess for crypto this can be
currency=key, # inferred from the pair?
size=pp.size, currency=key,
avg_price=pp.ppu, size=pp.size,
) avg_price=pp.ppu,
)
await self.ems_trades_stream.send(pp_msg)
await self.ems_trades_stream.send(pp_msg) # write all updates to filesys
self.ledger.write_config()
self.ppt.write_config()
async def simulate_fills( async def simulate_fills(
@ -518,7 +512,6 @@ _sells: defaultdict[
tuple[float, float, str, str], # order info tuple[float, float, str, str], # order info
] ]
] = defaultdict(bidict) ] = defaultdict(bidict)
_positions: dict[str, Position] = {}
@tractor.context @tractor.context
@ -526,27 +519,34 @@ async def trades_dialogue(
ctx: tractor.Context, ctx: tractor.Context,
broker: str, broker: str,
fqme: str, fqme: str | None = None, # if empty, we only boot broker mode
loglevel: str = None, loglevel: str = 'warning',
) -> None: ) -> None:
tractor.log.get_console_log(loglevel) tractor.log.get_console_log(loglevel)
async with ( ppt: PpTable
data.open_feed( ledger: TransactionLedger
[fqme], with (
loglevel=loglevel, open_pps(
) as feed, broker,
'paper',
write_on_exit=True,
) as ppt,
open_trade_ledger(
broker,
'paper',
) as ledger
): ):
with open_pps(broker, 'paper') as table: # update pos table from ledger history
# save pps in local state ppt.update_from_trans(ledger.to_trans())
_positions.update(table.pps)
pp_msgs: list[BrokerdPosition] = [] pp_msgs: list[BrokerdPosition] = []
pos: Position pos: Position
token: str # f'{symbol}.{self.broker}' token: str # f'{symbol}.{self.broker}'
for token, pos in _positions.items(): for token, pos in ppt.pps.items():
pp_msgs.append(BrokerdPosition( pp_msgs.append(BrokerdPosition(
broker=broker, broker=broker,
account='paper', account='paper',
@ -560,42 +560,59 @@ async def trades_dialogue(
['paper'], ['paper'],
)) ))
# exit early since no fqme was passed,
# normally this case is just to load
# positions "offline".
if fqme is None:
log.warning(
'Paper engine only running in position delivery mode!\n'
'NO SIMULATED CLEARING LOOP IS ACTIVE!'
)
await trio.sleep_forever()
return
async with ( async with (
ctx.open_stream() as ems_stream, data.open_feed(
trio.open_nursery() as n, [fqme],
loglevel=loglevel,
) as feed,
): ):
client = PaperBoi( async with (
broker, ctx.open_stream() as ems_stream,
ems_stream, trio.open_nursery() as n,
_buys=_buys, ):
_sells=_sells, client = PaperBoi(
broker=broker,
ems_trades_stream=ems_stream,
ppt=ppt,
ledger=ledger,
_reqids=_reqids, _buys=_buys,
_sells=_sells,
_reqids=_reqids,
_positions=_positions, # TODO: load postions from ledger file
_syms={
fqme: flume.symbol
for fqme, flume in feed.flumes.items()
}
)
# TODO: load postions from ledger file n.start_soon(
_trade_ledger={}, handle_order_requests,
_syms={ client,
fqme: flume.symbol ems_stream,
for fqme, flume in feed.flumes.items() )
}
)
n.start_soon( # paper engine simulator clearing task
handle_order_requests, await simulate_fills(feed.streams[broker], client)
client,
ems_stream,
)
# paper engine simulator clearing task
await simulate_fills(feed.streams[broker], client)
@acm @acm
async def open_paperboi( async def open_paperboi(
fqme: str, fqme: str | None = None,
loglevel: str, broker: str | None = None,
loglevel: str | None = None,
) -> Callable: ) -> Callable:
''' '''
@ -603,7 +620,11 @@ async def open_paperboi(
its context. its context.
''' '''
broker, symbol, expiry = unpack_fqme(fqme) if not fqme:
assert broker, 'One of `broker` or `fqme` is required siss..!'
else:
broker, symbol, expiry = unpack_fqme(fqme)
service_name = f'paperboi.{broker}' service_name = f'paperboi.{broker}'
async with ( async with (