Add basic client-side order entry to EMS
parent
f9d4df7378
commit
140f3231e7
|
@ -18,7 +18,7 @@
|
||||||
In suit parlance: "Execution management systems"
|
In suit parlance: "Execution management systems"
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# import time
|
import time
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from typing import (
|
from typing import (
|
||||||
AsyncIterator, Dict, Callable, Tuple,
|
AsyncIterator, Dict, Callable, Tuple,
|
||||||
|
@ -59,19 +59,20 @@ class OrderBook:
|
||||||
_to_ems: trio.abc.SendChannel = _to_ems
|
_to_ems: trio.abc.SendChannel = _to_ems
|
||||||
_from_order_book: trio.abc.ReceiveChannel = _from_order_book
|
_from_order_book: trio.abc.ReceiveChannel = _from_order_book
|
||||||
|
|
||||||
def on_fill(self, uuid: str) -> None:
|
# def on_fill(self, uuid: str) -> None:
|
||||||
cmd = self._sent_orders[uuid]
|
# cmd = self._sent_orders[uuid]
|
||||||
log.info(f"Order executed: {cmd}")
|
# log.info(f"Order executed: {cmd}")
|
||||||
self._confirmed_orders[uuid] = cmd
|
# self._confirmed_orders[uuid] = cmd
|
||||||
|
|
||||||
def alert(
|
def send(
|
||||||
self,
|
self,
|
||||||
uuid: str,
|
uuid: str,
|
||||||
symbol: 'Symbol',
|
symbol: 'Symbol',
|
||||||
price: float
|
price: float,
|
||||||
|
action: str,
|
||||||
) -> str:
|
) -> str:
|
||||||
cmd = {
|
cmd = {
|
||||||
'msg': 'alert',
|
'msg': action,
|
||||||
'price': price,
|
'price': price,
|
||||||
'symbol': symbol.key,
|
'symbol': symbol.key,
|
||||||
'brokers': symbol.brokers,
|
'brokers': symbol.brokers,
|
||||||
|
@ -80,12 +81,6 @@ class OrderBook:
|
||||||
self._sent_orders[uuid] = cmd
|
self._sent_orders[uuid] = cmd
|
||||||
self._to_ems.send_nowait(cmd)
|
self._to_ems.send_nowait(cmd)
|
||||||
|
|
||||||
def buy(self, price: float) -> str:
|
|
||||||
...
|
|
||||||
|
|
||||||
def sell(self, price: float) -> str:
|
|
||||||
...
|
|
||||||
|
|
||||||
def cancel(self, uuid: str) -> bool:
|
def cancel(self, uuid: str) -> bool:
|
||||||
"""Cancel an order (or alert) from the EMS.
|
"""Cancel an order (or alert) from the EMS.
|
||||||
|
|
||||||
|
@ -218,11 +213,10 @@ def get_book() -> _ExecBook:
|
||||||
return _book
|
return _book
|
||||||
|
|
||||||
|
|
||||||
async def exec_orders(
|
async def exec_loop(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
broker: str,
|
broker: str,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
exec_price: float,
|
|
||||||
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
|
||||||
) -> AsyncIterator[dict]:
|
) -> AsyncIterator[dict]:
|
||||||
|
|
||||||
|
@ -239,7 +233,9 @@ async def exec_orders(
|
||||||
book = get_book()
|
book = get_book()
|
||||||
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
|
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
|
||||||
|
|
||||||
task_status.started((first_quote, feed))
|
client = feed.mod.get_client_proxy(feed._brokerd_portal)
|
||||||
|
|
||||||
|
task_status.started((first_quote, feed, client))
|
||||||
|
|
||||||
# shield this field so the remote brokerd does not get cancelled
|
# shield this field so the remote brokerd does not get cancelled
|
||||||
stream = feed.stream
|
stream = feed.stream
|
||||||
|
@ -275,21 +271,24 @@ async def exec_orders(
|
||||||
# (mocking for eg. a "fill")
|
# (mocking for eg. a "fill")
|
||||||
if pred(price):
|
if pred(price):
|
||||||
|
|
||||||
cmd['name'] = name
|
resp = {
|
||||||
cmd['index'] = feed.shm._last.value - 1
|
'msg': 'executed',
|
||||||
# current shm array index
|
'name': name,
|
||||||
cmd['trigger_price'] = price
|
'time_ns': time.time_ns(),
|
||||||
cmd['msg'] = 'executed'
|
# current shm array index
|
||||||
|
'index': feed.shm._last.value - 1,
|
||||||
|
'exec_price': price,
|
||||||
|
}
|
||||||
|
|
||||||
await ctx.send_yield(cmd)
|
await ctx.send_yield(resp)
|
||||||
|
|
||||||
print(
|
print(
|
||||||
f"GOT ALERT FOR {exec_price} @ \n{tick}\n")
|
f"GOT ALERT FOR {name} @ \n{tick}\n")
|
||||||
|
|
||||||
print(f'removing pred for {oid}')
|
log.info(f'removing pred for {oid}')
|
||||||
pred, name, cmd = execs.pop(oid)
|
pred, name, cmd = execs.pop(oid)
|
||||||
|
|
||||||
print(f'execs are {execs}')
|
log.debug(f'execs are {execs}')
|
||||||
|
|
||||||
# print(f'execs scan took: {time.time() - start}')
|
# print(f'execs scan took: {time.time() - start}')
|
||||||
# feed teardown
|
# feed teardown
|
||||||
|
@ -335,7 +334,7 @@ async def stream_and_route(ctx, ui_name):
|
||||||
# destroy exec
|
# destroy exec
|
||||||
pred, name, cmd = book.orders[_active_execs[oid]].pop(oid)
|
pred, name, cmd = book.orders[_active_execs[oid]].pop(oid)
|
||||||
|
|
||||||
# ack-cmdond that order is live
|
# ack-cmd that order is live
|
||||||
await ctx.send_yield({'msg': 'cancelled', 'oid': oid})
|
await ctx.send_yield({'msg': 'cancelled', 'oid': oid})
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
@ -351,24 +350,26 @@ async def stream_and_route(ctx, ui_name):
|
||||||
|
|
||||||
if last is None: # spawn new brokerd feed task
|
if last is None: # spawn new brokerd feed task
|
||||||
|
|
||||||
quote, feed = await n.start(
|
quote, feed, client = await n.start(
|
||||||
exec_orders,
|
exec_loop,
|
||||||
ctx,
|
ctx,
|
||||||
# TODO: eventually support N-brokers
|
|
||||||
|
# TODO: eventually support N-brokers?
|
||||||
broker,
|
broker,
|
||||||
sym,
|
sym,
|
||||||
|
|
||||||
trigger_price,
|
trigger_price,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO: eventually support N-brokers
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
receive_trade_updates,
|
receive_trade_updates,
|
||||||
ctx,
|
ctx,
|
||||||
# TODO: eventually support N-brokers
|
|
||||||
feed,
|
feed,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
last = book.lasts[(broker, sym)]
|
last = book.lasts[(broker, sym)]
|
||||||
|
|
||||||
print(f'Known last is {last}')
|
print(f'Known last is {last}')
|
||||||
|
|
||||||
# Auto-gen scanner predicate:
|
# Auto-gen scanner predicate:
|
||||||
|
@ -379,6 +380,11 @@ async def stream_and_route(ctx, ui_name):
|
||||||
# the user choose the predicate operator.
|
# the user choose the predicate operator.
|
||||||
pred, name = mk_check(trigger_price, last)
|
pred, name = mk_check(trigger_price, last)
|
||||||
|
|
||||||
|
# if the predicate resolves immediately send the
|
||||||
|
# execution to the broker asap
|
||||||
|
if pred(last):
|
||||||
|
# send order
|
||||||
|
print("ORDER FILLED IMMEDIATELY!?!?!?!")
|
||||||
|
|
||||||
# create list of executions on first entry
|
# create list of executions on first entry
|
||||||
book.orders.setdefault(
|
book.orders.setdefault(
|
||||||
|
@ -387,7 +393,7 @@ async def stream_and_route(ctx, ui_name):
|
||||||
# reverse lookup for cancellations
|
# reverse lookup for cancellations
|
||||||
_active_execs[oid] = (broker, sym)
|
_active_execs[oid] = (broker, sym)
|
||||||
|
|
||||||
# ack-cmdond that order is live
|
# ack-response that order is live here
|
||||||
await ctx.send_yield({
|
await ctx.send_yield({
|
||||||
'msg': 'active',
|
'msg': 'active',
|
||||||
'oid': oid
|
'oid': oid
|
||||||
|
@ -451,14 +457,15 @@ async def spawn_router_stream_alerts(
|
||||||
|
|
||||||
elif resp in ('executed',):
|
elif resp in ('executed',):
|
||||||
|
|
||||||
order_mode.lines.remove_line(uuid=oid)
|
line = order_mode.lines.remove_line(uuid=oid)
|
||||||
print(f'deleting line with oid: {oid}')
|
print(f'deleting line with oid: {oid}')
|
||||||
|
|
||||||
order_mode.arrows.add(
|
order_mode.arrows.add(
|
||||||
oid,
|
oid,
|
||||||
msg['index'],
|
msg['index'],
|
||||||
msg['price'],
|
msg['price'],
|
||||||
pointing='up' if msg['name'] == 'up' else 'down'
|
pointing='up' if msg['name'] == 'up' else 'down',
|
||||||
|
color=line.color
|
||||||
)
|
)
|
||||||
|
|
||||||
# DESKTOP NOTIFICATIONS
|
# DESKTOP NOTIFICATIONS
|
||||||
|
|
Loading…
Reference in New Issue