Get live mode correct and working

basic_orders
Tyler Goodlet 2021-01-15 19:41:03 -05:00
parent 3e959ec260
commit 5acd780eb6
3 changed files with 106 additions and 98 deletions

View File

@ -61,11 +61,6 @@ class OrderBook:
_to_ems: trio.abc.SendChannel = _to_ems _to_ems: trio.abc.SendChannel = _to_ems
_from_order_book: trio.abc.ReceiveChannel = _from_order_book _from_order_book: trio.abc.ReceiveChannel = _from_order_book
# def on_fill(self, uuid: str) -> None:
# cmd = self._sent_orders[uuid]
# log.info(f"Order executed: {cmd}")
# self._confirmed_orders[uuid] = cmd
def send( def send(
self, self,
uuid: str, uuid: str,
@ -217,10 +212,6 @@ def get_book(broker: str) -> _ExecBook:
return _books.setdefault(broker, _ExecBook(broker)) return _books.setdefault(broker, _ExecBook(broker))
# def scan_quotes(
# quotes: dict,
async def exec_loop( async def exec_loop(
ctx: tractor.Context, ctx: tractor.Context,
broker: str, broker: str,
@ -263,8 +254,8 @@ async def exec_loop(
# start = time.time() # start = time.time()
for sym, quote in quotes.items(): for sym, quote in quotes.items():
execs = book.orders.get((broker, sym)) execs = book.orders.pop(sym, None)
if not execs: if execs is None:
continue continue
for tick in quote.get('ticks', ()): for tick in quote.get('ticks', ()):
@ -283,30 +274,37 @@ async def exec_loop(
if pred(price): if pred(price):
# register broker id for ems id # register broker id for ems id
order_id = await client.submit_limit( reqid = await client.submit_limit(
oid=oid, # oid=oid,
symbol=sym, symbol=sym,
action=cmd['action'], action=cmd['action'],
price=round(price, 2), price=round(price, 2),
) )
# resp = book._broker2ems_ids.setdefault( book._broker2ems_ids[reqid] = oid
book._broker2ems_ids[order_id] = oid
resp = { resp = {
'resp': 'submitted', 'resp': 'dark_exec',
'name': name, 'name': name,
'ems_trigger_time_ns': time.time_ns(), 'time_ns': time.time_ns(),
# current shm array index
'index': feed.shm._last.value - 1,
'trigger_price': price, 'trigger_price': price,
'broker_reqid': reqid,
'broker': broker,
# 'condition': True,
# current shm array index - this needed?
'ohlc_index': feed.shm._last.value - 1,
} }
await ctx.send_yield(resp) # remove exec-condition from set
log.info(f'removing pred for {oid}') log.info(f'removing pred for {oid}')
pred, name, cmd = execs.pop(oid) pred, name, cmd = execs.pop(oid)
log.debug(f'execs are {execs}') await ctx.send_yield(resp)
else: # condition scan loop complete
log.debug(f'execs are {execs}')
if execs:
book.orders[symbol] = execs
# print(f'execs scan took: {time.time() - start}') # print(f'execs scan took: {time.time() - start}')
# feed teardown # feed teardown
@ -318,10 +316,11 @@ async def exec_loop(
# should probably keep the order in some kind of weird state or cancel # should probably keep the order in some kind of weird state or cancel
# it outright? # it outright?
# status='PendingSubmit', message=''), # status='PendingSubmit', message=''),
# status='Cancelled', message='Error 404, reqId 1550: Order held while securities are located.'), # status='Cancelled', message='Error 404,
# reqId 1550: Order held while securities are located.'),
# status='PreSubmitted', message='')], # status='PreSubmitted', message='')],
async def receive_trade_updates( async def process_broker_trades(
ctx: tractor.Context, ctx: tractor.Context,
feed: 'Feed', # noqa feed: 'Feed', # noqa
book: _ExecBook, book: _ExecBook,
@ -339,75 +338,55 @@ async def receive_trade_updates(
first = await trades_stream.__anext__() first = await trades_stream.__anext__()
# startup msg # startup msg
assert first['trade_events'] == 'started' assert first['local_trades'] == 'start'
task_status.started() task_status.started()
async for trade_event in trades_stream: async for msg in trades_stream:
event = trade_event['trade_events'] name, ev = msg['local_trades']
log.info(f'Received broker trade event:\n{pformat(ev)}')
try: # broker request id - must be normalized
order = event['order'] # into error transmission by broker backend.
except KeyError: reqid = ev['reqid']
oid = book._broker2ems_ids.get(reqid)
# Relay broker error messages # make response packet to EMS client(s)
err = event['error'] resp = {'oid': oid}
# broker request id - must be normalized if name in ('error',):
# into error transmission by broker backend. # TODO: figure out how this will interact with EMS clients
reqid = err['brid'] # for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders?
# TODO: handle updates!
oid = book._broker2ems_ids.get(reqid)
# XXX should we make one when it's blank? # XXX should we make one when it's blank?
log.error(pformat(err['message'])) log.error(pformat(ev['message']))
else: elif name in ('status',):
log.info(f'Received broker trade event:\n{pformat(event)}')
status = event['orderStatus']['status'] status = ev['status'].lower()
reqid = order['orderId']
# TODO: handle updates! if status == 'filled':
oid = book._broker2ems_ids.get(reqid) # conditional execution is fully complete
if not ev['remaining']:
log.info(f'Execution for {oid} is complete!')
await ctx.send_yield({'resp': 'executed', 'oid': oid})
else:
# one of (submitted, cancelled)
resp['resp'] = 'broker_' + status
if status in {'Cancelled'}: await ctx.send_yield(resp)
resp = {'resp': 'cancelled'}
elif status in {'Submitted'}: elif name in ('fill',):
# ack-response that order is live/submitted # proxy through the "fill" result(s)
# to the broker resp['resp'] = 'broker_filled'
resp = {'resp': 'submitted'} resp.update(ev)
# elif status in {'Executed', 'Filled'}:
elif status in {'Filled'}:
# order was filled by broker
fills = []
for fill in event['fills']:
e = fill['execution']
fills.append(
(e.time, e.price, e.shares, e.side)
)
resp = {
'resp': 'executed',
'fills': fills,
}
else: # active in EMS
# ack-response that order is live in EMS
# (aka as a client side limit)
resp = {'resp': 'active'}
# send response packet to EMS client(s)
resp['oid'] = oid
log.info(f'Fill for {oid} cleared with\n{pformat(resp)}')
await ctx.send_yield(resp) await ctx.send_yield(resp)
@tractor.stream @tractor.stream
async def stream_and_route( async def _ems_main(
ctx: tractor.Context, ctx: tractor.Context,
client_actor_name: str, client_actor_name: str,
broker: str, broker: str,
@ -440,7 +419,7 @@ async def stream_and_route(
# for paper mode we need to mock this trades response feed # for paper mode we need to mock this trades response feed
await n.start( await n.start(
receive_trade_updates, process_broker_trades,
ctx, ctx,
feed, feed,
book, book,
@ -452,32 +431,31 @@ async def stream_and_route(
action = cmd['action'] action = cmd['action']
oid = cmd['oid'] oid = cmd['oid']
sym = cmd['symbol']
if action == 'cancel': if action in ('cancel',):
# check for live-broker order # check for live-broker order
brid = book._broker2ems_ids.inverse[oid] brid = book._broker2ems_ids.inverse[oid]
if brid: if brid:
log.info("Submitting cancel for live order") log.info("Submitting cancel for live order")
await client.submit_cancel(oid=brid) await client.submit_cancel(reqid=brid)
# check for EMS active exec # check for EMS active exec
else: else:
book.orders[symbol].pop(oid, None) book.orders[symbol].pop(oid, None)
await ctx.send_yield( await ctx.send_yield(
{'action': 'cancelled', {'action': 'dark_cancelled',
'oid': oid} 'oid': oid}
) )
elif action in ('alert', 'buy', 'sell',): elif action in ('alert', 'buy', 'sell',):
sym = cmd['symbol']
trigger_price = cmd['price'] trigger_price = cmd['price']
brokers = cmd['brokers'] brokers = cmd['brokers']
broker = brokers[0] broker = brokers[0]
last = book.lasts[(broker, sym)] last = book.lasts[(broker, sym)]
# print(f'Known last is {last}')
if action in ('buy', 'sell',): if action in ('buy', 'sell',):
@ -491,17 +469,18 @@ async def stream_and_route(
# register broker id for ems id # register broker id for ems id
order_id = await client.submit_limit( order_id = await client.submit_limit(
oid=oid, oid=oid, # no ib support for this
symbol=sym, symbol=sym,
action=action, action=action,
price=round(trigger_price, 2), price=round(trigger_price, 2),
size=1,
) )
book._broker2ems_ids[order_id] = oid book._broker2ems_ids[order_id] = oid
# book.orders[symbol][oid] = None # book.orders[symbol][oid] = None
# XXX: the trades data broker response loop # XXX: the trades data broker response loop
# (``receive_trade_updates()`` above) will # (``process_broker_trades()`` above) will
# handle sending the ems side acks back to # handle sending the ems side acks back to
# the cmd sender from here # the cmd sender from here
@ -516,30 +495,52 @@ async def stream_and_route(
pred, name = mk_check(trigger_price, last) pred, name = mk_check(trigger_price, last)
# submit execution/order to EMS scanner loop # submit execution/order to EMS scanner loop
# create list of executions on first entry
book.orders.setdefault( book.orders.setdefault(
(broker, sym), {} (broker, sym), {}
)[oid] = (pred, name, cmd) )[oid] = (pred, name, cmd)
# ack-response that order is live here # ack-response that order is live here
await ctx.send_yield({ await ctx.send_yield({
'resp': 'ems_active', 'resp': 'dark_submitted',
'oid': oid 'oid': oid
}) })
# continue and wait on next order cmd # continue and wait on next order cmd
async def _ems_main( async def open_ems(
order_mode, order_mode,
broker: str, broker: str,
symbol: Symbol, symbol: Symbol,
# lines: 'LinesEditor',
task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Spawn an EMS daemon and begin sending orders and receiving """Spawn an EMS daemon and begin sending orders and receiving
alerts. alerts.
This EMS tries to reduce most broker's terrible order entry apis to
a very simple protocol built on a few easy to grok and/or
"rantsy" premises:
- most users will prefer "dark mode" where orders are not submitted
to a broker until and execution condition is triggered
(aka client-side "hidden orders")
- Brokers over-complicate their apis and generally speaking hire
poor designers to create them. We're better off using creating a super
minimal, schema-simple, request-event-stream protocol to unify all the
existing piles of shit (and shocker, it'll probably just end up
looking like a decent crypto exchange's api)
- all order types can be implemented with client-side limit orders
- we aren't reinventing a wheel in this case since none of these
brokers are exposing FIX protocol; it is they doing the re-invention.
TODO: make some fancy diagrams using this:
""" """
actor = tractor.current_actor() actor = tractor.current_actor()
@ -553,7 +554,7 @@ async def _ems_main(
enable_modules=[__name__], enable_modules=[__name__],
) )
stream = await portal.run( stream = await portal.run(
stream_and_route, _ems_main,
client_actor_name=actor.name, client_actor_name=actor.name,
broker=broker, broker=broker,
symbol=symbol.key, symbol=symbol.key,
@ -564,29 +565,36 @@ async def _ems_main(
# let parent task continue # let parent task continue
task_status.started(_to_ems) task_status.started(_to_ems)
# begin the trigger-alert stream # Begin order-response streaming
# this is where we receive **back** messages # this is where we receive **back** messages
# about executions **from** the EMS actor # about executions **from** the EMS actor
async for msg in stream: async for msg in stream:
log.info(f'Received order msg: {pformat(msg)}')
# delete the line from view # delete the line from view
oid = msg['oid'] oid = msg['oid']
resp = msg['resp'] resp = msg['resp']
# response to 'action' request (buy/sell) # response to 'action' request (buy/sell)
if resp in ('ems_active', 'submitted'): if resp in ('dark_submitted', 'broker_submitted'):
log.info(f"order accepted: {msg}") log.info(f"order accepted: {msg}")
# show line label once order is live # show line label once order is live
order_mode.on_submit(oid) order_mode.on_submit(oid)
# response to 'cancel' request # resp to 'cancel' request or error condition for action request
elif resp in ('cancelled',): elif resp in ('broker_cancelled', 'dark_cancelled'):
# delete level from view # delete level from view
order_mode.on_cancel(oid) order_mode.on_cancel(oid)
log.info(f'deleting line with oid: {oid}') log.info(f'deleting line with oid: {oid}')
# response to 'action' request (buy/sell) # response to completed 'action' request for buy/sell
elif resp in ('executed',): elif resp in ('executed',):
await order_mode.on_exec(oid, msg) await order_mode.on_exec(oid, msg)
# each clearing tick is responded individually
elif resp in ('broker_filled',):
# TODO: some kinda progress system
order_mode.on_fill(oid, msg)

View File

@ -186,7 +186,7 @@ class Feed:
# the broker side must declare this key # the broker side must declare this key
# in messages, though we could probably use # in messages, though we could probably use
# more then one? # more then one?
topics=['trade_events'], topics=['local_trades'],
) )
return self._trade_stream return self._trade_stream

View File

@ -59,7 +59,7 @@ from ..log import get_logger
from ._exec import run_qtractor, current_screen from ._exec import run_qtractor, current_screen
from ._interaction import ChartView, open_order_mode from ._interaction import ChartView, open_order_mode
from .. import fsp from .. import fsp
from .._ems import _ems_main from .._ems import open_ems
log = get_logger(__name__) log = get_logger(__name__)
@ -958,8 +958,8 @@ async def _async_main(
# inside the above mngr? # inside the above mngr?
# spawn EMS actor-service # spawn EMS actor-service
to_ems_chan = await n.start( await n.start(
_ems_main, open_ems,
order_mode, order_mode,
brokername, brokername,
symbol, symbol,