Adjust paper-engine to use `Transaction` for pps updates

ppu_history
Tyler Goodlet 2022-07-27 10:28:23 -04:00
parent 7cbdc6a246
commit a0c238daa7
1 changed files with 80 additions and 53 deletions

View File

@ -22,17 +22,25 @@ from contextlib import asynccontextmanager
from datetime import datetime
from operator import itemgetter
import time
from typing import Tuple, Optional, Callable
from typing import (
Any,
Optional,
Callable,
)
import uuid
from bidict import bidict
import pendulum
import trio
import tractor
from dataclasses import dataclass
from .. import data
from ..data._source import Symbol
from ..pp import Position
from ..pp import (
Position,
Transaction,
)
from ..data._normalize import iterticks
from ..data._source import unpack_fqsn
from ..log import get_logger
@ -63,11 +71,12 @@ class PaperBoi:
_buys: bidict
_sells: bidict
_reqids: bidict
_positions: dict[str, BrokerdPosition]
_positions: dict[str, Position]
_trade_ledger: dict[str, Any]
# init edge case L1 spread
last_ask: Tuple[float, float] = (float('inf'), 0) # price, size
last_bid: Tuple[float, float] = (0, 0)
last_ask: tuple[float, float] = (float('inf'), 0) # price, size
last_bid: tuple[float, float] = (0, 0)
async def submit_limit(
self,
@ -77,22 +86,23 @@ class PaperBoi:
action: str,
size: float,
reqid: Optional[str],
) -> int:
"""Place an order and return integer request id provided by client.
'''
Place an order and return integer request id provided by client.
"""
'''
is_modify: bool = False
if reqid is None:
reqid = str(uuid.uuid4())
else:
entry = self._reqids.get(reqid)
if entry:
# order is already existing, this is a modify
(oid, symbol, action, old_price) = self._reqids[reqid]
(oid, symbol, action, old_price) = entry
assert old_price != price
is_modify = True
# register order internally
self._reqids[reqid] = (oid, symbol, action, price)
else:
# register order internally
self._reqids[reqid] = (oid, symbol, action, price)
if action == 'alert':
# bypass all fill simulation
@ -197,16 +207,15 @@ class PaperBoi:
"""
# TODO: net latency model
await trio.sleep(0.05)
fill_time_ns = time.time_ns()
fill_time_s = time.time()
msg = BrokerdFill(
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),
time_ns=fill_time_ns,
action=action,
size=size,
price=price,
broker_time=datetime.now().timestamp(),
broker_details={
'paper_info': {
@ -216,7 +225,9 @@ class PaperBoi:
'name': self.broker + '_paper',
},
)
await self.ems_trades_stream.send(msg)
await self.ems_trades_stream.send(fill_msg)
self._trade_ledger.update(fill_msg.to_dict())
if order_complete:
@ -243,29 +254,37 @@ class PaperBoi:
# lookup any existing position
token = f'{symbol}.{self.broker}'
pp_msg = self._positions.setdefault(
pp = self._positions.setdefault(
token,
BrokerdPosition(
broker=self.broker,
account='paper',
symbol=symbol,
# TODO: we need to look up the asset currency from
# broker info. i guess for crypto this can be
# inferred from the pair?
currency='',
size=0.0,
avg_price=0,
Position(
Symbol(key=symbol),
size=size,
be_price=price,
bsuid=symbol,
)
)
# delegate update to `.pp.Position.lifo_update()`
pp = Position(
Symbol(key=symbol),
size=pp_msg.size,
be_price=pp_msg.avg_price,
t = Transaction(
fqsn=symbol,
tid=oid,
size=size,
price=price,
cost=1., # todo cost model
dt=pendulum.from_timestamp(fill_time_s),
bsuid=symbol,
)
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
pp.add_clear(t)
pp_msg = BrokerdPosition(
broker=self.broker,
account='paper',
symbol=symbol,
# TODO: we need to look up the asset currency from
# broker info. i guess for crypto this can be
# inferred from the pair?
currency='',
size=pp.size,
avg_price=pp.be_price,
)
await self.ems_trades_stream.send(pp_msg)
@ -273,6 +292,7 @@ class PaperBoi:
async def simulate_fills(
quote_stream: 'tractor.ReceiveStream', # noqa
client: PaperBoi,
) -> None:
# TODO: more machinery to better simulate real-world market things:
@ -389,6 +409,24 @@ async def handle_order_requests(
# validate
order = BrokerdOrder(**request_msg)
if order.reqid is None:
reqid = str(uuid.uuid4())
else:
reqid = order.reqid
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
)
)
# call our client api to submit the order
reqid = await client.submit_limit(
@ -402,20 +440,7 @@ async def handle_order_requests(
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid=order.reqid,
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
)
reqid=reqid,
)
elif action == 'cancel':
@ -468,6 +493,9 @@ async def trades_dialogue(
# TODO: load paper positions from ``positions.toml``
_positions={},
# TODO: load postions from ledger file
_trade_ledger={},
)
n.start_soon(handle_order_requests, client, ems_stream)
@ -510,5 +538,4 @@ async def open_paperboi(
loglevel=loglevel,
) as (ctx, first):
yield ctx, first