Split out ems daemon, client api and paper engine into new mods

basic_orders
Tyler Goodlet 2021-02-22 18:37:57 -05:00
parent a9bbc223bb
commit 948e133cae
6 changed files with 538 additions and 472 deletions


@@ -0,0 +1,212 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Orders and execution client API.
"""
from contextlib import asynccontextmanager
from typing import Dict, Tuple
from pprint import pformat
from dataclasses import dataclass, field
import trio
import tractor
from ..data._source import Symbol
from ..log import get_logger
from ._ems import _ems_main
log = get_logger(__name__)
@dataclass
class OrderBook:
"""Buy-side (client-side ?) order book ctl and tracking.
A style similar to "model-view" is used here where this api is
provided as a supervised control for an EMS actor which does all the
hard/fast work of talking to brokers/exchanges to conduct
executions.
Currently, this is mostly for keeping local state to match the EMS
and use received events to trigger graphics updates.
"""
# mem channels used to relay order requests to the EMS daemon
_to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel
_sent_orders: Dict[str, dict] = field(default_factory=dict)
_ready_to_receive: trio.Event = trio.Event()
def send(
self,
uuid: str,
symbol: 'Symbol',
price: float,
size: float,
action: str,
exec_mode: str,
) -> dict:
cmd = {
'action': action,
'price': price,
'size': size,
'symbol': symbol.key,
'brokers': symbol.brokers,
'oid': uuid,
'exec_mode': exec_mode, # dark or live
}
self._sent_orders[uuid] = cmd
self._to_ems.send_nowait(cmd)
return cmd
async def modify(self, oid: str, price) -> bool:
...
def cancel(self, uuid: str) -> bool:
"""Cancel an order (or alert) from the EMS.
"""
cmd = self._sent_orders[uuid]
msg = {
'action': 'cancel',
'oid': uuid,
'symbol': cmd['symbol'],
}
self._to_ems.send_nowait(msg)
_orders: OrderBook = None
def get_orders(
emsd_uid: Tuple[str, str] = None
) -> OrderBook:
""""
OrderBook singleton factory per actor.
"""
if emsd_uid is not None:
# TODO: read in target emsd's active book on startup
pass
global _orders
if _orders is None:
# setup local ui event streaming channels for request/resp
# streaming with the EMS daemon
_orders = OrderBook(*trio.open_memory_channel(100))
return _orders
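
# As a minimal usage sketch (not part of this commit; the helper and values
# below are hypothetical), synchronous UI code can drive the per-actor
# singleton book like so:
#
#     import uuid  # not imported by this module; needed only for the sketch
#
#     def submit_dark_buy(symbol: Symbol, price: float, size: float = 1.0) -> str:
#         # grab (or lazily create) the per-actor singleton book
#         book = get_orders()
#         oid = str(uuid.uuid4())
#         book.send(
#             uuid=oid,
#             symbol=symbol,
#             price=price,
#             size=size,
#             action='buy',
#             exec_mode='dark',  # trigger client-side; don't submit to broker yet
#         )
#         return oid
#
#     # cancelling later only requires the same oid:
#     # get_orders().cancel(oid)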
# TODO: make this a ``tractor.msg.pub``
async def send_order_cmds():
"""Order streaming task: deliver orders transmitted from UI
to downstream consumers.
This is run in the UI actor (usually the one running Qt but could be
any other client service code). This process simply delivers order
messages to the above ``_to_ems`` send channel (from sync code using
``.send_nowait()``), these values are pulled from the channel here
and relayed to any consumer(s) that called this function using
a ``tractor`` portal.
This effectively makes order messages look like they're being
"pushed" from the parent to the EMS where local sync code is likely
doing the pushing from some UI.
"""
book = get_orders()
orders_stream = book._from_order_book
# signal that ems connection is up and ready
book._ready_to_receive.set()
async for cmd in orders_stream:
# send msg over IPC / wire
log.info(f'Send order cmd:\n{pformat(cmd)}')
yield cmd
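
# For context, the EMS daemon side (``_ems_main``) consumes this stream back
# over IPC. A rough, hedged sketch of that consumption loop, assuming
# ``tractor``'s portal streaming API of this era (not part of this commit):
#
#     # inside the EMS daemon actor (sketch only):
#     async with tractor.wait_for_actor(client_actor_name) as portal:
#         async for cmd in await portal.run(send_order_cmds):
#             # each ``cmd`` is the dict built by ``OrderBook.send()``/``.cancel()``
#             log.info(f'received order cmd:\n{pformat(cmd)}')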
@asynccontextmanager
async def open_ems(
broker: str,
symbol: Symbol,
) -> None:
"""Spawn an EMS daemon and begin sending orders and receiving
alerts.
This EMS tries to reduce most brokers' terrible order entry APIs to
a very simple protocol built on a few easy-to-grok and/or
"rantsy" premises:
- most users will prefer "dark mode" where orders are not submitted
to a broker until an execution condition is triggered
(aka client-side "hidden orders")
- brokers over-complicate their APIs and, generally speaking, hire
poor designers to create them. We're better off creating a super
minimal, schema-simple, request-event-stream protocol to unify all the
existing piles of shit (and shocker, it'll probably just end up
looking like a decent crypto exchange's API)
- all order types can be implemented with client-side limit orders
- we aren't reinventing a wheel in this case since none of these
brokers are exposing the FIX protocol; it is they who are doing the
re-inventing.
TODO: make some fancy diagrams using mermaid.io
The possible set of responses from the stream is currently:
- 'dark_submitted', 'broker_submitted'
- 'dark_cancelled', 'broker_cancelled'
- 'dark_executed', 'broker_executed'
- 'broker_filled'
"""
actor = tractor.current_actor()
# TODO: add ``maybe_spawn_emsd()`` for this
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'emsd',
enable_modules=[
'piker.exchange._ems',
],
)
trades_stream = await portal.run(
_ems_main,
client_actor_name=actor.name,
broker=broker,
symbol=symbol.key,
)
# wait for service to connect back to us signalling
# ready for order commands
book = get_orders()
with trio.fail_after(10):
await book._ready_to_receive.wait()
yield book, trades_stream
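
# A hedged usage sketch (the broker name and surrounding task are
# placeholders, not part of this commit):
#
#     async def order_entry_task(symbol: Symbol) -> None:
#         # spawn the EMS daemon and wire up the book + trade event stream
#         async with open_ems('kraken', symbol) as (book, trades_stream):
#             # sync code (eg. a Qt input handler) can now call ``book.send()``
#             async for event in trades_stream:
#                 # react to 'dark_submitted', 'broker_filled', etc.
#                 log.info(f'ems event:\n{pformat(event)}')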


@@ -20,13 +20,10 @@ In da suit parlances: "Execution management systems"
"""
from pprint import pformat
import time
from datetime import datetime
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import (
AsyncIterator, Dict, Callable, Tuple,
)
import uuid
from bidict import bidict
import trio
@@ -35,8 +32,8 @@ import tractor
from .. import data
from ..log import get_logger
from ..data._source import Symbol
from ..data._normalize import iterticks
from ._paper_engine import PaperBoi, simulate_fills
log = get_logger(__name__)
@@ -126,293 +123,6 @@ def get_dark_book(broker: str) -> _DarkBook:
_DEFAULT_SIZE: float = 1.0
@dataclass
class PaperBoi:
"""Emulates a broker order client providing the same API and
order-event response event stream format but with methods for
triggering desired events based on forward testing engine
requirements.
"""
broker: str
_to_trade_stream: trio.abc.SendChannel
trade_stream: trio.abc.ReceiveChannel
# map of paper "live" orders which will be used
# to simulate fills based on paper engine settings
_buys: bidict
_sells: bidict
_reqids: bidict
# init edge case L1 spread
last_ask: Tuple[float, float] = (float('inf'), 0) # price, size
last_bid: Tuple[float, float] = (0, 0)
async def submit_limit(
self,
oid: str, # XXX: see return value
symbol: str,
price: float,
action: str,
size: float,
) -> int:
"""Place an order and return integer request id provided by client.
"""
# the trades stream expects events in the form
# {'local_trades': (event_name, msg)}
reqid = str(uuid.uuid4())
# TODO: net latency model
# we checkpoint here quickly, particularly
# for dark orders since we want the dark_executed
# to trigger first thus creating a lookup entry
# in the broker trades event processing loop
await trio.sleep(0.05)
await self._to_trade_stream.send({
'local_trades': ('status', {
'time_ns': time.time_ns(),
'reqid': reqid,
'status': 'submitted',
'broker': self.broker,
# 'cmd': cmd, # original request message
'paper_info': {
'oid': oid,
},
}),
})
# register order internally
self._reqids[reqid] = (oid, symbol, action, price)
# if we're already at a clearing price, simulate an immediate fill
if (
action == 'buy' and (clear_price := self.last_ask[0]) <= price
) or (
action == 'sell' and (clear_price := self.last_bid[0]) >= price
):
await self.fake_fill(clear_price, size, action, reqid, oid)
else: # register this submission as a paper live order
# submit order to book simulation fill loop
if action == 'buy':
orders = self._buys
elif action == 'sell':
orders = self._sells
# buys/sells: (symbol -> (price -> order))
orders.setdefault(symbol, {})[price] = (size, oid, reqid, action)
return reqid
async def submit_cancel(
self,
reqid: str,
) -> None:
# TODO: fake market simulation effects
# await self._to_trade_stream.send(
oid, symbol, action, price = self._reqids[reqid]
if action == 'buy':
self._buys[symbol].pop(price)
elif action == 'sell':
self._sells[symbol].pop(price)
# TODO: net latency model
await trio.sleep(0.05)
await self._to_trade_stream.send({
'local_trades': ('status', {
'time_ns': time.time_ns(),
'oid': oid,
'reqid': reqid,
'status': 'cancelled',
'broker': self.broker,
# 'cmd': cmd, # original request message
'paper': True,
}),
})
async def fake_fill(
self,
price: float,
size: float,
action: str, # one of {'buy', 'sell'}
reqid: str,
oid: str,
# determine whether to send a filled status that has zero
# remaining lots to fill
order_complete: bool = True,
remaining: float = 0,
) -> None:
"""Pretend to fill a broker order @ price and size.
"""
# TODO: net latency model
await trio.sleep(0.05)
await self._to_trade_stream.send({
'local_trades': ('fill', {
'status': 'filled',
'broker': self.broker,
# converted to float by us in ib backend
'broker_time': datetime.now().timestamp(),
'action': action,
'size': size,
'price': price,
'remaining': 0 if order_complete else remaining,
# normally filled by real `brokerd` daemon
'time': time.time_ns(),
'time_ns': time.time_ns(), # cuz why not
# fake ids
'reqid': reqid,
'paper_info': {
'oid': oid,
},
# XXX: fields we might not need to emulate?
# execution id from broker
# 'execid': execu.execId,
# 'cmd': cmd, # original request message?
}),
})
if order_complete:
await self._to_trade_stream.send({
'local_trades': ('status', {
'reqid': reqid,
'status': 'filled',
'broker': self.broker,
'filled': size,
'remaining': 0 if order_complete else remaining,
# converted to float by us in ib backend
'broker_time': datetime.now().timestamp(),
'paper_info': {
'oid': oid,
},
}),
})
async def simulate_fills(
quote_stream: 'tractor.ReceiveStream', # noqa
client: PaperBoi,
) -> None:
# TODO: more machinery to better simulate real-world market things:
# - slippage models, check what quantopian has:
# https://github.com/quantopian/zipline/blob/master/zipline/finance/slippage.py
# * this should help with simulating partial fills in a fast moving mkt
# afaiu
# - commission models, also quantopian has em:
# https://github.com/quantopian/zipline/blob/master/zipline/finance/commission.py
# - network latency models ??
# - position tracking:
# https://github.com/quantopian/zipline/blob/master/zipline/finance/ledger.py
# this stream may eventually contain multiple symbols
async for quotes in quote_stream:
for sym, quote in quotes.items():
for tick in iterticks(
quote,
# dark order price filter(s)
types=('ask', 'bid', 'trade', 'last')
):
# print(tick)
tick_price = tick.get('price')
ttype = tick['type']
if ttype in ('ask',):
client.last_ask = (
tick_price,
tick.get('size', client.last_ask[1]),
)
buys = client._buys.get(sym, {})
# iterate book prices descending
for our_bid in reversed(sorted(buys.keys())):
if tick_price < our_bid:
# retrieve order info
(size, oid, reqid, action) = buys.pop(our_bid)
# clearing price would have filled entirely
await client.fake_fill(
# todo slippage to determine fill price
tick_price,
size,
action,
reqid,
oid,
)
else:
# prices are iterated in sorted order so
# we're done
break
if ttype in ('bid',):
client.last_bid = (
tick_price,
tick.get('size', client.last_bid[1]),
)
sells = client._sells.get(sym, {})
# iterate book prices ascending
for our_ask in sorted(sells.keys()):
if tick_price > our_ask:
# retrieve order info
(size, oid, reqid, action) = sells.pop(our_ask)
# clearing price would have filled entirely
await client.fake_fill(
tick_price,
size,
action,
reqid,
oid,
)
else:
# prices are iterated in sorted order so
# we're done
break
if ttype in ('trade', 'last'):
# TODO: simulate actual book queues and our orders
# place in it, might require full L2 data?
pass
async def execute_triggers(
broker: str,
symbol: str,
@@ -772,6 +482,8 @@ async def _ems_main(
accept normalized trades responses, process and relay to ems client(s)
"""
from ._client import send_order_cmds
book = get_dark_book(broker)
# get a portal back to the client
@@ -913,181 +625,3 @@ async def _ems_main(
})
# continue and wait on next order cmd
@dataclass
class OrderBook:
"""Buy-side (client-side ?) order book ctl and tracking.
A style similar to "model-view" is used here where this api is
provided as a supervised control for an EMS actor which does all the
hard/fast work of talking to brokers/exchanges to conduct
executions.
Currently, this is mostly for keeping local state to match the EMS
and use received events to trigger graphics updates.
"""
# mem channels used to relay order requests to the EMS daemon
_to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel
_sent_orders: Dict[str, dict] = field(default_factory=dict)
_ready_to_receive: trio.Event = trio.Event()
def send(
self,
uuid: str,
symbol: 'Symbol',
price: float,
size: float,
action: str,
exec_mode: str,
) -> dict:
cmd = {
'action': action,
'price': price,
'size': size,
'symbol': symbol.key,
'brokers': symbol.brokers,
'oid': uuid,
'exec_mode': exec_mode, # dark or live
}
self._sent_orders[uuid] = cmd
self._to_ems.send_nowait(cmd)
return cmd
async def modify(self, oid: str, price) -> bool:
...
def cancel(self, uuid: str) -> bool:
"""Cancel an order (or alert) from the EMS.
"""
cmd = self._sent_orders[uuid]
msg = {
'action': 'cancel',
'oid': uuid,
'symbol': cmd['symbol'],
}
self._to_ems.send_nowait(msg)
_orders: OrderBook = None
def get_orders(
emsd_uid: Tuple[str, str] = None
) -> OrderBook:
""""
OrderBook singleton factory per actor.
"""
if emsd_uid is not None:
# TODO: read in target emsd's active book on startup
pass
global _orders
if _orders is None:
# setup local ui event streaming channels for request/resp
# streaming with the EMS daemon
_orders = OrderBook(*trio.open_memory_channel(100))
return _orders
# TODO: make this a ``tractor.msg.pub``
async def send_order_cmds():
"""Order streaming task: deliver orders transmitted from UI
to downstream consumers.
This is run in the UI actor (usually the one running Qt but could be
any other client service code). This process simply delivers order
messages to the above ``_to_ems`` send channel (from sync code using
``.send_nowait()``), these values are pulled from the channel here
and relayed to any consumer(s) that called this function using
a ``tractor`` portal.
This effectively makes order messages look like they're being
"pushed" from the parent to the EMS where local sync code is likely
doing the pushing from some UI.
"""
book = get_orders()
orders_stream = book._from_order_book
# signal that ems connection is up and ready
book._ready_to_receive.set()
async for cmd in orders_stream:
# send msg over IPC / wire
log.info(f'Send order cmd:\n{pformat(cmd)}')
yield cmd
@asynccontextmanager
async def open_ems(
broker: str,
symbol: Symbol,
# task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Spawn an EMS daemon and begin sending orders and receiving
alerts.
This EMS tries to reduce most brokers' terrible order entry APIs to
a very simple protocol built on a few easy-to-grok and/or
"rantsy" premises:
- most users will prefer "dark mode" where orders are not submitted
to a broker until an execution condition is triggered
(aka client-side "hidden orders")
- brokers over-complicate their APIs and, generally speaking, hire
poor designers to create them. We're better off creating a super
minimal, schema-simple, request-event-stream protocol to unify all the
existing piles of shit (and shocker, it'll probably just end up
looking like a decent crypto exchange's API)
- all order types can be implemented with client-side limit orders
- we aren't reinventing a wheel in this case since none of these
brokers are exposing the FIX protocol; it is they who are doing the
re-inventing.
TODO: make some fancy diagrams using mermaid.io
The possible set of responses from the stream is currently:
- 'dark_submitted', 'broker_submitted'
- 'dark_cancelled', 'broker_cancelled'
- 'dark_executed', 'broker_executed'
- 'broker_filled'
"""
actor = tractor.current_actor()
subactor_name = 'emsd'
# TODO: add ``maybe_spawn_emsd()`` for this
async with tractor.open_nursery() as n:
portal = await n.start_actor(
subactor_name,
enable_modules=[__name__],
)
trades_stream = await portal.run(
_ems_main,
client_actor_name=actor.name,
broker=broker,
symbol=symbol.key,
)
# wait for service to connect back to us signalling
# ready for order commands
book = get_orders()
with trio.fail_after(10):
await book._ready_to_receive.wait()
yield book, trades_stream


@@ -0,0 +1,317 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Fake trading for forward testing.
"""
from datetime import datetime
import time
from typing import Tuple
import uuid
from bidict import bidict
import trio
from dataclasses import dataclass
from ..data._normalize import iterticks
@dataclass
class PaperBoi:
"""Emulates a broker order client providing the same API and
order-event response event stream format but with methods for
triggering desired events based on forward testing engine
requirements.
"""
broker: str
_to_trade_stream: trio.abc.SendChannel
trade_stream: trio.abc.ReceiveChannel
# map of paper "live" orders which will be used
# to simulate fills based on paper engine settings
_buys: bidict
_sells: bidict
_reqids: bidict
# init edge case L1 spread
last_ask: Tuple[float, float] = (float('inf'), 0) # price, size
last_bid: Tuple[float, float] = (0, 0)
async def submit_limit(
self,
oid: str, # XXX: see return value
symbol: str,
price: float,
action: str,
size: float,
) -> int:
"""Place an order and return integer request id provided by client.
"""
# the trades stream expects events in the form
# {'local_trades': (event_name, msg)}
reqid = str(uuid.uuid4())
# TODO: net latency model
# we checkpoint here quickly, particularly
# for dark orders since we want the dark_executed
# to trigger first thus creating a lookup entry
# in the broker trades event processing loop
await trio.sleep(0.05)
await self._to_trade_stream.send({
'local_trades': ('status', {
'time_ns': time.time_ns(),
'reqid': reqid,
'status': 'submitted',
'broker': self.broker,
# 'cmd': cmd, # original request message
'paper_info': {
'oid': oid,
},
}),
})
# register order internally
self._reqids[reqid] = (oid, symbol, action, price)
# if we're already at a clearing price, simulate an immediate fill
if (
action == 'buy' and (clear_price := self.last_ask[0]) <= price
) or (
action == 'sell' and (clear_price := self.last_bid[0]) >= price
):
await self.fake_fill(clear_price, size, action, reqid, oid)
else: # register this submission as a paper live order
# submit order to book simulation fill loop
if action == 'buy':
orders = self._buys
elif action == 'sell':
orders = self._sells
# buys/sells: (symbol -> (price -> order))
orders.setdefault(symbol, {})[price] = (size, oid, reqid, action)
return reqid
async def submit_cancel(
self,
reqid: str,
) -> None:
# TODO: fake market simulation effects
# await self._to_trade_stream.send(
oid, symbol, action, price = self._reqids[reqid]
if action == 'buy':
self._buys[symbol].pop(price)
elif action == 'sell':
self._sells[symbol].pop(price)
# TODO: net latency model
await trio.sleep(0.05)
await self._to_trade_stream.send({
'local_trades': ('status', {
'time_ns': time.time_ns(),
'oid': oid,
'reqid': reqid,
'status': 'cancelled',
'broker': self.broker,
# 'cmd': cmd, # original request message
'paper': True,
}),
})
async def fake_fill(
self,
price: float,
size: float,
action: str, # one of {'buy', 'sell'}
reqid: str,
oid: str,
# determine whether to send a filled status that has zero
# remaining lots to fill
order_complete: bool = True,
remaining: float = 0,
) -> None:
"""Pretend to fill a broker order @ price and size.
"""
# TODO: net latency model
await trio.sleep(0.05)
await self._to_trade_stream.send({
'local_trades': ('fill', {
'status': 'filled',
'broker': self.broker,
# converted to float by us in ib backend
'broker_time': datetime.now().timestamp(),
'action': action,
'size': size,
'price': price,
'remaining': 0 if order_complete else remaining,
# normally filled by real `brokerd` daemon
'time': time.time_ns(),
'time_ns': time.time_ns(), # cuz why not
# fake ids
'reqid': reqid,
'paper_info': {
'oid': oid,
},
# XXX: fields we might not need to emulate?
# execution id from broker
# 'execid': execu.execId,
# 'cmd': cmd, # original request message?
}),
})
if order_complete:
await self._to_trade_stream.send({
'local_trades': ('status', {
'reqid': reqid,
'status': 'filled',
'broker': self.broker,
'filled': size,
'remaining': 0 if order_complete else remaining,
# converted to float by us in ib backend
'broker_time': datetime.now().timestamp(),
'paper_info': {
'oid': oid,
},
}),
})
async def simulate_fills(
quote_stream: 'tractor.ReceiveStream', # noqa
client: PaperBoi,
) -> None:
# TODO: more machinery to better simulate real-world market things:
# - slippage models, check what quantopian has:
# https://github.com/quantopian/zipline/blob/master/zipline/finance/slippage.py
# * this should help with simulating partial fills in a fast moving mkt
# afaiu
# - commission models, also quantopian has em:
# https://github.com/quantopian/zipline/blob/master/zipline/finance/commission.py
# - network latency models ??
# - position tracking:
# https://github.com/quantopian/zipline/blob/master/zipline/finance/ledger.py
# this stream may eventually contain multiple symbols
async for quotes in quote_stream:
for sym, quote in quotes.items():
for tick in iterticks(
quote,
# dark order price filter(s)
types=('ask', 'bid', 'trade', 'last')
):
# print(tick)
tick_price = tick.get('price')
ttype = tick['type']
if ttype in ('ask',):
client.last_ask = (
tick_price,
tick.get('size', client.last_ask[1]),
)
buys = client._buys.get(sym, {})
# iterate book prices descending
for our_bid in reversed(sorted(buys.keys())):
if tick_price < our_bid:
# retrieve order info
(size, oid, reqid, action) = buys.pop(our_bid)
# clearing price would have filled entirely
await client.fake_fill(
# todo slippage to determine fill price
tick_price,
size,
action,
reqid,
oid,
)
else:
# prices are iterated in sorted order so
# we're done
break
if ttype in ('bid',):
client.last_bid = (
tick_price,
tick.get('size', client.last_bid[1]),
)
sells = client._sells.get(sym, {})
# iterate book prices ascending
for our_ask in sorted(sells.keys()):
if tick_price > our_ask:
# retrieve order info
(size, oid, reqid, action) = sells.pop(our_ask)
# clearing price would have filled entirely
await client.fake_fill(
tick_price,
size,
action,
reqid,
oid,
)
else:
# prices are iterated in sorted order so
# we're done
break
if ttype in ('trade', 'last'):
# TODO: simulate actual book queues and our orders
# place in it, might require full L2 data?
pass
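
# A rough wiring sketch for the paper engine (assumes a ``quote_stream``
# async iterator of ``{symbol: quote}`` dicts and an open trio nursery are
# already available; not part of this commit):
#
#     async def run_paper_sim(quote_stream, nursery) -> None:
#         # channel pair the paper client uses to emit broker-like trade events
#         send_chan, recv_chan = trio.open_memory_channel(100)
#         client = PaperBoi(
#             broker='paper',
#             _to_trade_stream=send_chan,
#             trade_stream=recv_chan,
#             # plain dicts suffice at runtime since dataclass annotations
#             # aren't enforced
#             _buys={},
#             _sells={},
#             _reqids={},
#         )
#         # drive fake fills from the quote feed in the background
#         nursery.start_soon(simulate_fills, quote_stream, client)
#         await client.submit_limit(
#             oid='fake-oid', symbol='XBTUSD', price=100.0, action='buy', size=1.0,
#         )
#         async for msg in client.trade_stream:
#             # each msg is a {'local_trades': (event_name, payload)} dict
#             print(msg)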


@@ -62,7 +62,7 @@ from ..log import get_logger
from ._exec import run_qtractor, current_screen
from ._interaction import ChartView, open_order_mode
from .. import fsp
from ..exchange._ems import open_ems
from ..exchange._client import open_ems
log = get_logger(__name__)


@@ -33,7 +33,7 @@ import numpy as np
from ..log import get_logger
from ._style import _min_points_to_show, hcolor, _font
from ._graphics._lines import order_line, LevelLine
from ..exchange._ems import OrderBook
from ..exchange._client import OrderBook
from ..data._source import Symbol


@@ -16,6 +16,7 @@
"""
Console interface to UI components.
"""
from functools import partial
import os
@@ -149,6 +150,8 @@ def chart(config, symbol, profile):
tractor_kwargs={
'debug_mode': True,
'loglevel': tractorloglevel,
'enable_modules': ['piker.exchange._ems'],
'enable_modules': [
'piker.exchange._client'
],
},
)