Use feed's trade streamin in ems

basic_orders
Tyler Goodlet 2021-01-09 10:55:36 -05:00
parent 611486627f
commit 3e7057d247
1 changed files with 27 additions and 6 deletions

View File

@ -18,7 +18,7 @@
In suit parlance: "Execution management systems" In suit parlance: "Execution management systems"
""" """
import time # import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import ( from typing import (
AsyncIterator, Dict, Callable, Tuple, AsyncIterator, Dict, Callable, Tuple,
@ -174,6 +174,9 @@ def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]:
return check_lt, 'up' return check_lt, 'up'
else:
return None, None
@dataclass @dataclass
class _ExecBook: class _ExecBook:
@ -236,7 +239,7 @@ async def exec_orders(
book = get_book() book = get_book()
book.lasts[(broker, symbol)] = first_quote[symbol]['last'] book.lasts[(broker, symbol)] = first_quote[symbol]['last']
task_status.started(first_quote) task_status.started((first_quote, feed))
# shield this field so the remote brokerd does not get cancelled # shield this field so the remote brokerd does not get cancelled
stream = feed.stream stream = feed.stream
@ -249,7 +252,7 @@ async def exec_orders(
# XXX: optimize this for speed # XXX: optimize this for speed
############################## ##############################
start = time.time() # start = time.time()
for sym, quote in quotes.items(): for sym, quote in quotes.items():
execs = book.orders.get((broker, sym)) execs = book.orders.get((broker, sym))
@ -288,10 +291,20 @@ async def exec_orders(
print(f'execs are {execs}') print(f'execs are {execs}')
print(f'execs scan took: {time.time() - start}') # print(f'execs scan took: {time.time() - start}')
# feed teardown # feed teardown
async def receive_trade_updates(
ctx: tractor.Context,
feed: 'Feed', # noqa
) -> AsyncIterator[dict]:
# await tractor.breakpoint()
print("TRADESZ")
async for update in await feed.recv_trades_data():
log.info(update)
@tractor.stream @tractor.stream
async def stream_and_route(ctx, ui_name): async def stream_and_route(ctx, ui_name):
"""Order router (sub)actor entrypoint. """Order router (sub)actor entrypoint.
@ -338,7 +351,7 @@ async def stream_and_route(ctx, ui_name):
if last is None: # spawn new brokerd feed task if last is None: # spawn new brokerd feed task
quote = await n.start( quote, feed = await n.start(
exec_orders, exec_orders,
ctx, ctx,
# TODO: eventually support N-brokers # TODO: eventually support N-brokers
@ -346,7 +359,14 @@ async def stream_and_route(ctx, ui_name):
sym, sym,
trigger_price, trigger_price,
) )
print(f"received first quote {quote}")
n.start_soon(
receive_trade_updates,
ctx,
# TODO: eventually support N-brokers
feed,
)
last = book.lasts[(broker, sym)] last = book.lasts[(broker, sym)]
print(f'Known last is {last}') print(f'Known last is {last}')
@ -359,6 +379,7 @@ async def stream_and_route(ctx, ui_name):
# the user choose the predicate operator. # the user choose the predicate operator.
pred, name = mk_check(trigger_price, last) pred, name = mk_check(trigger_price, last)
# create list of executions on first entry # create list of executions on first entry
book.orders.setdefault( book.orders.setdefault(
(broker, sym), {})[oid] = (pred, name, cmd) (broker, sym), {})[oid] = (pred, name, cmd)