Get "live" order mode mostly workin

basic_orders
Tyler Goodlet 2021-01-14 12:59:00 -05:00
parent 1c7da2f23b
commit c835cc10e0
6 changed files with 322 additions and 153 deletions

View File

@ -18,12 +18,14 @@
In suit parlance: "Execution management systems" In suit parlance: "Execution management systems"
""" """
from pprint import pformat
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import ( from typing import (
AsyncIterator, Dict, Callable, Tuple, AsyncIterator, Dict, Callable, Tuple,
) )
from bidict import bidict
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
@ -54,7 +56,7 @@ class OrderBook:
""" """
_sent_orders: Dict[str, dict] = field(default_factory=dict) _sent_orders: Dict[str, dict] = field(default_factory=dict)
_confirmed_orders: Dict[str, dict] = field(default_factory=dict) # _confirmed_orders: Dict[str, dict] = field(default_factory=dict)
_to_ems: trio.abc.SendChannel = _to_ems _to_ems: trio.abc.SendChannel = _to_ems
_from_order_book: trio.abc.ReceiveChannel = _from_order_book _from_order_book: trio.abc.ReceiveChannel = _from_order_book
@ -72,7 +74,7 @@ class OrderBook:
action: str, action: str,
) -> str: ) -> str:
cmd = { cmd = {
'msg': action, 'action': action,
'price': price, 'price': price,
'symbol': symbol.key, 'symbol': symbol.key,
'brokers': symbol.brokers, 'brokers': symbol.brokers,
@ -81,24 +83,20 @@ class OrderBook:
self._sent_orders[uuid] = cmd self._sent_orders[uuid] = cmd
self._to_ems.send_nowait(cmd) self._to_ems.send_nowait(cmd)
async def modify(self, oid: str, price) -> bool:
...
def cancel(self, uuid: str) -> bool: def cancel(self, uuid: str) -> bool:
"""Cancel an order (or alert) from the EMS. """Cancel an order (or alert) from the EMS.
""" """
cmd = { cmd = self._sent_orders[uuid]
'msg': 'cancel', msg = {
'action': 'cancel',
'oid': uuid, 'oid': uuid,
'symbol': cmd['symbol'],
} }
self._sent_orders[uuid] = cmd self._to_ems.send_nowait(msg)
self._to_ems.send_nowait(cmd)
# higher level operations
async def transmit_to_broker(self, price: float) -> str:
...
async def modify(self, oid: str, price) -> bool:
...
_orders: OrderBook = None _orders: OrderBook = None
@ -123,13 +121,16 @@ async def send_order_cmds():
"""Order streaming task: deliver orders transmitted from UI """Order streaming task: deliver orders transmitted from UI
to downstream consumers. to downstream consumers.
This is run in the UI actor (usually the one running Qt). This is run in the UI actor (usually the one running Qt but could be
The UI simply delivers order messages to the above ``_to_ems`` any other client service code). This process simply delivers order
send channel (from sync code using ``.send_nowait()``), these values messages to the above ``_to_ems`` send channel (from sync code using
are pulled from the channel here and send to any consumer(s). ``.send_nowait()``), these values are pulled from the channel here
and relayed to any consumer(s) that called this function using
a ``tractor`` portal.
This effectively makes order messages look like they're being This effectively makes order messages look like they're being
"pushed" from the parent to the EMS actor. "pushed" from the parent to the EMS where local sync code is likely
doing the pushing from some UI.
""" """
global _from_order_book global _from_order_book
@ -181,9 +182,12 @@ class _ExecBook:
A singleton instance is created per EMS actor (for now). A singleton instance is created per EMS actor (for now).
""" """
broker: str
# levels which have an executable action (eg. alert, order, signal) # levels which have an executable action (eg. alert, order, signal)
orders: Dict[ orders: Dict[
Tuple[str, str], # Tuple[str, str],
str, # symbol
Dict[ Dict[
str, # uuid str, # uuid
Tuple[ Tuple[
@ -200,17 +204,21 @@ class _ExecBook:
float float
] = field(default_factory=dict) ] = field(default_factory=dict)
# mapping of broker order ids to piker ems ids
_book = None _broker2ems_ids: Dict[str, str] = field(default_factory=bidict)
def get_book() -> _ExecBook: _books: Dict[str, _ExecBook] = {}
global _book
if _book is None:
_book = _ExecBook()
return _book def get_book(broker: str) -> _ExecBook:
global _books
return _books.setdefault(broker, _ExecBook(broker))
# def scan_quotes(
# quotes: dict,
async def exec_loop( async def exec_loop(
@ -226,32 +234,38 @@ async def exec_loop(
loglevel='info', loglevel='info',
) as feed: ) as feed:
# TODO: get initial price # TODO: get initial price quote from target broker
first_quote = await feed.receive() first_quote = await feed.receive()
book = get_book(broker)
book = get_book()
book.lasts[(broker, symbol)] = first_quote[symbol]['last'] book.lasts[(broker, symbol)] = first_quote[symbol]['last']
# TODO: wrap this in a more re-usable general api
client = feed.mod.get_client_proxy(feed._brokerd_portal) client = feed.mod.get_client_proxy(feed._brokerd_portal)
# return control to parent task
task_status.started((first_quote, feed, client)) task_status.started((first_quote, feed, client))
# shield this field so the remote brokerd does not get cancelled
stream = feed.stream
with stream.shield():
async for quotes in stream:
############################## ##############################
# begin price actions sequence # begin price actions sequence
# XXX: optimize this for speed # XXX: optimize this for speed
############################## ##############################
# shield this field so the remote brokerd does not get cancelled
stream = feed.stream
with stream.shield():
# this stream may eventually contain multiple
# symbols
async for quotes in stream:
# TODO: numba all this!
# start = time.time() # start = time.time()
for sym, quote in quotes.items(): for sym, quote in quotes.items():
execs = book.orders.get((broker, sym)) execs = book.orders.get((broker, sym))
if not execs:
continue
for tick in quote.get('ticks', ()): for tick in quote.get('ticks', ()):
price = tick.get('price') price = tick.get('price')
@ -262,29 +276,33 @@ async def exec_loop(
# update to keep new cmds informed # update to keep new cmds informed
book.lasts[(broker, symbol)] = price book.lasts[(broker, symbol)] = price
if not execs:
continue
for oid, (pred, name, cmd) in tuple(execs.items()): for oid, (pred, name, cmd) in tuple(execs.items()):
# push trigger msg back to parent as an "alert" # push trigger msg back to parent as an "alert"
# (mocking for eg. a "fill") # (mocking for eg. a "fill")
if pred(price): if pred(price):
# register broker id for ems id
order_id = await client.submit_limit(
oid=oid,
symbol=sym,
action=cmd['action'],
price=round(price, 2),
)
# resp = book._broker2ems_ids.setdefault(
book._broker2ems_ids[order_id] = oid
resp = { resp = {
'msg': 'executed', 'resp': 'submitted',
'name': name, 'name': name,
'time_ns': time.time_ns(), 'ems_trigger_time_ns': time.time_ns(),
# current shm array index # current shm array index
'index': feed.shm._last.value - 1, 'index': feed.shm._last.value - 1,
'exec_price': price, 'trigger_price': price,
} }
await ctx.send_yield(resp) await ctx.send_yield(resp)
print(
f"GOT ALERT FOR {name} @ \n{tick}\n")
log.info(f'removing pred for {oid}') log.info(f'removing pred for {oid}')
pred, name, cmd = execs.pop(oid) pred, name, cmd = execs.pop(oid)
@ -294,83 +312,200 @@ async def exec_loop(
# feed teardown # feed teardown
# XXX: right now this is very very ad-hoc to IB
# TODO: lots of cases still to handle
# - short-sale but securities haven't been located, in this case we
# should probably keep the order in some kind of weird state or cancel
# it outright?
# status='PendingSubmit', message=''),
# status='Cancelled', message='Error 404, reqId 1550: Order held while securities are located.'),
# status='PreSubmitted', message='')],
async def receive_trade_updates( async def receive_trade_updates(
ctx: tractor.Context, ctx: tractor.Context,
feed: 'Feed', # noqa feed: 'Feed', # noqa
book: _ExecBook,
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
) -> AsyncIterator[dict]: ) -> AsyncIterator[dict]:
# await tractor.breakpoint() """Trades update loop - receive updates from broker, convert
print("TRADESZ") to EMS responses, transmit to ordering client(s).
async for update in await feed.recv_trades_data():
log.info(update) This is where trade confirmations from the broker are processed
and appropriate responses relayed back to the original EMS client
actor. There is a messaging translation layer throughout.
"""
trades_stream = await feed.recv_trades_data()
first = await trades_stream.__anext__()
# startup msg
assert first['trade_events'] == 'started'
task_status.started()
async for trade_event in trades_stream:
event = trade_event['trade_events']
try:
order = event['order']
except KeyError:
# Relay broker error messages
err = event['error']
# broker request id - must be normalized
# into error transmission by broker backend.
reqid = err['brid']
# TODO: handle updates!
oid = book._broker2ems_ids.get(reqid)
# XXX should we make one when it's blank?
log.error(pformat(err['message']))
else:
log.info(f'Received broker trade event:\n{pformat(event)}')
status = event['orderStatus']['status']
reqid = order['orderId']
# TODO: handle updates!
oid = book._broker2ems_ids.get(reqid)
if status in {'Cancelled'}:
resp = {'resp': 'cancelled'}
elif status in {'Submitted'}:
# ack-response that order is live/submitted
# to the broker
resp = {'resp': 'submitted'}
# elif status in {'Executed', 'Filled'}:
elif status in {'Filled'}:
# order was filled by broker
fills = []
for fill in event['fills']:
e = fill['execution']
fills.append(
(e.time, e.price, e.shares, e.side)
)
resp = {
'resp': 'executed',
'fills': fills,
}
else: # active in EMS
# ack-response that order is live in EMS
# (aka as a client side limit)
resp = {'resp': 'active'}
# send response packet to EMS client(s)
resp['oid'] = oid
await ctx.send_yield(resp)
@tractor.stream @tractor.stream
async def stream_and_route(ctx, ui_name): async def stream_and_route(
"""Order router (sub)actor entrypoint. ctx: tractor.Context,
client_actor_name: str,
broker: str,
symbol: str,
mode: str = 'live', # ('paper', 'dark', 'live')
) -> None:
"""EMS (sub)actor entrypoint.
This is the daemon (child) side routine which starts an EMS This is the daemon (child) side routine which starts an EMS
runtime per broker/feed and and begins streaming back alerts runtime per broker/feed and and begins streaming back alerts
from executions back to subscribers. from executions to order clients.
""" """
actor = tractor.current_actor() actor = tractor.current_actor()
book = get_book() book = get_book(broker)
_active_execs: Dict[str, (str, str)] = {}
# new router entry point # new router entry point
async with tractor.wait_for_actor(ui_name) as portal: async with tractor.wait_for_actor(client_actor_name) as portal:
# spawn one task per broker feed # spawn one task per broker feed
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
async for cmd in await portal.run(send_order_cmds): # TODO: eventually support N-brokers
log.info(f'{cmd} received in {actor.uid}')
msg = cmd['msg']
oid = cmd['oid']
if msg == 'cancel':
# destroy exec
pred, name, cmd = book.orders[_active_execs[oid]].pop(oid)
# ack-cmd that order is live
await ctx.send_yield({'msg': 'cancelled', 'oid': oid})
continue
elif msg in ('alert', 'buy', 'sell',):
trigger_price = cmd['price']
sym = cmd['symbol']
brokers = cmd['brokers']
broker = brokers[0]
last = book.lasts.get((broker, sym))
if last is None: # spawn new brokerd feed task
quote, feed, client = await n.start( quote, feed, client = await n.start(
exec_loop, exec_loop,
ctx, ctx,
# TODO: eventually support N-brokers?
broker, broker,
sym, symbol,
trigger_price,
) )
# TODO: eventually support N-brokers # for paper mode we need to mock this trades response feed
n.start_soon( await n.start(
receive_trade_updates, receive_trade_updates,
ctx, ctx,
feed, feed,
book,
) )
last = book.lasts[(broker, sym)] async for cmd in await portal.run(send_order_cmds):
print(f'Known last is {last}') log.info(f'{cmd} received in {actor.uid}')
action = cmd['action']
oid = cmd['oid']
sym = cmd['symbol']
if action == 'cancel':
# check for live-broker order
brid = book._broker2ems_ids.inverse[oid]
if brid:
log.info("Submitting cancel for live order")
await client.submit_cancel(oid=brid)
# check for EMS active exec
else:
book.orders[symbol].pop(oid, None)
await ctx.send_yield(
{'action': 'cancelled',
'oid': oid}
)
elif action in ('alert', 'buy', 'sell',):
trigger_price = cmd['price']
brokers = cmd['brokers']
broker = brokers[0]
last = book.lasts[(broker, sym)]
# print(f'Known last is {last}')
if action in ('buy', 'sell',):
# if the predicate resolves immediately send the
# execution to the broker asap
# if pred(last):
if mode == 'live':
# send order
log.warning("ORDER FILLED IMMEDIATELY!?!?!?!")
# IF SEND ORDER RIGHT AWAY CONDITION
# register broker id for ems id
order_id = await client.submit_limit(
oid=oid,
symbol=sym,
action=action,
price=round(trigger_price, 2),
)
book._broker2ems_ids[order_id] = oid
# book.orders[symbol][oid] = None
# XXX: the trades data broker response loop
# (``receive_trade_updates()`` above) will
# handle sending the ems side acks back to
# the cmd sender from here
elif mode in {'dark', 'paper'}:
# Auto-gen scanner predicate: # Auto-gen scanner predicate:
# we automatically figure out what the alert check # we automatically figure out what the alert check
@ -380,30 +515,24 @@ async def stream_and_route(ctx, ui_name):
# the user choose the predicate operator. # the user choose the predicate operator.
pred, name = mk_check(trigger_price, last) pred, name = mk_check(trigger_price, last)
# if the predicate resolves immediately send the # submit execution/order to EMS scanner loop
# execution to the broker asap
if pred(last):
# send order
print("ORDER FILLED IMMEDIATELY!?!?!?!")
# create list of executions on first entry # create list of executions on first entry
book.orders.setdefault( book.orders.setdefault(
(broker, sym), {})[oid] = (pred, name, cmd) (broker, sym), {}
)[oid] = (pred, name, cmd)
# reverse lookup for cancellations
_active_execs[oid] = (broker, sym)
# ack-response that order is live here # ack-response that order is live here
await ctx.send_yield({ await ctx.send_yield({
'msg': 'active', 'resp': 'ems_active',
'oid': oid 'oid': oid
}) })
# continue and wait on next order cmd # continue and wait on next order cmd
async def spawn_router_stream_alerts( async def _ems_main(
order_mode, order_mode,
broker: str,
symbol: Symbol, symbol: Symbol,
# lines: 'LinesEditor', # lines: 'LinesEditor',
task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED,
@ -425,7 +554,10 @@ async def spawn_router_stream_alerts(
) )
stream = await portal.run( stream = await portal.run(
stream_and_route, stream_and_route,
ui_name=actor.name client_actor_name=actor.name,
broker=broker,
symbol=symbol.key,
) )
async with tractor.wait_for_actor(subactor_name): async with tractor.wait_for_actor(subactor_name):
@ -439,49 +571,22 @@ async def spawn_router_stream_alerts(
# delete the line from view # delete the line from view
oid = msg['oid'] oid = msg['oid']
resp = msg['msg'] resp = msg['resp']
if resp in ('active',): # response to 'action' request (buy/sell)
print(f"order accepted: {msg}") if resp in ('ems_active', 'submitted'):
log.info(f"order accepted: {msg}")
# show line label once order is live # show line label once order is live
order_mode.lines.commit_line(oid) order_mode.on_submit(oid)
continue
# response to 'cancel' request
elif resp in ('cancelled',): elif resp in ('cancelled',):
# delete level from view # delete level from view
order_mode.lines.remove_line(uuid=oid) order_mode.on_cancel(oid)
print(f'deleting line with oid: {oid}') log.info(f'deleting line with oid: {oid}')
# response to 'action' request (buy/sell)
elif resp in ('executed',): elif resp in ('executed',):
await order_mode.on_exec(oid, msg)
line = order_mode.lines.remove_line(uuid=oid)
print(f'deleting line with oid: {oid}')
order_mode.arrows.add(
oid,
msg['index'],
msg['price'],
pointing='up' if msg['name'] == 'up' else 'down',
color=line.color
)
# DESKTOP NOTIFICATIONS
#
# TODO: this in another task?
# not sure if this will ever be a bottleneck,
# we probably could do graphics stuff first tho?
# XXX: linux only for now
result = await trio.run_process(
[
'notify-send',
'-u', 'normal',
'-t', '10000',
'piker',
f'alert: {msg}',
],
)
log.runtime(result)

View File

@ -119,6 +119,8 @@ class NonShittyWrapper(Wrapper):
""" """
Get rid of datetime on executions. Get rid of datetime on executions.
""" """
# this is the IB server's execution time supposedly
# https://interactivebrokers.github.io/tws-api/classIBApi_1_1Execution.html#a2e05cace0aa52d809654c7248e052ef2
execu.time = execu.time.timestamp() execu.time = execu.time.timestamp()
return super().execDetails(reqId, contract, execu) return super().execDetails(reqId, contract, execu)

View File

@ -89,7 +89,6 @@ async def maybe_spawn_brokerd(
brokername: str, brokername: str,
sleep: float = 0.5, sleep: float = 0.5,
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
expose_mods: List = [],
**tractor_kwargs, **tractor_kwargs,
) -> tractor._portal.Portal: ) -> tractor._portal.Portal:
"""If no ``brokerd.{brokername}`` daemon-actor can be found, """If no ``brokerd.{brokername}`` daemon-actor can be found,
@ -180,8 +179,14 @@ class Feed:
if not self._trade_stream: if not self._trade_stream:
self._trade_stream = await self._brokerd_portal.run( self._trade_stream = await self._brokerd_portal.run(
self.mod.stream_trades, self.mod.stream_trades,
topics=['all'], # do we need this?
# do we need this? -> yes
# the broker side must declare this key
# in messages, though we could probably use
# more then one?
topics=['trade_events'],
) )
return self._trade_stream return self._trade_stream

View File

@ -59,7 +59,7 @@ from ..log import get_logger
from ._exec import run_qtractor, current_screen from ._exec import run_qtractor, current_screen
from ._interaction import ChartView, open_order_mode from ._interaction import ChartView, open_order_mode
from .. import fsp from .. import fsp
from .._ems import spawn_router_stream_alerts from .._ems import _ems_main
log = get_logger(__name__) log = get_logger(__name__)
@ -959,8 +959,9 @@ async def _async_main(
# spawn EMS actor-service # spawn EMS actor-service
to_ems_chan = await n.start( to_ems_chan = await n.start(
spawn_router_stream_alerts, _ems_main,
order_mode, order_mode,
brokername,
symbol, symbol,
) )

View File

@ -202,6 +202,9 @@ class L1Labels:
self.ask_label._size_br_from_str(self.max_value) self.ask_label._size_br_from_str(self.max_value)
# TODO: probably worth investigating if we can
# make .boundingRect() faster:
# https://stackoverflow.com/questions/26156486/determine-bounding-rect-of-line-in-qt
class LevelLine(pg.InfiniteLine): class LevelLine(pg.InfiniteLine):
# TODO: fill in these slots for orders # TODO: fill in these slots for orders

View File

@ -17,8 +17,10 @@
""" """
UX interaction customs. UX interaction customs.
""" """
import time
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pprint import pformat
from typing import Optional, Dict, Callable from typing import Optional, Dict, Callable
import uuid import uuid
@ -427,6 +429,57 @@ class OrderMode:
self._action = name self._action = name
self.lines.stage_line(color=self._colors[name]) self.lines.stage_line(color=self._colors[name])
def on_submit(self, uuid: str) -> dict:
self.lines.commit_line(uuid)
req_msg = self.book._sent_orders.get(uuid)
req_msg['ack_time_ns'] = time.time_ns()
# self.book._confirmed_orders[uuid] = req_msg
return req_msg
async def on_exec(
self,
uuid: str,
msg: Dict[str, str],
) -> None:
line = self.lines.remove_line(uuid=uuid)
log.debug(f'deleting line with oid: {uuid}')
for fill in msg['fills']:
self.arrows.add(
uuid,
msg['index'],
msg['price'],
pointing='up' if msg['action'] == 'buy' else 'down',
color=line.color
)
# DESKTOP NOTIFICATIONS
#
# TODO: this in another task?
# not sure if this will ever be a bottleneck,
# we probably could do graphics stuff first tho?
# XXX: linux only for now
result = await trio.run_process(
[
'notify-send',
'-u', 'normal',
'-t', '10000',
'piker',
f'alert: {msg}',
],
)
log.runtime(result)
def on_cancel(self, uuid: str) -> None:
msg = self.book._sent_orders.pop(uuid, None)
if msg is not None:
self.lines.remove_line(uuid=uuid)
else:
log.warning(f'Received cancel for unsubmitted order {pformat(msg)}')
def submit_exec(self) -> None: def submit_exec(self) -> None:
"""Send execution order to EMS. """Send execution order to EMS.