From 0dabc6ad267834fba220dd0998af21455bde788a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jun 2021 12:06:47 -0400 Subject: [PATCH] Port paper engine to new msgs and run in sub-actor This makes the paper engine look IPC-wise exactly like any broker-provider backend module and uses the new ``trades_dialogue()`` 2-way streaming endpoint for commanding order requests. This serves as a first step toward truly distributed forward testing since the paper engine can now be run out-of tree from `pikerd` if needed thus demonstrating how real-time clearing signals can be shared between fully distinct services. --- piker/clearing/_paper_engine.py | 290 +++++++++++++++++++++++--------- 1 file changed, 212 insertions(+), 78 deletions(-) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 740345f5..e669fd42 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -18,17 +18,28 @@ Fake trading for forward testing. """ +from contextlib import asynccontextmanager from datetime import datetime from operator import itemgetter import time -from typing import Tuple, Optional +from typing import Tuple, Optional, Callable import uuid from bidict import bidict import trio +import tractor from dataclasses import dataclass +from .. import data from ..data._normalize import iterticks +from ..log import get_logger +from ._messages import ( + BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, + BrokerdFill, +) + + +log = get_logger(__name__) @dataclass @@ -41,8 +52,8 @@ class PaperBoi: """ broker: str - _to_trade_stream: trio.abc.SendChannel - trade_stream: trio.abc.ReceiveChannel + + ems_trades_stream: tractor.MsgStream # map of paper "live" orders which be used # to simulate fills based on paper engine settings @@ -61,20 +72,20 @@ class PaperBoi: price: float, action: str, size: float, - brid: Optional[str], + reqid: Optional[str], ) -> int: """Place an order and return integer request id provided by client. """ - - if brid is None: + is_modify: bool = False + if reqid is None: reqid = str(uuid.uuid4()) else: # order is already existing, this is a modify - (oid, symbol, action, old_price) = self._reqids[brid] + (oid, symbol, action, old_price) = self._reqids[reqid] assert old_price != price - reqid = brid + is_modify = True # register order internally self._reqids[reqid] = (oid, symbol, action, price) @@ -90,22 +101,16 @@ class PaperBoi: # in the broker trades event processing loop await trio.sleep(0.05) - await self._to_trade_stream.send({ - - 'local_trades': ('status', { - - 'time_ns': time.time_ns(), - 'reqid': reqid, - - 'status': 'submitted', - 'broker': self.broker, - # 'cmd': cmd, # original request message - - 'paper_info': { - 'oid': oid, - }, - }), - }) + msg = BrokerdStatus( + status='submitted', + reqid=reqid, + broker=self.broker, + time_ns=time.time_ns(), + filled=0.0, + reason='paper_trigger', + remaining=size, + ) + await self.ems_trades_stream.send(msg.dict()) # if we're already a clearing price simulate an immediate fill if ( @@ -129,7 +134,7 @@ class PaperBoi: # and trigger by the simulated clearing task normally # running ``simulate_fills()``. - if brid is not None: + if is_modify: # remove any existing order for the old price orders[symbol].pop((oid, old_price)) @@ -144,7 +149,6 @@ class PaperBoi: ) -> None: # TODO: fake market simulation effects - # await self._to_trade_stream.send( oid, symbol, action, price = self._reqids[reqid] if action == 'buy': @@ -155,21 +159,14 @@ class PaperBoi: # TODO: net latency model await trio.sleep(0.05) - await self._to_trade_stream.send({ - - 'local_trades': ('status', { - - 'time_ns': time.time_ns(), - 'oid': oid, - 'reqid': reqid, - - 'status': 'cancelled', - 'broker': self.broker, - # 'cmd': cmd, # original request message - - 'paper': True, - }), - }) + msg = BrokerdStatus( + status='cancelled', + oid=oid, + reqid=reqid, + broker=self.broker, + time_ns=time.time_ns(), + ) + await self.ems_trades_stream.send(msg.dict()) async def fake_fill( self, @@ -191,56 +188,51 @@ class PaperBoi: # TODO: net latency model await trio.sleep(0.05) - # the trades stream expects events in the form - # {'local_trades': (event_name, msg)} - await self._to_trade_stream.send({ + msg = BrokerdFill( - 'local_trades': ('fill', { + reqid=reqid, + time_ns=time.time_ns(), - 'status': 'filled', - 'broker': self.broker, - # converted to float by us in ib backend - 'broker_time': datetime.now().timestamp(), - - 'action': action, - 'size': size, - 'price': price, - 'remaining': 0 if order_complete else remaining, - - # normally filled by real `brokerd` daemon - 'time': time.time_ns(), - 'time_ns': time.time_ns(), # cuz why not - - # fake ids - 'reqid': reqid, + action=action, + size=size, + price=price, + broker_time=datetime.now().timestamp(), + broker_details={ 'paper_info': { 'oid': oid, }, + # mocking ib + 'name': self.broker + '_paper', + }, + ) + await self.ems_trades_stream.send(msg.dict()) - # XXX: fields we might not need to emulate? - # execution id from broker - # 'execid': execu.execId, - # 'cmd': cmd, # original request message? - }), - }) if order_complete: - await self._to_trade_stream.send({ - 'local_trades': ('status', { - 'reqid': reqid, - 'status': 'filled', - 'broker': self.broker, - 'filled': size, - 'remaining': 0 if order_complete else remaining, + msg = BrokerdStatus( - # converted to float by us in ib backend - 'broker_time': datetime.now().timestamp(), + reqid=reqid, + time_ns=time.time_ns(), + + status='filled', + # broker=self.broker, + filled=size, + remaining=0 if order_complete else remaining, + + action=action, + size=size, + price=price, + + # broker=self.broker, + broker_details={ 'paper_info': { 'oid': oid, }, - }), - }) + 'name': self.broker, + }, + ) + await self.ems_trades_stream.send(msg.dict()) async def simulate_fills( @@ -327,3 +319,145 @@ async def simulate_fills( else: # prices are iterated in sorted order so we're done break + + +# class MockBrokerdMsgStream: + + +# async def MockContext(*args, **kwargs): + + +async def handle_order_requests( + + client: PaperBoi, + ems_order_stream: tractor.MsgStream, + +) -> None: + + # order_request: dict + async for request_msg in ems_order_stream: + + action = request_msg['action'] + + if action in {'buy', 'sell'}: + # validate + order = BrokerdOrder(**request_msg) + + # call our client api to submit the order + reqid = await client.submit_limit( + + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + + # XXX: by default 0 tells ``ib_insync`` methods that + # there is no existing order so ask the client to create + # a new one (which it seems to do by allocating an int + # counter - collision prone..) + reqid=order.reqid, + ) + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + + # ems order request id + oid=order.oid, + + # broker specific request id + reqid=reqid, + + ).dict() + ) + + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) + + await client.submit_cancel( + reqid=msg.reqid + ) + + else: + log.error(f'Unknown order command: {request_msg}') + + +@tractor.context +async def trades_dialogue( + + ctx: tractor.Context, + broker: str, + symbol: str, + loglevel: str = None, + +) -> None: + + async with ( + + data.open_feed( + broker, + [symbol], + loglevel=loglevel, + ) as feed, + + ): + # TODO: load paper positions per broker from .toml config file + # and pass as symbol to position data mapping: ``dict[str, dict]`` + # await ctx.started(all_positions) + await ctx.started({}) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + + client = PaperBoi( + broker, + ems_stream, + _buys={}, + _sells={}, + + _reqids={}, + ) + + n.start_soon(handle_order_requests, client, ems_stream) + + # paper engine simulator clearing task + await simulate_fills(feed.stream, client) + + +@asynccontextmanager +async def open_paperboi( + broker: str, + symbol: str, + loglevel: str, + +) -> Callable: + '''Spawn a paper engine actor and yield through access to + its context. + + ''' + service_name = f'paperboi.{broker}' + + async with ( + tractor.find_actor(service_name) as portal, + tractor.open_nursery() as tn, + ): + # only spawn if no paperboi already is up + # (we likely don't need more then one proc for basic + # simulated order clearing) + if portal is None: + portal = await tn.start_actor( + service_name, + enable_modules=[__name__] + ) + + async with portal.open_context( + trades_dialogue, + broker=broker, + symbol=symbol, + loglevel=loglevel, + + ) as (ctx, first): + yield ctx, first