Port paper engine to new msgs and run in sub-actor

This makes the paper engine look IPC-wise exactly like any
broker-provider backend module and uses the new ``trades_dialogue()``
2-way streaming endpoint for commanding order requests.

This serves as a first step toward truly distributed forward testing
since the paper engine can now be run out-of tree from `pikerd` if
needed thus demonstrating how real-time clearing signals can be shared
between fully distinct services.
ems_to_bidir_streaming
Tyler Goodlet 2021-06-08 12:06:47 -04:00
parent 23094d8624
commit 0dabc6ad26
1 changed files with 212 additions and 78 deletions

View File

@ -18,17 +18,28 @@
Fake trading for forward testing.
"""
from contextlib import asynccontextmanager
from datetime import datetime
from operator import itemgetter
import time
from typing import Tuple, Optional
from typing import Tuple, Optional, Callable
import uuid
from bidict import bidict
import trio
import tractor
from dataclasses import dataclass
from .. import data
from ..data._normalize import iterticks
from ..log import get_logger
from ._messages import (
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdFill,
)
log = get_logger(__name__)
@dataclass
@ -41,8 +52,8 @@ class PaperBoi:
"""
broker: str
_to_trade_stream: trio.abc.SendChannel
trade_stream: trio.abc.ReceiveChannel
ems_trades_stream: tractor.MsgStream
# map of paper "live" orders which be used
# to simulate fills based on paper engine settings
@ -61,20 +72,20 @@ class PaperBoi:
price: float,
action: str,
size: float,
brid: Optional[str],
reqid: Optional[str],
) -> int:
"""Place an order and return integer request id provided by client.
"""
if brid is None:
is_modify: bool = False
if reqid is None:
reqid = str(uuid.uuid4())
else:
# order is already existing, this is a modify
(oid, symbol, action, old_price) = self._reqids[brid]
(oid, symbol, action, old_price) = self._reqids[reqid]
assert old_price != price
reqid = brid
is_modify = True
# register order internally
self._reqids[reqid] = (oid, symbol, action, price)
@ -90,22 +101,16 @@ class PaperBoi:
# in the broker trades event processing loop
await trio.sleep(0.05)
await self._to_trade_stream.send({
'local_trades': ('status', {
'time_ns': time.time_ns(),
'reqid': reqid,
'status': 'submitted',
'broker': self.broker,
# 'cmd': cmd, # original request message
'paper_info': {
'oid': oid,
},
}),
})
msg = BrokerdStatus(
status='submitted',
reqid=reqid,
broker=self.broker,
time_ns=time.time_ns(),
filled=0.0,
reason='paper_trigger',
remaining=size,
)
await self.ems_trades_stream.send(msg.dict())
# if we're already a clearing price simulate an immediate fill
if (
@ -129,7 +134,7 @@ class PaperBoi:
# and trigger by the simulated clearing task normally
# running ``simulate_fills()``.
if brid is not None:
if is_modify:
# remove any existing order for the old price
orders[symbol].pop((oid, old_price))
@ -144,7 +149,6 @@ class PaperBoi:
) -> None:
# TODO: fake market simulation effects
# await self._to_trade_stream.send(
oid, symbol, action, price = self._reqids[reqid]
if action == 'buy':
@ -155,21 +159,14 @@ class PaperBoi:
# TODO: net latency model
await trio.sleep(0.05)
await self._to_trade_stream.send({
'local_trades': ('status', {
'time_ns': time.time_ns(),
'oid': oid,
'reqid': reqid,
'status': 'cancelled',
'broker': self.broker,
# 'cmd': cmd, # original request message
'paper': True,
}),
})
msg = BrokerdStatus(
status='cancelled',
oid=oid,
reqid=reqid,
broker=self.broker,
time_ns=time.time_ns(),
)
await self.ems_trades_stream.send(msg.dict())
async def fake_fill(
self,
@ -191,56 +188,51 @@ class PaperBoi:
# TODO: net latency model
await trio.sleep(0.05)
# the trades stream expects events in the form
# {'local_trades': (event_name, msg)}
await self._to_trade_stream.send({
msg = BrokerdFill(
'local_trades': ('fill', {
reqid=reqid,
time_ns=time.time_ns(),
'status': 'filled',
'broker': self.broker,
# converted to float by us in ib backend
'broker_time': datetime.now().timestamp(),
'action': action,
'size': size,
'price': price,
'remaining': 0 if order_complete else remaining,
# normally filled by real `brokerd` daemon
'time': time.time_ns(),
'time_ns': time.time_ns(), # cuz why not
# fake ids
'reqid': reqid,
action=action,
size=size,
price=price,
broker_time=datetime.now().timestamp(),
broker_details={
'paper_info': {
'oid': oid,
},
# mocking ib
'name': self.broker + '_paper',
},
)
await self.ems_trades_stream.send(msg.dict())
# XXX: fields we might not need to emulate?
# execution id from broker
# 'execid': execu.execId,
# 'cmd': cmd, # original request message?
}),
})
if order_complete:
await self._to_trade_stream.send({
'local_trades': ('status', {
'reqid': reqid,
'status': 'filled',
'broker': self.broker,
'filled': size,
'remaining': 0 if order_complete else remaining,
msg = BrokerdStatus(
# converted to float by us in ib backend
'broker_time': datetime.now().timestamp(),
reqid=reqid,
time_ns=time.time_ns(),
status='filled',
# broker=self.broker,
filled=size,
remaining=0 if order_complete else remaining,
action=action,
size=size,
price=price,
# broker=self.broker,
broker_details={
'paper_info': {
'oid': oid,
},
}),
})
'name': self.broker,
},
)
await self.ems_trades_stream.send(msg.dict())
async def simulate_fills(
@ -327,3 +319,145 @@ async def simulate_fills(
else:
# prices are iterated in sorted order so we're done
break
# class MockBrokerdMsgStream:
# async def MockContext(*args, **kwargs):
async def handle_order_requests(
client: PaperBoi,
ems_order_stream: tractor.MsgStream,
) -> None:
# order_request: dict
async for request_msg in ems_order_stream:
action = request_msg['action']
if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
reqid = await client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid=order.reqid,
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
await client.submit_cancel(
reqid=msg.reqid
)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
broker: str,
symbol: str,
loglevel: str = None,
) -> None:
async with (
data.open_feed(
broker,
[symbol],
loglevel=loglevel,
) as feed,
):
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions)
await ctx.started({})
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
client = PaperBoi(
broker,
ems_stream,
_buys={},
_sells={},
_reqids={},
)
n.start_soon(handle_order_requests, client, ems_stream)
# paper engine simulator clearing task
await simulate_fills(feed.stream, client)
@asynccontextmanager
async def open_paperboi(
broker: str,
symbol: str,
loglevel: str,
) -> Callable:
'''Spawn a paper engine actor and yield through access to
its context.
'''
service_name = f'paperboi.{broker}'
async with (
tractor.find_actor(service_name) as portal,
tractor.open_nursery() as tn,
):
# only spawn if no paperboi already is up
# (we likely don't need more then one proc for basic
# simulated order clearing)
if portal is None:
portal = await tn.start_actor(
service_name,
enable_modules=[__name__]
)
async with portal.open_context(
trades_dialogue,
broker=broker,
symbol=symbol,
loglevel=loglevel,
) as (ctx, first):
yield ctx, first