Get order cancellation working
parent
dba8457be9
commit
39e4953a6a
227
piker/_ems.py
227
piker/_ems.py
|
@ -18,12 +18,11 @@
|
|||
In suit parlance: "Execution management systems"
|
||||
|
||||
"""
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import (
|
||||
AsyncIterator, Dict, Callable, Tuple,
|
||||
Any,
|
||||
)
|
||||
# import uuid
|
||||
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
|
@ -71,11 +70,6 @@ class OrderBook:
|
|||
symbol: 'Symbol',
|
||||
price: float
|
||||
) -> str:
|
||||
# XXX: should make this an explicit attr
|
||||
# it's assigned inside ``.add_plot()``
|
||||
# lc = self.view.linked_charts
|
||||
|
||||
# uid = str(uuid.uuid4())
|
||||
cmd = {
|
||||
'msg': 'alert',
|
||||
'price': price,
|
||||
|
@ -86,17 +80,22 @@ class OrderBook:
|
|||
self._sent_orders[uuid] = cmd
|
||||
self._to_ems.send_nowait(cmd)
|
||||
|
||||
async def buy(self, price: float) -> str:
|
||||
def buy(self, price: float) -> str:
|
||||
...
|
||||
|
||||
async def sell(self, price: float) -> str:
|
||||
def sell(self, price: float) -> str:
|
||||
...
|
||||
|
||||
async def cancel(self, oid: str) -> bool:
|
||||
def cancel(self, uuid: str) -> bool:
|
||||
"""Cancel an order (or alert) from the EMS.
|
||||
|
||||
"""
|
||||
...
|
||||
cmd = {
|
||||
'msg': 'cancel',
|
||||
'oid': uuid,
|
||||
}
|
||||
self._sent_orders[uuid] = cmd
|
||||
self._to_ems.send_nowait(cmd)
|
||||
|
||||
# higher level operations
|
||||
|
||||
|
@ -138,9 +137,7 @@ async def send_order_cmds():
|
|||
"pushed" from the parent to the EMS actor.
|
||||
|
||||
"""
|
||||
|
||||
global _from_order_book
|
||||
# book = get_orders()
|
||||
|
||||
async for cmd in _from_order_book:
|
||||
|
||||
|
@ -148,27 +145,6 @@ async def send_order_cmds():
|
|||
log.info(f'sending order cmd: {cmd}')
|
||||
yield cmd
|
||||
|
||||
# lc = order['chart']
|
||||
# symbol = order['symol']
|
||||
# msg = order['msg']
|
||||
# price = order['price']
|
||||
# oid = order['oid']
|
||||
|
||||
# TODO
|
||||
# oid = str(uuid.uuid4())
|
||||
|
||||
# cmd = {
|
||||
# 'price': price,
|
||||
# 'action': 'alert',
|
||||
# 'symbol': symbol.key,
|
||||
# 'brokers': symbol.brokers,
|
||||
# 'msg': msg,
|
||||
# 'price': price,
|
||||
# 'oid': oid,
|
||||
# }
|
||||
|
||||
# book._sent_orders[oid] = cmd
|
||||
|
||||
|
||||
# TODO: numba all of this
|
||||
def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]:
|
||||
|
@ -210,13 +186,13 @@ class _ExecBook:
|
|||
# levels which have an executable action (eg. alert, order, signal)
|
||||
orders: Dict[
|
||||
Tuple[str, str],
|
||||
Tuple[
|
||||
# predicates
|
||||
Callable[[float], bool],
|
||||
|
||||
# actions
|
||||
Callable[[float], Dict[str, Any]],
|
||||
|
||||
Dict[
|
||||
str, # uuid
|
||||
Tuple[
|
||||
Callable[[float], bool], # predicate
|
||||
str, # name
|
||||
dict, # cmd / msg type
|
||||
]
|
||||
]
|
||||
] = field(default_factory=dict)
|
||||
|
||||
|
@ -273,6 +249,7 @@ async def exec_orders(
|
|||
# XXX: optimize this for speed
|
||||
##############################
|
||||
|
||||
start = time.time()
|
||||
for sym, quote in quotes.items():
|
||||
|
||||
execs = book.orders.get((broker, sym))
|
||||
|
@ -289,7 +266,7 @@ async def exec_orders(
|
|||
if not execs:
|
||||
continue
|
||||
|
||||
for oid, pred, name, cmd in tuple(execs):
|
||||
for oid, (pred, name, cmd) in tuple(execs.items()):
|
||||
|
||||
# push trigger msg back to parent as an "alert"
|
||||
# (mocking for eg. a "fill")
|
||||
|
@ -299,25 +276,19 @@ async def exec_orders(
|
|||
cmd['index'] = feed.shm._last.value - 1
|
||||
# current shm array index
|
||||
cmd['trigger_price'] = price
|
||||
cmd['msg'] = 'executed'
|
||||
|
||||
await ctx.send_yield(cmd)
|
||||
# await ctx.send_yield({
|
||||
# 'type': 'alert',
|
||||
# 'price': price,
|
||||
# # current shm array index
|
||||
# 'index': feed.shm._last.value - 1,
|
||||
# 'name': name,
|
||||
# 'oid': oid,
|
||||
# })
|
||||
|
||||
print(
|
||||
f"GOT ALERT FOR {exec_price} @ \n{tick}\n")
|
||||
|
||||
print(f'removing pred for {oid}')
|
||||
execs.remove((oid, pred, name, cmd))
|
||||
pred, name, cmd = execs.pop(oid)
|
||||
|
||||
print(f'execs are {execs}')
|
||||
|
||||
print(f'execs scan took: {time.time() - start}')
|
||||
# feed teardown
|
||||
|
||||
|
||||
|
@ -333,6 +304,8 @@ async def stream_and_route(ctx, ui_name):
|
|||
actor = tractor.current_actor()
|
||||
book = get_book()
|
||||
|
||||
_active_execs: Dict[str, (str, str)] = {}
|
||||
|
||||
# new router entry point
|
||||
async with tractor.wait_for_actor(ui_name) as portal:
|
||||
|
||||
|
@ -341,52 +314,63 @@ async def stream_and_route(ctx, ui_name):
|
|||
|
||||
async for cmd in await portal.run(send_order_cmds):
|
||||
|
||||
log.info(f'{cmd} received in {actor.uid}')
|
||||
msg = cmd['msg']
|
||||
|
||||
if msg == 'cancel':
|
||||
# TODO:
|
||||
pass
|
||||
|
||||
trigger_price = cmd['price']
|
||||
sym = cmd['symbol']
|
||||
brokers = cmd['brokers']
|
||||
oid = cmd['oid']
|
||||
|
||||
if msg == 'alert':
|
||||
log.info(f'Alert {cmd} received in {actor.uid}')
|
||||
if msg == 'cancel':
|
||||
# destroy exec
|
||||
pred, name, cmd = book.orders[_active_execs[oid]].pop(oid)
|
||||
|
||||
broker = brokers[0]
|
||||
last = book.lasts.get((broker, sym))
|
||||
# ack-cmdond that order is live
|
||||
await ctx.send_yield({'msg': 'cancelled', 'oid': oid})
|
||||
|
||||
if last is None: # spawn new brokerd feed task
|
||||
continue
|
||||
|
||||
quote = await n.start(
|
||||
exec_orders,
|
||||
ctx,
|
||||
# TODO: eventually support N-brokers
|
||||
broker,
|
||||
sym,
|
||||
trigger_price,
|
||||
)
|
||||
print(f"received first quote {quote}")
|
||||
elif msg in ('alert', 'buy', 'sell',):
|
||||
|
||||
last = book.lasts[(broker, sym)]
|
||||
print(f'Known last is {last}')
|
||||
trigger_price = cmd['price']
|
||||
sym = cmd['symbol']
|
||||
brokers = cmd['brokers']
|
||||
|
||||
# Auto-gen scanner predicate:
|
||||
# we automatically figure out what the alert check condition
|
||||
# should be based on the current first price received from the
|
||||
# feed, instead of being like every other shitty tina platform
|
||||
# that makes the user choose the predicate operator.
|
||||
pred, name = mk_check(trigger_price, last)
|
||||
broker = brokers[0]
|
||||
last = book.lasts.get((broker, sym))
|
||||
|
||||
# create list of executions on first entry
|
||||
book.orders.setdefault((broker, sym), []).append(
|
||||
(oid, pred, name, cmd)
|
||||
)
|
||||
if last is None: # spawn new brokerd feed task
|
||||
|
||||
# ack-respond that order is live
|
||||
await ctx.send_yield({'msg': 'ack', 'oid': oid})
|
||||
quote = await n.start(
|
||||
exec_orders,
|
||||
ctx,
|
||||
# TODO: eventually support N-brokers
|
||||
broker,
|
||||
sym,
|
||||
trigger_price,
|
||||
)
|
||||
print(f"received first quote {quote}")
|
||||
|
||||
last = book.lasts[(broker, sym)]
|
||||
print(f'Known last is {last}')
|
||||
|
||||
# Auto-gen scanner predicate:
|
||||
# we automatically figure out what the alert check
|
||||
# condition should be based on the current first
|
||||
# price received from the feed, instead of being
|
||||
# like every other shitty tina platform that makes
|
||||
# the user choose the predicate operator.
|
||||
pred, name = mk_check(trigger_price, last)
|
||||
|
||||
# create list of executions on first entry
|
||||
book.orders.setdefault(
|
||||
(broker, sym), {})[oid] = (pred, name, cmd)
|
||||
|
||||
# reverse lookup for cancellations
|
||||
_active_execs[oid] = (broker, sym)
|
||||
|
||||
# ack-cmdond that order is live
|
||||
await ctx.send_yield({
|
||||
'msg': 'active',
|
||||
'oid': oid
|
||||
})
|
||||
|
||||
# continue and wait on next order cmd
|
||||
|
||||
|
@ -410,7 +394,7 @@ async def spawn_router_stream_alerts(
|
|||
|
||||
portal = await n.start_actor(
|
||||
subactor_name,
|
||||
rpc_module_paths=[__name__],
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
stream = await portal.run(
|
||||
stream_and_route,
|
||||
|
@ -424,51 +408,52 @@ async def spawn_router_stream_alerts(
|
|||
# begin the trigger-alert stream
|
||||
# this is where we receive **back** messages
|
||||
# about executions **from** the EMS actor
|
||||
async for alert in stream:
|
||||
async for msg in stream:
|
||||
|
||||
# delete the line from view
|
||||
oid = alert['oid']
|
||||
msg_type = alert['msg']
|
||||
oid = msg['oid']
|
||||
resp = msg['msg']
|
||||
|
||||
if msg_type == 'ack':
|
||||
print(f"order accepted: {alert}")
|
||||
if resp in ('active',):
|
||||
print(f"order accepted: {msg}")
|
||||
|
||||
# show line label once order is live
|
||||
order_mode.lines.commit_line(oid)
|
||||
|
||||
continue
|
||||
|
||||
order_mode.arrows.add(
|
||||
oid,
|
||||
alert['index'],
|
||||
alert['price'],
|
||||
pointing='up' if alert['name'] == 'up' else 'down'
|
||||
)
|
||||
elif resp in ('cancelled',):
|
||||
|
||||
# print(f'_lines: {_lines}')
|
||||
print(f'deleting line with oid: {oid}')
|
||||
# delete level from view
|
||||
order_mode.lines.remove_line(uuid=oid)
|
||||
print(f'deleting line with oid: {oid}')
|
||||
|
||||
# delete level from view
|
||||
order_mode.lines.remove_line(uuid=oid)
|
||||
elif resp in ('executed',):
|
||||
|
||||
# chart._vb._lines_editor
|
||||
# _lines.pop(oid).delete()
|
||||
order_mode.lines.remove_line(uuid=oid)
|
||||
print(f'deleting line with oid: {oid}')
|
||||
|
||||
# TODO: this in another task?
|
||||
# not sure if this will ever be a bottleneck,
|
||||
# we probably could do graphics stuff first tho?
|
||||
order_mode.arrows.add(
|
||||
oid,
|
||||
msg['index'],
|
||||
msg['price'],
|
||||
pointing='up' if msg['name'] == 'up' else 'down'
|
||||
)
|
||||
|
||||
# XXX: linux only for now
|
||||
result = await trio.run_process(
|
||||
[
|
||||
'notify-send',
|
||||
'-u', 'normal',
|
||||
'-t', '10000',
|
||||
'piker',
|
||||
f'alert: {alert}',
|
||||
],
|
||||
)
|
||||
log.runtime(result)
|
||||
# DESKTOP NOTIFICATIONS
|
||||
#
|
||||
# TODO: this in another task?
|
||||
# not sure if this will ever be a bottleneck,
|
||||
# we probably could do graphics stuff first tho?
|
||||
|
||||
# do we need this?
|
||||
# await _from_ems.put(alert)
|
||||
# XXX: linux only for now
|
||||
result = await trio.run_process(
|
||||
[
|
||||
'notify-send',
|
||||
'-u', 'normal',
|
||||
'-t', '10000',
|
||||
'piker',
|
||||
f'alert: {msg}',
|
||||
],
|
||||
)
|
||||
log.runtime(result)
|
||||
|
|
|
@ -308,6 +308,7 @@ class LineEditor:
|
|||
|
||||
"""
|
||||
line = self._order_lines[uuid]
|
||||
line.oid = uuid
|
||||
line.label.show()
|
||||
|
||||
# TODO: other flashy things to indicate the order is active
|
||||
|
@ -316,6 +317,13 @@ class LineEditor:
|
|||
|
||||
return line
|
||||
|
||||
def lines_under_cursor(self):
|
||||
"""Get the line(s) under the cursor position.
|
||||
|
||||
"""
|
||||
# Delete any hoverable under the cursor
|
||||
return self.chart._cursor._hovered
|
||||
|
||||
def remove_line(
|
||||
self,
|
||||
line: LevelLine = None,
|
||||
|
@ -328,21 +336,17 @@ class LineEditor:
|
|||
|
||||
"""
|
||||
if line:
|
||||
# If line is passed delete it
|
||||
line.delete()
|
||||
uuid = line.oid
|
||||
|
||||
elif uuid:
|
||||
# try to look up line from our registry
|
||||
self._order_lines.pop(uuid).delete()
|
||||
# try to look up line from our registry
|
||||
line = self._order_lines.pop(uuid)
|
||||
|
||||
else:
|
||||
# Delete any hoverable under the cursor
|
||||
cursor = self.chart._cursor
|
||||
# if hovered remove from cursor set
|
||||
hovered = self.chart._cursor._hovered
|
||||
if line in hovered:
|
||||
hovered.remove(line)
|
||||
|
||||
for item in cursor._hovered:
|
||||
# hovered items must also offer
|
||||
# a ``.delete()`` method
|
||||
item.delete()
|
||||
line.delete()
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -392,10 +396,15 @@ class OrderMode:
|
|||
"""Major mode for placing orders on a chart view.
|
||||
|
||||
"""
|
||||
chart: 'ChartPlotWidget'
|
||||
chart: 'ChartPlotWidget' # type: ignore # noqa
|
||||
book: OrderBook
|
||||
lines: LineEditor
|
||||
arrows: ArrowEditor
|
||||
_arrow_colors = {
|
||||
'alert': 'alert_yellow',
|
||||
'buy': 'buy_green',
|
||||
'sell': 'sell_red',
|
||||
}
|
||||
|
||||
key_map: Dict[str, Callable] = field(default_factory=dict)
|
||||
|
||||
|
@ -664,7 +673,6 @@ class ChartView(ViewBox):
|
|||
price=y
|
||||
)
|
||||
|
||||
|
||||
def keyReleaseEvent(self, ev):
|
||||
"""
|
||||
Key release to normally to trigger release of input mode
|
||||
|
@ -686,7 +694,6 @@ class ChartView(ViewBox):
|
|||
|
||||
if text == 'a':
|
||||
# draw "staged" line under cursor position
|
||||
# self._lines_editor.unstage_line()
|
||||
self.mode.lines.unstage_line()
|
||||
|
||||
def keyPressEvent(self, ev):
|
||||
|
@ -732,13 +739,14 @@ class ChartView(ViewBox):
|
|||
|
||||
elif text == 'a':
|
||||
# add a line at the current cursor
|
||||
# self._lines_editor.stage_line()
|
||||
self.mode.lines.stage_line()
|
||||
|
||||
elif text == 'd':
|
||||
|
||||
# delete any lines under the cursor
|
||||
# self._lines_editor.remove_line()
|
||||
self.mode.lines.remove_line()
|
||||
mode = self.mode
|
||||
for line in mode.lines.lines_under_cursor():
|
||||
mode.book.cancel(uuid=line.oid)
|
||||
|
||||
# XXX: Leaving this for light reference purposes, there
|
||||
# seems to be some work to at least gawk at for history mgmt.
|
||||
|
|
Loading…
Reference in New Issue