From a0c238daa7c56f026fe93e9d805b768d52f60552 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 27 Jul 2022 10:28:23 -0400 Subject: [PATCH] Adjust paper-engine to use `Transaction` for pps updates --- piker/clearing/_paper_engine.py | 133 +++++++++++++++++++------------- 1 file changed, 80 insertions(+), 53 deletions(-) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 59c05e96..3bb93b80 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -22,17 +22,25 @@ from contextlib import asynccontextmanager from datetime import datetime from operator import itemgetter import time -from typing import Tuple, Optional, Callable +from typing import ( + Any, + Optional, + Callable, +) import uuid from bidict import bidict +import pendulum import trio import tractor from dataclasses import dataclass from .. import data from ..data._source import Symbol -from ..pp import Position +from ..pp import ( + Position, + Transaction, +) from ..data._normalize import iterticks from ..data._source import unpack_fqsn from ..log import get_logger @@ -63,11 +71,12 @@ class PaperBoi: _buys: bidict _sells: bidict _reqids: bidict - _positions: dict[str, BrokerdPosition] + _positions: dict[str, Position] + _trade_ledger: dict[str, Any] # init edge case L1 spread - last_ask: Tuple[float, float] = (float('inf'), 0) # price, size - last_bid: Tuple[float, float] = (0, 0) + last_ask: tuple[float, float] = (float('inf'), 0) # price, size + last_bid: tuple[float, float] = (0, 0) async def submit_limit( self, @@ -77,22 +86,23 @@ class PaperBoi: action: str, size: float, reqid: Optional[str], + ) -> int: - """Place an order and return integer request id provided by client. + ''' + Place an order and return integer request id provided by client. - """ + ''' is_modify: bool = False - if reqid is None: - reqid = str(uuid.uuid4()) - else: + entry = self._reqids.get(reqid) + if entry: # order is already existing, this is a modify - (oid, symbol, action, old_price) = self._reqids[reqid] + (oid, symbol, action, old_price) = entry assert old_price != price is_modify = True - - # register order internally - self._reqids[reqid] = (oid, symbol, action, price) + else: + # register order internally + self._reqids[reqid] = (oid, symbol, action, price) if action == 'alert': # bypass all fill simulation @@ -197,16 +207,15 @@ class PaperBoi: """ # TODO: net latency model await trio.sleep(0.05) + fill_time_ns = time.time_ns() + fill_time_s = time.time() - msg = BrokerdFill( - + fill_msg = BrokerdFill( reqid=reqid, - time_ns=time.time_ns(), - + time_ns=fill_time_ns, action=action, size=size, price=price, - broker_time=datetime.now().timestamp(), broker_details={ 'paper_info': { @@ -216,7 +225,9 @@ class PaperBoi: 'name': self.broker + '_paper', }, ) - await self.ems_trades_stream.send(msg) + await self.ems_trades_stream.send(fill_msg) + + self._trade_ledger.update(fill_msg.to_dict()) if order_complete: @@ -243,29 +254,37 @@ class PaperBoi: # lookup any existing position token = f'{symbol}.{self.broker}' - pp_msg = self._positions.setdefault( + pp = self._positions.setdefault( token, - BrokerdPosition( - broker=self.broker, - account='paper', - symbol=symbol, - # TODO: we need to look up the asset currency from - # broker info. i guess for crypto this can be - # inferred from the pair? - currency='', - size=0.0, - avg_price=0, + Position( + Symbol(key=symbol), + size=size, + be_price=price, + bsuid=symbol, ) ) - - # delegate update to `.pp.Position.lifo_update()` - pp = Position( - Symbol(key=symbol), - size=pp_msg.size, - be_price=pp_msg.avg_price, + t = Transaction( + fqsn=symbol, + tid=oid, + size=size, + price=price, + cost=1., # todo cost model + dt=pendulum.from_timestamp(fill_time_s), bsuid=symbol, ) - pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) + pp.add_clear(t) + + pp_msg = BrokerdPosition( + broker=self.broker, + account='paper', + symbol=symbol, + # TODO: we need to look up the asset currency from + # broker info. i guess for crypto this can be + # inferred from the pair? + currency='', + size=pp.size, + avg_price=pp.be_price, + ) await self.ems_trades_stream.send(pp_msg) @@ -273,6 +292,7 @@ class PaperBoi: async def simulate_fills( quote_stream: 'tractor.ReceiveStream', # noqa client: PaperBoi, + ) -> None: # TODO: more machinery to better simulate real-world market things: @@ -389,6 +409,24 @@ async def handle_order_requests( # validate order = BrokerdOrder(**request_msg) + if order.reqid is None: + reqid = str(uuid.uuid4()) + else: + reqid = order.reqid + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + + # ems order request id + oid=order.oid, + + # broker specific request id + reqid=reqid, + + ) + ) + # call our client api to submit the order reqid = await client.submit_limit( @@ -402,20 +440,7 @@ async def handle_order_requests( # there is no existing order so ask the client to create # a new one (which it seems to do by allocating an int # counter - collision prone..) - reqid=order.reqid, - ) - - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - - # ems order request id - oid=order.oid, - - # broker specific request id - reqid=reqid, - - ) + reqid=reqid, ) elif action == 'cancel': @@ -468,6 +493,9 @@ async def trades_dialogue( # TODO: load paper positions from ``positions.toml`` _positions={}, + + # TODO: load postions from ledger file + _trade_ledger={}, ) n.start_soon(handle_order_requests, client, ems_stream) @@ -510,5 +538,4 @@ async def open_paperboi( loglevel=loglevel, ) as (ctx, first): - yield ctx, first