Port EMS to typed messaging + bidir streaming

This moves the entire clearing system to use typed messages using
`pydantic.BaseModel` such that the streamed request-response order
submission protocols can be explicitly viewed in terms of message
schema, flow, and sequencing. Using the explicit message formats we can
now dig into simplifying and normalizing across broker provider apis to
get the best uniformity and simplicity.

The order submission sequence is now fully async: an order request is
expected to be explicitly acked with a new message and if cancellation
is requested by the client before the ack arrives, the cancel message is
stashed and then later sent immediately on receipt of the order
submission's ack from the backend broker. Backend brokers are now
controlled using a 2-way request-response streaming dialogue which is
fully api agnostic of the clearing system's core processing; This
leverages the new bi-directional streaming apis from `tractor`.  The
clearing core (emsd) was also simplified by moving the paper engine to
it's own sub-actor and making it api-symmetric with expected `brokerd`
endpoints.

A couple of the ems status messages were changed/added:
'dark_executed' -> 'dark_triggered'
added 'alert_triggered'

More cleaning of old code to come!
ems_to_bidir_streaming
Tyler Goodlet 2021-06-08 12:14:45 -04:00
parent 0dabc6ad26
commit 6e58f31fd8
3 changed files with 653 additions and 348 deletions

View File

@ -19,34 +19,23 @@ Orders and execution client API.
"""
from contextlib import asynccontextmanager
from typing import Dict, Tuple, List
from typing import Dict
from pprint import pformat
from dataclasses import dataclass, field
import trio
import tractor
# import msgspec
from ..data._source import Symbol
from ..log import get_logger
from ._ems import _emsd_main
from .._daemon import maybe_open_emsd
from ._messages import Order, Cancel
log = get_logger(__name__)
# TODO: some kinda validation like this
# class Order(msgspec.Struct):
# action: str
# price: float
# size: float
# symbol: str
# brokers: List[str]
# oid: str
# exec_mode: str
@dataclass
class OrderBook:
"""Buy-side (client-side ?) order book ctl and tracking.
@ -64,31 +53,34 @@ class OrderBook:
_to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel
_sent_orders: Dict[str, dict] = field(default_factory=dict)
_sent_orders: Dict[str, Order] = field(default_factory=dict)
_ready_to_receive: trio.Event = trio.Event()
def send(
self,
uuid: str,
symbol: str,
brokers: List[str],
brokers: list[str],
price: float,
size: float,
action: str,
exec_mode: str,
) -> dict:
cmd = {
'action': action,
'price': price,
'size': size,
'symbol': symbol,
'brokers': brokers,
'oid': uuid,
'exec_mode': exec_mode, # dark or live
}
self._sent_orders[uuid] = cmd
self._to_ems.send_nowait(cmd)
return cmd
msg = Order(
action=action,
price=price,
size=size,
symbol=symbol,
brokers=brokers,
oid=uuid,
exec_mode=exec_mode, # dark or live
)
self._sent_orders[uuid] = msg
self._to_ems.send_nowait(msg.dict())
return msg
def update(
self,
@ -98,28 +90,27 @@ class OrderBook:
cmd = self._sent_orders[uuid]
msg = cmd.dict()
msg.update(data)
self._sent_orders[uuid] = OrderMsg(**msg)
self._sent_orders[uuid] = Order(**msg)
self._to_ems.send_nowait(msg)
return cmd
def cancel(self, uuid: str) -> bool:
"""Cancel an order (or alert) from the EMS.
"""Cancel an order (or alert) in the EMS.
"""
cmd = self._sent_orders[uuid]
msg = {
'action': 'cancel',
'oid': uuid,
'symbol': cmd['symbol'],
}
self._to_ems.send_nowait(msg)
msg = Cancel(
oid=uuid,
symbol=cmd.symbol,
)
self._to_ems.send_nowait(msg.dict())
_orders: OrderBook = None
def get_orders(
emsd_uid: Tuple[str, str] = None
emsd_uid: tuple[str, str] = None
) -> OrderBook:
""""
OrderBook singleton factory per actor.
@ -139,7 +130,10 @@ def get_orders(
return _orders
# TODO: we can get rid of this relay loop once we move
# order_mode inputs to async code!
async def relay_order_cmds_from_sync_code(
symbol_key: str,
to_ems_stream: tractor.MsgStream,
@ -184,7 +178,8 @@ async def relay_order_cmds_from_sync_code(
async def open_ems(
broker: str,
symbol: Symbol,
) -> None:
) -> (OrderBook, tractor.MsgStream, dict):
"""Spawn an EMS daemon and begin sending orders and receiving
alerts.
@ -234,7 +229,7 @@ async def open_ems(
# TODO: ``first`` here should be the active orders/execs
# persistent on the ems so that loca UI's can be populated.
) as (ctx, first),
) as (ctx, positions),
# open 2-way trade command stream
ctx.open_stream() as trades_stream,
@ -246,4 +241,4 @@ async def open_ems(
trades_stream
)
yield book, trades_stream
yield book, trades_stream, positions

View File

@ -32,7 +32,12 @@ import tractor
from .. import data
from ..log import get_logger
from ..data._normalize import iterticks
from ._paper_engine import PaperBoi, simulate_fills
from . import _paper_engine as paper
from ._messages import (
Status, Order,
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdFill, BrokerdError, BrokerdPosition,
)
log = get_logger(__name__)
@ -106,8 +111,9 @@ class _DarkBook:
float
] = field(default_factory=dict)
# mapping of broker order ids to piker ems ids
_broker2ems_ids: dict[str, str] = field(default_factory=bidict)
# mapping of piker ems order ids to current brokerd order flow message
_ems_entries: dict[str, str] = field(default_factory=dict)
_ems2brokerd_ids: dict[str, str] = field(default_factory=bidict)
# XXX: this is in place to prevent accidental positions that are too
@ -117,13 +123,20 @@ class _DarkBook:
_DEFAULT_SIZE: float = 1.0
async def execute_triggers(
async def clear_dark_triggers(
# ctx: tractor.Context,
brokerd_orders_stream: tractor.MsgStream,
ems_client_order_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str,
symbol: str,
stream: 'tractor.ReceiveStream', # noqa
ctx: tractor.Context,
client: 'Client', # noqa
# client: 'Client', # noqa
# order_msg_stream: 'Client', # noqa
book: _DarkBook,
) -> None:
"""Core dark order trigger loop.
@ -133,7 +146,7 @@ async def execute_triggers(
"""
# this stream may eventually contain multiple symbols
# XXX: optimize this for speed!
async for quotes in stream:
async for quotes in quote_stream:
# TODO: numba all this!
@ -169,9 +182,15 @@ async def execute_triggers(
# majority of iterations will be non-matches
continue
action = cmd['action']
action: str = cmd['action']
symbol: str = cmd['symbol']
if action != 'alert':
if action == 'alert':
# nothing to do but relay a status
# message back to the requesting ems client
resp = 'alert_triggered'
else:
# executable order submission
# submit_price = price + price*percent_away
@ -181,47 +200,89 @@ async def execute_triggers(
f'Dark order triggered for price {price}\n'
f'Submitting order @ price {submit_price}')
reqid = await client.submit_limit(
# TODO: port to BrokerdOrder message sending
msg = BrokerdOrder(
action=cmd['action'],
oid=oid,
time_ns=time.time_ns(),
# this is a brand new order request for the
# underlying broker so we set out "broker request
# id" (brid) as nothing so that the broker
# client knows that we aren't trying to modify
# an existing order.
brid=None,
# underlying broker so we set a "broker
# request id" (brid) to "nothing" so that the
# broker client knows that we aren't trying
# to modify an existing order-request.
reqid=None,
symbol=sym,
action=cmd['action'],
price=submit_price,
size=cmd['size'],
)
await brokerd_orders_stream.send(msg.dict())
# mark this entry as having send an order request
book._ems_entries[oid] = msg
# register broker request id to ems id
book._broker2ems_ids[reqid] = oid
resp = 'dark_triggered'
else:
# alerts have no broker request id
reqid = ''
# an internal brokerd-broker specific
# order-request id is expected to be generated
resp = {
'resp': 'dark_executed',
'time_ns': time.time_ns(),
'trigger_price': price,
# reqid = await client.submit_limit(
'cmd': cmd, # original request message
# oid=oid,
'broker_reqid': reqid,
'broker': broker,
'oid': oid, # piker order id
# # this is a brand new order request for the
# # underlying broker so we set a "broker
# # request id" (brid) to "nothing" so that the
# # broker client knows that we aren't trying
# # to modify an existing order-request.
# brid=None,
}
# symbol=sym,
# action=cmd['action'],
# price=submit_price,
# size=cmd['size'],
# )
# # register broker request id to ems id
# else:
# # alerts have no broker request id
# reqid = ''
# resp = {
# 'resp': 'dark_executed',
# 'cmd': cmd, # original request message
# 'time_ns': time.time_ns(),
# 'trigger_price': price,
# 'broker_reqid': reqid,
# 'broker': broker,
# 'oid': oid, # piker order id
# }
msg = Status(
oid=oid, # piker order id
resp=resp,
time_ns=time.time_ns(),
symbol=symbol,
trigger_price=price,
# broker_reqid=reqid,
broker_details={'name': broker},
cmd=cmd, # original request message
).dict()
# remove exec-condition from set
log.info(f'removing pred for {oid}')
execs.pop(oid)
await ctx.send_yield(resp)
# await ctx.send_yield(resp)
await ems_client_order_stream.send(msg)
else: # condition scan loop complete
log.debug(f'execs are {execs}')
@ -231,78 +292,49 @@ async def execute_triggers(
# print(f'execs scan took: {time.time() - start}')
async def exec_loop(
# async def start_clearing(
ctx: tractor.Context,
feed: 'Feed', # noqa
broker: str,
symbol: str,
_exec_mode: str,
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
# # ctx: tractor.Context,
# brokerd_order_stream: tractor.MsgStream,
# quote_stream: tractor.MsgStream,
) -> AsyncIterator[dict]:
"""Main scan loop for order execution conditions and submission
to brokers.
# # client: 'Client',
"""
global _router
# # feed: 'Feed', # noqa
# broker: str,
# symbol: str,
# _exec_mode: str,
# XXX: this should be initial price quote from target provider
first_quote = await feed.receive()
# book: _DarkBook,
book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
# # task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
# TODO: wrap this in a more re-usable general api
client_factory = getattr(feed.mod, 'get_client_proxy', None)
# ) -> AsyncIterator[dict]:
# """Main scan loop for order execution conditions and submission
# to brokers.
if client_factory is not None and _exec_mode != 'paper':
# """
# async with trio.open_nursery() as n:
# we have an order API for this broker
client = client_factory(feed._brokerd_portal)
# # trigger scan and exec loop
# n.start_soon(
# trigger_executions,
else:
# force paper mode
log.warning(f'Entering paper trading mode for {broker}')
# brokerd_order_stream,
# quote_stream,
client = PaperBoi(
broker,
*trio.open_memory_channel(100),
_buys={},
_sells={},
# broker,
# symbol,
# book
# # ctx,
# # client,
# )
_reqids={},
)
# for paper mode we need to mock this trades response feed
# so we pass a duck-typed feed-looking mem chan which is fed
# fill and submission events from the exec loop
feed._trade_stream = client.trade_stream
# init the trades stream
client._to_trade_stream.send_nowait({'local_trades': 'start'})
_exec_mode = 'paper'
# return control to parent task
task_status.started((first_quote, feed, client))
stream = feed.stream
async with trio.open_nursery() as n:
n.start_soon(
execute_triggers,
broker,
symbol,
stream,
ctx,
client,
book
)
if _exec_mode == 'paper':
# TODO: make this an actual broadcast channels as in:
# https://github.com/python-trio/trio/issues/987
n.start_soon(simulate_fills, stream, client)
# # # paper engine simulator task
# # if _exec_mode == 'paper':
# # # TODO: make this an actual broadcast channels as in:
# # # https://github.com/python-trio/trio/issues/987
# # n.start_soon(simulate_fills, quote_stream, client)
# TODO: lots of cases still to handle
@ -315,11 +347,17 @@ async def exec_loop(
# reqId 1550: Order held while securities are located.'),
# status='PreSubmitted', message='')],
async def process_broker_trades(
ctx: tractor.Context,
feed: 'Feed', # noqa
async def translate_and_relay_brokerd_events(
# ctx: tractor.Context,
broker: str,
ems_client_order_stream: tractor.MsgStream,
brokerd_trades_stream: tractor.MsgStream,
book: _DarkBook,
# feed: 'Feed', # noqa
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
) -> AsyncIterator[dict]:
"""Trades update loop - receive updates from broker, convert
to EMS responses, transmit to ordering client(s).
@ -339,59 +377,135 @@ async def process_broker_trades(
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
"""
broker = feed.mod.name
# broker = feed.mod.name
# TODO: make this a context
# in the paper engine case this is just a mem receive channel
async with feed.receive_trades_data() as trades_stream:
# async with feed.receive_trades_data() as brokerd_trades_stream:
first = await trades_stream.__anext__()
# first = await brokerd_trades_stream.__anext__()
# startup msg expected as first from broker backend
assert first['local_trades'] == 'start'
task_status.started()
# assert first['local_trades'] == 'start'
# task_status.started()
async for event in trades_stream:
async for brokerd_msg in brokerd_trades_stream:
name, msg = event['local_trades']
# name, msg = event['local_trades']
name = brokerd_msg['name']
log.info(f'Received broker trade event:\n{pformat(msg)}')
log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}')
if name == 'position':
msg['resp'] = 'position'
# msg['resp'] = 'position'
# relay through
await ctx.send_yield(msg)
# relay through position msgs immediately
await ems_client_order_stream.send(
BrokerdPosition(**brokerd_msg).dict()
)
continue
# Get the broker (order) request id, this **must** be normalized
# into messaging provided by the broker backend
reqid = msg['reqid']
reqid = brokerd_msg['reqid']
# make response packet to EMS client(s)
oid = book._broker2ems_ids.get(reqid)
# all piker originated requests will have an ems generated oid field
oid = brokerd_msg.get(
'oid',
book._ems2brokerd_ids.inverse.get(reqid)
)
if oid is None:
# XXX: paper clearing special cases
# paper engine race case: ``Client.submit_limit()`` hasn't
# returned yet and provided an output reqid to register
# locally, so we need to retreive the oid that was already
# packed at submission since we already know it ahead of
# time
paper = msg.get('paper_info')
paper = brokerd_msg['broker_details'].get('paper_info')
if paper:
# paperboi keeps the ems id up front
oid = paper['oid']
else:
msg.get('external')
if not msg:
log.error(f"Unknown trade event {event}")
# may be an order msg specified as "external" to the
# piker ems flow (i.e. generated by some other
# external broker backend client (like tws for ib)
ext = brokerd_msg.get('external')
if ext:
log.error(f"External trade event {ext}")
continue
else:
# check for existing live flow entry
entry = book._ems_entries.get(oid)
# initial response to brokerd order request
if name == 'ack':
# register the brokerd request id (that was likely
# generated internally) with our locall ems order id for
# reverse lookup later. a BrokerdOrderAck **must** be
# sent after an order request in order to establish this
# id mapping.
book._ems2brokerd_ids[oid] = reqid
# new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases:
# - the order has previously been requested to be
# cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel
# signal **asap** to the backend broker
if entry.action == 'cancel':
# assign newly providerd broker backend request id
entry.reqid = reqid
# tell broker to cancel immediately
await brokerd_trades_stream.send(entry.dict())
# - the order is now active and will be mirrored in
# our book -> registered as live flow
else:
# update the flow with the ack msg
book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg)
continue
resp = {
'resp': None, # placeholder
'oid': oid
}
# a live flow now exists
oid = entry.oid
# make response packet to EMS client(s)
# reqid = book._ems_entries.get(oid)
# # msg is for unknown emsd order id
# if oid is None:
# oid = msg['oid']
# # XXX: paper clearing special cases
# # paper engine race case: ``Client.submit_limit()`` hasn't
# # returned yet and provided an output reqid to register
# # locally, so we need to retreive the oid that was already
# # packed at submission since we already know it ahead of
# # time
# paper = msg.get('paper_info')
# if paper:
# oid = paper['oid']
# else:
# msg.get('external')
# if not msg:
# log.error(f"Unknown trade event {event}")
# continue
# resp = {
# 'resp': None, # placeholder
# 'oid': oid
# }
resp = None
broker_details = {}
if name in (
'error',
@ -405,10 +519,10 @@ async def process_broker_trades(
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
message = msg['message']
msg = BrokerdError(**brokerd_msg)
# XXX should we make one when it's blank?
log.error(pformat(message))
log.error(pformat(msg))
# TODO: getting this bs, prolly need to handle status messages
# 'Market data farm connection is OK:usfarm.nj'
@ -436,101 +550,163 @@ async def process_broker_trades(
# - Inactive (reject or cancelled but not by trader)
# everyone doin camel case
status = msg['status'].lower()
msg = BrokerdStatus(**brokerd_msg)
# status = msg['status'].lower()
if status == 'filled':
if msg.status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not msg['remaining']:
if not msg.remaining:
resp['resp'] = 'broker_executed'
resp = 'broker_executed'
log.info(f'Execution for {oid} is complete!')
# just log it
else:
log.info(f'{broker} filled {msg}')
else:
# one of (submitted, cancelled)
resp['resp'] = 'broker_' + status
# one of {submitted, cancelled}
resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field
broker_details = msg.dict()
elif name in (
'fill',
):
msg = BrokerdFill(**brokerd_msg)
# proxy through the "fill" result(s)
resp['resp'] = 'broker_filled'
resp.update(msg)
resp = 'broker_filled'
broker_details = msg.dict()
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
# respond to requesting client
await ctx.send_yield(resp)
else:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# Create and relay EMS response status message
resp = Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
)
# relay response to requesting EMS client
await ems_client_order_stream.send(resp.dict())
async def process_order_cmds(
async def process_client_order_cmds(
# ctx: tractor.Context,
client_order_stream: tractor.MsgStream, # noqa
brokerd_order_stream: tractor.MsgStream,
ctx: tractor.Context,
cmd_stream: 'tractor.ReceiveStream', # noqa
symbol: str,
feed: 'Feed', # noqa
client: 'Client', # noqa
# client: 'Client', # noqa
dark_book: _DarkBook,
) -> None:
async for cmd in cmd_stream:
# cmd: dict
async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}')
action = cmd['action']
oid = cmd['oid']
brid = dark_book._broker2ems_ids.inverse.get(oid)
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid)
# TODO: can't wait for this stuff to land in 3.10
# https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings
if action in ('cancel',):
# check for live-broker order
if brid:
if live_entry:
msg = BrokerdCancel(
oid=oid,
reqid=reqid or live_entry.reqid,
time_ns=time.time_ns(),
)
# send cancel to brokerd immediately!
log.info("Submitting cancel for live order")
await client.submit_cancel(reqid=brid)
# NOTE: cancel response will be relayed back in messages
# from corresponding broker
# await client.submit_cancel(reqid=reqid)
await brokerd_order_stream.send(msg.dict())
else:
# might be a cancel for order that hasn't been acked yet
# by brokerd so register a cancel for then the order
# does show up later
dark_book._ems_entries[oid] = msg
# check for EMS active exec
else:
try:
# remove from dark book clearing
dark_book.orders[symbol].pop(oid, None)
# TODO: move these to `tractor.MsgStream`
await ctx.send_yield({
'resp': 'dark_cancelled',
'oid': oid
})
# tell client side that we've cancelled the
# dark-trigger order
await client_order_stream.send(
Status(
resp='dark_cancelled',
oid=oid,
time_ns=time.time_ns(),
).dict()
)
except KeyError:
log.exception(f'No dark order for {symbol}?')
# TODO: 3.10 struct-pattern matching and unpacking here
elif action in ('alert', 'buy', 'sell',):
sym = cmd['symbol']
trigger_price = cmd['price']
size = cmd['size']
brokers = cmd['brokers']
exec_mode = cmd['exec_mode']
msg = Order(**cmd)
broker = brokers[0]
# sym = cmd['symbol']
# trigger_price = cmd['price']
# size = cmd['size']
# brokers = cmd['brokers']
# exec_mode = cmd['exec_mode']
sym = msg.symbol
trigger_price = msg.price
size = msg.size
exec_mode = msg.exec_mode
broker = msg.brokers[0]
if exec_mode == 'live' and action in ('buy', 'sell',):
# register broker id for ems id
order_id = await client.submit_limit(
if live_entry is not None:
# sanity check on emsd id
assert live_entry.oid == oid
# if we already had a broker order id then
# this is likely an order update commmand.
log.info(f"Modifying order: {live_entry.reqid}")
# TODO: port to BrokerdOrder message sending
# register broker id for ems id
msg = BrokerdOrder(
oid=oid, # no ib support for oids...
time_ns=time.time_ns(),
# if this is None, creates a new order
# otherwise will modify any existing one
brid=brid,
reqid=reqid,
symbol=sym,
action=action,
@ -538,25 +714,38 @@ async def process_order_cmds(
size=size,
)
if brid:
assert dark_book._broker2ems_ids[brid] == oid
# if we already had a broker order id then
# this is likely an order update commmand.
log.info(f"Modifying order: {brid}")
else:
dark_book._broker2ems_ids[order_id] = oid
# send request to backend
# XXX: the trades data broker response loop
# (``process_broker_trades()`` above) will
# handle sending the ems side acks back to
# the cmd sender from here
# (``translate_and_relay_brokerd_events()`` above) will
# handle relaying the ems side responses back to
# the client/cmd sender from this request
print(f'sending live order {msg}')
await brokerd_order_stream.send(msg.dict())
# order_id = await client.submit_limit(
# oid=oid, # no ib support for oids...
# # if this is None, creates a new order
# # otherwise will modify any existing one
# brid=brid,
# symbol=sym,
# action=action,
# price=trigger_price,
# size=size,
# )
# an immediate response should be brokerd ack with order
# id but we register our request as part of the flow
dark_book._ems_entries[oid] = msg
elif exec_mode in ('dark', 'paper') or (
action in ('alert')
):
# submit order to local EMS
# submit order to local EMS book and scan loop,
# effectively a local clearing engine, which
# scans for conditions and triggers matching executions
# Auto-gen scanner predicate:
# we automatically figure out what the alert check
@ -590,8 +779,10 @@ async def process_order_cmds(
abs_diff_away = 0
# submit execution/order to EMS scan loop
# FYI: this may result in an override of an existing
# NOTE: this may result in an override of an existing
# dark book entry if the order id already exists
dark_book.orders.setdefault(
sym, {}
)[oid] = (
@ -601,14 +792,27 @@ async def process_order_cmds(
percent_away,
abs_diff_away
)
# TODO: if the predicate resolves immediately send the
# execution to the broker asap? Or no?
# ack-response that order is live in EMS
await ctx.send_yield({
'resp': 'dark_submitted',
'oid': oid
})
# await ctx.send_yield(
# {'resp': 'dark_submitted',
# 'oid': oid}
# )
if action == 'alert':
resp = 'alert_submitted'
else:
resp = 'dark_submitted'
await client_order_stream.send(
Status(
resp=resp,
oid=oid,
time_ns=time.time_ns(),
).dict()
)
@tractor.context
@ -618,7 +822,8 @@ async def _emsd_main(
# client_actor_name: str,
broker: str,
symbol: str,
_mode: str = 'dark', # ('paper', 'dark', 'live')
_exec_mode: str = 'dark', # ('paper', 'dark', 'live')
loglevel: str = 'info',
) -> None:
"""EMS (sub)actor entrypoint providing the
@ -635,15 +840,23 @@ async def _emsd_main(
received in a stream from that client actor and then responses are
streamed back up to the original calling task in the same client.
The task tree is:
The primary ``emsd`` task tree is:
- ``_emsd_main()``:
accepts order cmds, registers execs with exec loop
- ``exec_loop()``:
sets up brokerd feed, order feed with ems client, trades dialogue with
brokderd trading api.
|
- ``start_clearing()``:
run (dark) conditions on inputs and trigger broker submissions
- ``process_broker_trades()``:
accept normalized trades responses, process and relay to ems client(s)
|
- ``translate_and_relay_brokerd_events()``:
accept normalized trades responses from brokerd, process and
relay to ems client(s); this is a effectively a "trade event
reponse" proxy-broker.
|
- ``process_client_order_cmds()``:
accepts order cmds from requesting piker clients, registers
execs with exec loop
"""
# from ._client import send_order_cmds
@ -651,49 +864,140 @@ async def _emsd_main(
global _router
dark_book = _router.get_dark_book(broker)
ems_ctx = ctx
cached_feed = _router.feeds.get((broker, symbol))
if cached_feed:
# TODO: use cached feeds per calling-actor
log.warning(f'Opening duplicate feed for {(broker, symbol)}')
# spawn one task per broker feed
async with trio.open_nursery() as n:
async with (
trio.open_nursery() as n,
# TODO: eventually support N-brokers
async with data.open_feed(
data.open_feed(
broker,
[symbol],
loglevel='info',
) as feed:
loglevel=loglevel,
) as feed,
):
if not cached_feed:
_router.feeds[(broker, symbol)] = feed
# get a portal back to the client
# async with tractor.wait_for_actor(client_actor_name) as portal:
# XXX: this should be initial price quote from target provider
first_quote = await feed.receive()
await ctx.started()
# open a stream with the brokerd backend for order
# flow dialogue
# establish 2-way stream with requesting order-client
async with ctx.open_stream() as order_stream:
book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
portal = feed._brokerd_portal
if trades_endpoint is None or _exec_mode == 'paper':
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine inside the brokerd
# actor to simulate the real load it'll likely be under
# when also pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
# for paper mode we need to mock this trades response feed
# so we pass a duck-typed feed-looking mem chan which is fed
# fill and submission events from the exec loop
# feed._trade_stream = client.trade_stream
# init the trades stream
# client._to_trade_stream.send_nowait({'local_trades': 'start'})
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# if trades_endpoint is not None and _exec_mode != 'paper':
# # TODO: open a bidir stream here?
# # we have an order API for this broker
# client = client_factory(feed._brokerd_portal)
# else:
# return control to parent task
# task_status.started((first_quote, feed, client))
# stream = feed.stream
# start the real-time clearing condition scan loop and
# paper engine simulator.
# n.start_soon(
# start_clearing,
# brokerd_trades_stream,
# feed.stream, # quote stream
# # client,
# broker,
# symbol,
# _exec_mode,
# book,
# )
# signal to client that we're started
# TODO: we could send back **all** brokerd positions here?
await ems_ctx.started(positions)
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
async with ems_ctx.open_stream() as ems_client_order_stream:
# trigger scan and exec loop
n.start_soon(
clear_dark_triggers,
brokerd_trades_stream,
ems_client_order_stream,
feed.stream,
# start the condition scan loop
quote, feed, client = await n.start(
exec_loop,
ctx,
feed,
broker,
symbol,
_mode,
book
# ctx,
# client,
)
# begin processing order events from the target brokerd backend
await n.start(
process_broker_trades,
ctx,
feed,
n.start_soon(
translate_and_relay_brokerd_events,
broker,
ems_client_order_stream,
brokerd_trades_stream,
dark_book,
)
# start inbound (from attached client) order request processing
await process_order_cmds(
ctx,
order_stream,
await process_client_order_cmds(
ems_client_order_stream,
brokerd_trades_stream,
symbol,
feed,
client,
dark_book,
)

View File

@ -127,9 +127,9 @@ class OrderMode:
"""
line = self.lines.commit_line(uuid)
req_msg = self.book._sent_orders.get(uuid)
if req_msg:
req_msg.ack_time_ns = time.time_ns()
# req_msg = self.book._sent_orders.get(uuid)
# if req_msg:
# req_msg.ack_time_ns = time.time_ns()
return line
@ -317,10 +317,14 @@ async def start_order_mode(
# spawn EMS actor-service
async with (
open_ems(brokername, symbol) as (book, trades_stream),
open_ems(brokername, symbol) as (book, trades_stream, positions),
open_order_mode(symbol, chart, book) as order_mode
):
# update any exising positions
for sym, msg in positions.items():
order_mode.on_position_update(msg)
def get_index(time: float):
# XXX: not sure why the time is so off here
@ -343,16 +347,15 @@ async def start_order_mode(
fmsg = pformat(msg)
log.info(f'Received order msg:\n{fmsg}')
resp = msg['resp']
if resp in (
name = msg['name']
if name in (
'position',
):
# show line label once order is live
order_mode.on_position_update(msg)
continue
# delete the line from view
resp = msg['resp']
oid = msg['oid']
# response to 'action' request (buy/sell)
@ -375,15 +378,15 @@ async def start_order_mode(
order_mode.on_cancel(oid)
elif resp in (
'dark_executed'
'dark_triggered'
):
log.info(f'Dark order triggered for {fmsg}')
# for alerts add a triangle and remove the
# level line
if msg['cmd']['action'] == 'alert':
elif resp in (
'alert_triggered'
):
# should only be one "fill" for an alert
# add a triangle and remove the level line
order_mode.on_fill(
oid,
price=msg['trigger_price'],
@ -400,12 +403,15 @@ async def start_order_mode(
# each clearing tick is responded individually
elif resp in ('broker_filled',):
action = msg['action']
action = book._sent_orders[oid].action
details = msg['brokerd_msg']
# TODO: some kinda progress system
order_mode.on_fill(
oid,
price=msg['price'],
arrow_index=get_index(msg['broker_time']),
price=details['price'],
pointing='up' if action == 'buy' else 'down',
# TODO: put the actual exchange timestamp
arrow_index=get_index(details['broker_time']),
)