Big refactor; start paper client

basic_orders
Tyler Goodlet 2021-01-18 19:55:50 -05:00
parent 2bf95d7ec7
commit f3ae8db04b
2 changed files with 342 additions and 246 deletions

View File

@ -20,6 +20,7 @@ In suit parlance: "Execution management systems"
"""
from pprint import pformat
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import (
AsyncIterator, Dict, Callable, Tuple,
@ -37,105 +38,6 @@ from .data._source import Symbol
log = get_logger(__name__)
# setup local ui event streaming channels for request/resp
# streamging with EMS daemon
_to_ems, _from_order_book = trio.open_memory_channel(100)
@dataclass
class OrderBook:
"""Buy-side (client-side ?) order book ctl and tracking.
A style similar to "model-view" is used here where this api is
provided as a supervised control for an EMS actor which does all the
hard/fast work of talking to brokers/exchanges to conduct
executions.
Currently, mostly for keeping local state to match the EMS and use
received events to trigger graphics updates.
"""
_sent_orders: Dict[str, dict] = field(default_factory=dict)
# _confirmed_orders: Dict[str, dict] = field(default_factory=dict)
_to_ems: trio.abc.SendChannel = _to_ems
_from_order_book: trio.abc.ReceiveChannel = _from_order_book
def send(
self,
uuid: str,
symbol: 'Symbol',
price: float,
action: str,
) -> str:
cmd = {
'action': action,
'price': price,
'symbol': symbol.key,
'brokers': symbol.brokers,
'oid': uuid,
}
self._sent_orders[uuid] = cmd
self._to_ems.send_nowait(cmd)
async def modify(self, oid: str, price) -> bool:
...
def cancel(self, uuid: str) -> bool:
"""Cancel an order (or alert) from the EMS.
"""
cmd = self._sent_orders[uuid]
msg = {
'action': 'cancel',
'oid': uuid,
'symbol': cmd['symbol'],
}
self._to_ems.send_nowait(msg)
_orders: OrderBook = None
def get_orders(emsd_uid: Tuple[str, str] = None) -> OrderBook:
if emsd_uid is not None:
# TODO: read in target emsd's active book on startup
pass
global _orders
if _orders is None:
_orders = OrderBook()
return _orders
# TODO: make this a ``tractor.msg.pub``
async def send_order_cmds():
"""Order streaming task: deliver orders transmitted from UI
to downstream consumers.
This is run in the UI actor (usually the one running Qt but could be
any other client service code). This process simply delivers order
messages to the above ``_to_ems`` send channel (from sync code using
``.send_nowait()``), these values are pulled from the channel here
and relayed to any consumer(s) that called this function using
a ``tractor`` portal.
This effectively makes order messages look like they're being
"pushed" from the parent to the EMS where local sync code is likely
doing the pushing from some UI.
"""
global _from_order_book
async for cmd in _from_order_book:
# send msg over IPC / wire
log.info(f'sending order cmd: {cmd}')
yield cmd
# TODO: numba all of this
def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]:
@ -181,7 +83,6 @@ class _ExecBook:
# levels which have an executable action (eg. alert, order, signal)
orders: Dict[
# Tuple[str, str],
str, # symbol
Dict[
str, # uuid
@ -212,10 +113,48 @@ def get_book(broker: str) -> _ExecBook:
return _books.setdefault(broker, _ExecBook(broker))
@dataclass
class PaperBoi:
"""Emulates a broker order client providing the same API and
order-event response event stream format but with methods for
triggering desired events based on forward testing engine
requirements.
"""
_to_trade_stream: trio.abc.SendChannel
trade_stream: trio.abc.ReceiveChannel
async def submit_limit(
self,
oid: str, # XXX: see return value
symbol: str,
price: float,
action: str,
size: int = 100,
) -> int:
"""Place an order and return integer request id provided by client.
"""
async def submit_cancel(
self,
reqid: str,
) -> None:
# TODO: fake market simulation effects
self._to_trade_stream()
def emulate_fill(
self
) -> None:
...
async def exec_loop(
ctx: tractor.Context,
broker: str,
symbol: str,
_exec_mode: str,
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
) -> AsyncIterator[dict]:
@ -231,7 +170,27 @@ async def exec_loop(
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
# TODO: wrap this in a more re-usable general api
client = feed.mod.get_client_proxy(feed._brokerd_portal)
client_factory = getattr(feed.mod, 'get_client_proxy', None)
# we have an order API for this broker
if client_factory is not None and _exec_mode != 'paper':
client = client_factory(feed._brokerd_portal)
# force paper mode
else:
log.warning(
f'No order client is yet supported for {broker}, '
'entering paper mode')
client = PaperBoi(*trio.open_memory_channel(100))
# for paper mode we need to mock this trades response feed
# so we pass a duck-typed feed-looking mem chan which is fed
# fill and submission events from the exec loop
feed._set_fake_trades_stream(client.trade_stream)
# init the trades stream
client._to_trade_stream.send_nowait({'local_trades': 'start'})
# return control to parent task
task_status.started((first_quote, feed, client))
@ -245,8 +204,7 @@ async def exec_loop(
stream = feed.stream
with stream.shield():
# this stream may eventually contain multiple
# symbols
# this stream may eventually contain multiple symbols
async for quotes in stream:
# TODO: numba all this!
@ -269,30 +227,32 @@ async def exec_loop(
for oid, (pred, name, cmd) in tuple(execs.items()):
# push trigger msg back to parent as an "alert"
# (mocking for eg. a "fill")
if pred(price):
# majority of iterations will be non-matches
if not pred(price):
continue
# register broker id for ems id
reqid = await client.submit_limit(
# oid=oid,
oid=oid,
symbol=sym,
action=cmd['action'],
price=round(price, 2),
size=1,
)
# register broker request id to ems id
book._broker2ems_ids[reqid] = oid
resp = {
'resp': 'dark_exec',
'resp': 'dark_executed',
'name': name,
'time_ns': time.time_ns(),
'trigger_price': price,
'broker_reqid': reqid,
'broker': broker,
# 'condition': True,
'oid': oid,
'cmd': cmd, # original request message
# current shm array index - this needed?
'ohlc_index': feed.shm._last.value - 1,
# 'ohlc_index': feed.shm._last.value - 1,
}
# remove exec-condition from set
@ -310,8 +270,8 @@ async def exec_loop(
# feed teardown
# XXX: right now this is very very ad-hoc to IB
# TODO: lots of cases still to handle
# XXX: right now this is very very ad-hoc to IB
# - short-sale but securities haven't been located, in this case we
# should probably keep the order in some kind of weird state or cancel
# it outright?
@ -333,7 +293,20 @@ async def process_broker_trades(
and appropriate responses relayed back to the original EMS client
actor. There is a messaging translation layer throughout.
Expected message translation(s):
broker ems
'error' -> log it locally (for now)
'status' -> relabel as 'broker_<status>', if complete send 'executed'
'fill' -> 'broker_filled'
Currently accepted status values from IB
{'presubmitted', 'submitted', 'cancelled'}
"""
broker = feed.mod.name
with trio.fail_after(3):
trades_stream = await feed.recv_trades_data()
first = await trades_stream.__anext__()
@ -341,16 +314,17 @@ async def process_broker_trades(
assert first['local_trades'] == 'start'
task_status.started()
async for msg in trades_stream:
name, ev = msg['local_trades']
log.info(f'Received broker trade event:\n{pformat(ev)}')
async for event in trades_stream:
# broker request id - must be normalized
# into error transmission by broker backend.
reqid = ev['reqid']
oid = book._broker2ems_ids.get(reqid)
name, msg = event['local_trades']
log.info(f'Received broker trade event:\n{pformat(msg)}')
# Get the broker (order) request id, this **must** be normalized
# into messaging provided by the broker backend
reqid = msg['reqid']
# make response packet to EMS client(s)
oid = book._broker2ems_ids.get(reqid)
resp = {'oid': oid}
if name in ('error',):
@ -358,18 +332,37 @@ async def process_broker_trades(
# for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders?
# This looks like a supervision policy for pending orders on
# some unexpected failure - something we need to think more
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
message = msg['message']
# XXX should we make one when it's blank?
log.error(pformat(ev['message']))
log.error(pformat(message))
# another stupid ib error to handle
# if 10147 in message: cancel
elif name in ('status',):
status = ev['status'].lower()
# everyone doin camel case
status = msg['status'].lower()
if status == 'filled':
# conditional execution is fully complete
if not ev['remaining']:
# conditional execution is fully complete, no more
# fills for the noted order
if not msg['remaining']:
await ctx.send_yield(
{'resp': 'broker_executed', 'oid': oid})
log.info(f'Execution for {oid} is complete!')
await ctx.send_yield({'resp': 'executed', 'oid': oid})
# just log it
else:
log.info(f'{broker} filled {msg}')
else:
# one of (submitted, cancelled)
resp['resp'] = 'broker_' + status
@ -379,10 +372,9 @@ async def process_broker_trades(
elif name in ('fill',):
# proxy through the "fill" result(s)
resp['resp'] = 'broker_filled'
resp.update(ev)
log.info(f'Fill for {oid} cleared with\n{pformat(resp)}')
resp.update(msg)
await ctx.send_yield(resp)
log.info(f'Fill for {oid} cleared with\n{pformat(resp)}')
@tractor.stream
@ -393,31 +385,50 @@ async def _ems_main(
symbol: str,
mode: str = 'live', # ('paper', 'dark', 'live')
) -> None:
"""EMS (sub)actor entrypoint.
"""EMS (sub)actor entrypoint providing the
execution management (micro)service which conducts broker
order control on behalf of clients.
This is the daemon (child) side routine which starts an EMS
runtime per broker/feed and and begins streaming back alerts
from executions to order clients.
This is the daemon (child) side routine which starts an EMS runtime
(one per broker-feed) and and begins streaming back alerts from
broker executions/fills.
``send_order_cmds()`` is called here to execute in a task back in
the actor which started this service (spawned this actor), presuming
capabilities allow it, such that requests for EMS executions are
received in a stream from that client actor and then responses are
streamed back up to the original calling task in the same client.
The task tree is:
- ``_ems_main()``:
accepts order cmds, registers execs with exec loop
- ``exec_loop()``: run conditions on inputs and trigger executions
- ``process_broker_trades()``:
accept normalized trades responses, process and relay to ems client(s)
"""
actor = tractor.current_actor()
book = get_book(broker)
# new router entry point
# get a portal back to the client
async with tractor.wait_for_actor(client_actor_name) as portal:
# spawn one task per broker feed
async with trio.open_nursery() as n:
# TODO: eventually support N-brokers
# start the condition scan loop
quote, feed, client = await n.start(
exec_loop,
ctx,
broker,
symbol,
mode,
)
# for paper mode we need to mock this trades response feed
await n.start(
process_broker_trades,
ctx,
@ -425,6 +436,7 @@ async def _ems_main(
book,
)
# connect back to the calling actor to receive order requests
async for cmd in await portal.run(send_order_cmds):
log.info(f'{cmd} received in {actor.uid}')
@ -435,7 +447,7 @@ async def _ems_main(
if action in ('cancel',):
# check for live-broker order
brid = book._broker2ems_ids.inverse[oid]
brid = book._broker2ems_ids.inverse.get(oid)
if brid:
log.info("Submitting cancel for live order")
await client.submit_cancel(reqid=brid)
@ -443,10 +455,11 @@ async def _ems_main(
# check for EMS active exec
else:
book.orders[symbol].pop(oid, None)
await ctx.send_yield(
{'action': 'dark_cancelled',
'oid': oid}
)
await ctx.send_yield({
'resp': 'dark_cancelled',
'oid': oid
})
elif action in ('alert', 'buy', 'sell',):
@ -457,15 +470,7 @@ async def _ems_main(
last = book.lasts[(broker, sym)]
if action in ('buy', 'sell',):
# if the predicate resolves immediately send the
# execution to the broker asap
# if pred(last):
if mode == 'live':
# send order
log.warning("ORDER FILLED IMMEDIATELY!?!?!?!")
# IF SEND ORDER RIGHT AWAY CONDITION
if mode == 'live' and action in ('buy', 'sell',):
# register broker id for ems id
order_id = await client.submit_limit(
@ -477,14 +482,21 @@ async def _ems_main(
)
book._broker2ems_ids[order_id] = oid
# book.orders[symbol][oid] = None
# XXX: the trades data broker response loop
# (``process_broker_trades()`` above) will
# handle sending the ems side acks back to
# the cmd sender from here
elif mode in {'dark', 'paper'}:
elif mode in ('dark', 'paper') or action in ('alert'):
# if the predicate resolves immediately send the
# execution to the broker asap
# if pred(last):
# send order
# IF SEND ORDER RIGHT AWAY CONDITION
# submit order to local EMS
# Auto-gen scanner predicate:
# we automatically figure out what the alert check
@ -496,7 +508,7 @@ async def _ems_main(
# submit execution/order to EMS scanner loop
book.orders.setdefault(
(broker, sym), {}
sym, {}
)[oid] = (pred, name, cmd)
# ack-response that order is live here
@ -508,11 +520,113 @@ async def _ems_main(
# continue and wait on next order cmd
@dataclass
class OrderBook:
"""Buy-side (client-side ?) order book ctl and tracking.
A style similar to "model-view" is used here where this api is
provided as a supervised control for an EMS actor which does all the
hard/fast work of talking to brokers/exchanges to conduct
executions.
Currently, mostly for keeping local state to match the EMS and use
received events to trigger graphics updates.
"""
_to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel
_sent_orders: Dict[str, dict] = field(default_factory=dict)
_ready_to_receive: trio.Event = trio.Event()
def send(
self,
uuid: str,
symbol: 'Symbol',
price: float,
action: str,
) -> str:
cmd = {
'action': action,
'price': price,
'symbol': symbol.key,
'brokers': symbol.brokers,
'oid': uuid,
}
self._sent_orders[uuid] = cmd
self._to_ems.send_nowait(cmd)
async def modify(self, oid: str, price) -> bool:
...
def cancel(self, uuid: str) -> bool:
"""Cancel an order (or alert) from the EMS.
"""
cmd = self._sent_orders[uuid]
msg = {
'action': 'cancel',
'oid': uuid,
'symbol': cmd['symbol'],
}
self._to_ems.send_nowait(msg)
_orders: OrderBook = None
def get_orders(emsd_uid: Tuple[str, str] = None) -> OrderBook:
if emsd_uid is not None:
# TODO: read in target emsd's active book on startup
pass
global _orders
if _orders is None:
# setup local ui event streaming channels for request/resp
# streamging with EMS daemon
# _to_ems, _from_order_book = trio.open_memory_channel(100)
_orders = OrderBook(*trio.open_memory_channel(100))
return _orders
# TODO: make this a ``tractor.msg.pub``
async def send_order_cmds():
"""Order streaming task: deliver orders transmitted from UI
to downstream consumers.
This is run in the UI actor (usually the one running Qt but could be
any other client service code). This process simply delivers order
messages to the above ``_to_ems`` send channel (from sync code using
``.send_nowait()``), these values are pulled from the channel here
and relayed to any consumer(s) that called this function using
a ``tractor`` portal.
This effectively makes order messages look like they're being
"pushed" from the parent to the EMS where local sync code is likely
doing the pushing from some UI.
"""
book = get_orders()
orders_stream = book._from_order_book
# signal that ems connection is up and ready
book._ready_to_receive.set()
async for cmd in orders_stream:
# send msg over IPC / wire
log.info(f'sending order cmd: {cmd}')
yield cmd
@asynccontextmanager
async def open_ems(
order_mode,
broker: str,
symbol: Symbol,
task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED,
# task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Spawn an EMS daemon and begin sending orders and receiving
alerts.
@ -538,11 +652,15 @@ async def open_ems(
brokers are exposing FIX protocol; it is they doing the re-invention.
TODO: make some fancy diagrams using this:
TODO: make some fancy diagrams using mermaid.io
the possible set of responses from the stream is currently:
- 'dark_submitted', 'broker_submitted'
- 'dark_cancelled', 'broker_cancelled'
- 'dark_executed', 'broker_executed'
- 'broker_filled'
"""
actor = tractor.current_actor()
subactor_name = 'emsd'
@ -553,7 +671,7 @@ async def open_ems(
subactor_name,
enable_modules=[__name__],
)
stream = await portal.run(
trades_stream = await portal.run(
_ems_main,
client_actor_name=actor.name,
broker=broker,
@ -561,40 +679,11 @@ async def open_ems(
)
async with tractor.wait_for_actor(subactor_name):
# let parent task continue
task_status.started(_to_ems)
# wait for service to connect back to us signalling
# ready for order commands
book = get_orders()
# Begin order-response streaming
with trio.fail_after(3):
await book._ready_to_receive.wait()
# this is where we receive **back** messages
# about executions **from** the EMS actor
async for msg in stream:
log.info(f'Received order msg: {pformat(msg)}')
# delete the line from view
oid = msg['oid']
resp = msg['resp']
# response to 'action' request (buy/sell)
if resp in ('dark_submitted', 'broker_submitted'):
log.info(f"order accepted: {msg}")
# show line label once order is live
order_mode.on_submit(oid)
# resp to 'cancel' request or error condition for action request
elif resp in ('broker_cancelled', 'dark_cancelled'):
# delete level from view
order_mode.on_cancel(oid)
log.info(f'deleting line with oid: {oid}')
# response to completed 'action' request for buy/sell
elif resp in ('executed',):
await order_mode.on_exec(oid, msg)
# each clearing tick is responded individually
elif resp in ('broker_filled',):
# TODO: some kinda progress system
order_mode.on_fill(oid, msg)
yield book, trades_stream

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -26,10 +26,10 @@ from contextlib import asynccontextmanager
from importlib import import_module
from types import ModuleType
from typing import (
Dict, List, Any,
Sequence, AsyncIterator, Optional
Dict, Any, Sequence, AsyncIterator, Optional
)
import trio
import tractor
from ..brokers import get_brokermod
@ -165,19 +165,26 @@ class Feed:
return self._index_stream
def _set_fake_trades_stream(
self,
recv_chan: trio.abc.ReceiveChannel,
) -> None:
self._trade_stream = recv_chan
async def recv_trades_data(self) -> AsyncIterator[dict]:
if not getattr(self.mod, 'stream_trades', False):
log.warning(f"{self.mod.name} doesn't have trade data support yet :(")
# yah this is bullshitty but it worx
async def nuttin():
yield
return
return nuttin()
log.warning(
f"{self.mod.name} doesn't have trade data support yet :(")
if not self._trade_stream:
raise RuntimeError(
f'Can not stream trade data from {self.mod.name}')
# NOTE: this can be faked by setting a rx chan
# using the ``_.set_fake_trades_stream()`` method
if not self._trade_stream:
self._trade_stream = await self._brokerd_portal.run(
self.mod.stream_trades,