Merge pull request #402 from pikers/multi_client_order_mgt

Multi client order mgt
ci_fix_tractor_testing
goodboy 2022-10-05 01:46:09 -04:00 committed by GitHub
commit 31b0d8cee8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 249 additions and 151 deletions

View File

@ -1015,11 +1015,10 @@ async def handle_order_updates(
fill_msg = BrokerdFill( fill_msg = BrokerdFill(
time_ns=time.time_ns(), time_ns=time.time_ns(),
reqid=reqid, reqid=reqid,
# just use size value for now?
# action=action, # just use size value # action=action,
# for now? size=float(vlm),
size=vlm, price=float(price),
price=price,
# TODO: maybe capture more msg data # TODO: maybe capture more msg data
# i.e fees? # i.e fees?

View File

@ -61,6 +61,7 @@ class Pair(Struct):
quote: str # asset id of quote component quote: str # asset id of quote component
lot: str # volume lot size lot: str # volume lot size
cost_decimals: int
pair_decimals: int # scaling decimal places for pair pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume lot_decimals: int # scaling decimal places for volume
@ -342,8 +343,8 @@ async def stream_quotes(
# transform to upper since piker style is always lower # transform to upper since piker style is always lower
sym = sym.upper() sym = sym.upper()
sym_info = await client.symbol_info(sym)
si = Pair(**await client.symbol_info(sym)) # validation si = Pair(**sym_info) # validation
syminfo = si.to_dict() syminfo = si.to_dict()
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals

View File

@ -18,7 +18,11 @@
In da suit parlances: "Execution management systems" In da suit parlances: "Execution management systems"
""" """
from collections import defaultdict, ChainMap from __future__ import annotations
from collections import (
defaultdict,
# ChainMap,
)
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from math import isnan from math import isnan
from pprint import pformat from pprint import pformat
@ -134,12 +138,6 @@ class _DarkBook(Struct):
# _ems_entries: dict[str, str] = {} # _ems_entries: dict[str, str] = {}
_active: dict = {} _active: dict = {}
# mapping of ems dialog ids to msg flow history
_msgflows: defaultdict[
int,
ChainMap[dict[str, dict]],
] = defaultdict(ChainMap)
_ems2brokerd_ids: dict[str, str] = bidict() _ems2brokerd_ids: dict[str, str] = bidict()
@ -152,8 +150,8 @@ _DEFAULT_SIZE: float = 1.0
async def clear_dark_triggers( async def clear_dark_triggers(
router: Router,
brokerd_orders_stream: tractor.MsgStream, brokerd_orders_stream: tractor.MsgStream,
ems_client_order_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str, broker: str,
fqsn: str, fqsn: str,
@ -288,15 +286,10 @@ async def clear_dark_triggers(
book._active[oid] = status book._active[oid] = status
# send response to client-side # send response to client-side
try: await router.client_broadcast(
await ems_client_order_stream.send(status) fqsn,
except ( status,
trio.ClosedResourceError,
):
log.warning(
f'{ems_client_order_stream} stream broke?'
) )
break
else: # condition scan loop complete else: # condition scan loop complete
log.debug(f'execs are {execs}') log.debug(f'execs are {execs}')
@ -310,7 +303,7 @@ class TradesRelay(Struct):
# for now we keep only a single connection open with # for now we keep only a single connection open with
# each ``brokerd`` for simplicity. # each ``brokerd`` for simplicity.
brokerd_dialogue: tractor.MsgStream brokerd_stream: tractor.MsgStream
# map of symbols to dicts of accounts to pp msgs # map of symbols to dicts of accounts to pp msgs
positions: dict[ positions: dict[
@ -342,13 +335,31 @@ class Router(Struct):
# order id to client stream map # order id to client stream map
clients: set[tractor.MsgStream] = set() clients: set[tractor.MsgStream] = set()
dialogues: dict[
str,
list[tractor.MsgStream]
] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors # sets of clients mapped from subscription keys
relays: dict[str, TradesRelay] = {} subscribers: defaultdict[
str, # sub key, default fqsn
set[tractor.MsgStream], # unique client streams
] = defaultdict(set)
# sets of clients dynamically registered for specific
# order flows based on subscription config.
dialogs: defaultdict[
str, # ems uuid (oid)
set[tractor.MsgStream] # client side msg stream
] = defaultdict(set)
# TODO: mapping of ems dialog ids to msg flow history
# msgflows: defaultdict[
# str,
# ChainMap[dict[str, dict]],
# ] = defaultdict(ChainMap)
# brokername to trades-dialogs streams with ``brokerd`` actors
relays: dict[
str, # broker name
TradesRelay,
] = {}
def get_dark_book( def get_dark_book(
self, self,
@ -358,6 +369,20 @@ class Router(Struct):
return self.books.setdefault(brokername, _DarkBook(brokername)) return self.books.setdefault(brokername, _DarkBook(brokername))
def get_subs(
self,
oid: str,
) -> set[tractor.MsgStream]:
'''
Deliver list of non-closed subscriber client msg streams.
'''
return set(
stream for stream in self.dialogs[oid]
if not stream._closed
)
@asynccontextmanager @asynccontextmanager
async def maybe_open_brokerd_trades_dialogue( async def maybe_open_brokerd_trades_dialogue(
self, self,
@ -373,7 +398,8 @@ class Router(Struct):
none already exists. none already exists.
''' '''
relay: TradesRelay = self.relays.get(feed.mod.name) broker = feed.mod.name
relay: TradesRelay = self.relays.get(broker)
if ( if (
relay is None relay is None
@ -387,7 +413,7 @@ class Router(Struct):
): ):
relay = await self.nursery.start( relay = await self.nursery.start(
open_brokerd_trades_dialogue, open_brokerd_trades_dialog,
self, self,
feed, feed,
symbol, symbol,
@ -395,41 +421,53 @@ class Router(Struct):
loglevel, loglevel,
) )
self.nursery.start_soon(
translate_and_relay_brokerd_events,
broker,
relay.brokerd_stream,
self,
)
relay.consumers += 1 relay.consumers += 1
# TODO: get updated positions here? # TODO: get updated positions here?
assert relay.brokerd_dialogue assert relay.brokerd_stream
try: try:
yield relay yield relay
finally: finally:
# TODO: what exactly needs to be torn down here or # TODO: what exactly needs to be torn down here or
# are we just consumer tracking? # are we just consumer tracking?
relay.consumers -= 1 relay.consumers -= 1
async def client_broadcast( async def client_broadcast(
self, self,
sub_key: str,
msg: dict, msg: dict,
) -> None: ) -> None:
for client_stream in self.clients.copy(): to_remove: set[tractor.MsgStream] = set()
subs = self.subscribers[sub_key]
for client_stream in subs:
try: try:
await client_stream.send(msg) await client_stream.send(msg)
except( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
to_remove.add(client_stream)
self.clients.remove(client_stream) self.clients.remove(client_stream)
log.warning( log.warning(
f'client for {client_stream} was already closed?') f'client for {client_stream} was already closed?')
if to_remove:
subs.difference_update(to_remove)
_router: Router = None _router: Router = None
async def open_brokerd_trades_dialogue( async def open_brokerd_trades_dialog(
router: Router, router: Router,
feed: Feed, feed: Feed,
@ -505,7 +543,7 @@ async def open_brokerd_trades_dialogue(
# we cache the relay task and instead of running multiple # we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being # tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client # relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues # stream to this single relay loop in the dialog table.
# begin processing order events from the target brokerd backend # begin processing order events from the target brokerd backend
# by receiving order submission response messages, # by receiving order submission response messages,
@ -532,13 +570,13 @@ async def open_brokerd_trades_dialogue(
).append(msg) ).append(msg)
relay = TradesRelay( relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream, brokerd_stream=brokerd_trades_stream,
positions=pps, positions=pps,
accounts=accounts, accounts=accounts,
consumers=1, consumers=1,
) )
_router.relays[broker] = relay router.relays[broker] = relay
# the ems scan loop may be cancelled by the client but we # the ems scan loop may be cancelled by the client but we
# want to keep the ``brokerd`` dialogue up regardless # want to keep the ``brokerd`` dialogue up regardless
@ -550,9 +588,9 @@ async def open_brokerd_trades_dialogue(
await trio.sleep_forever() await trio.sleep_forever()
finally: finally:
# parent context must have been closed # parent context must have been closed remove from cache so
# remove from cache so next client will respawn if needed # next client will respawn if needed
relay = _router.relays.pop(broker, None) relay = router.relays.pop(broker, None)
if not relay: if not relay:
log.warning(f'Relay for {broker} was already removed!?') log.warning(f'Relay for {broker} was already removed!?')
@ -607,8 +645,7 @@ async def translate_and_relay_brokerd_events(
''' '''
book: _DarkBook = router.get_dark_book(broker) book: _DarkBook = router.get_dark_book(broker)
relay: TradesRelay = router.relays[broker] relay: TradesRelay = router.relays[broker]
assert relay.brokerd_stream == brokerd_trades_stream
assert relay.brokerd_dialogue == brokerd_trades_stream
brokerd_msg: dict[str, Any] brokerd_msg: dict[str, Any]
async for brokerd_msg in brokerd_trades_stream: async for brokerd_msg in brokerd_trades_stream:
@ -640,7 +677,7 @@ async def translate_and_relay_brokerd_events(
# fan-out-relay position msgs immediately by # fan-out-relay position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
await router.client_broadcast(pos_msg) await router.client_broadcast(sym, pos_msg)
continue continue
# BrokerdOrderAck # BrokerdOrderAck
@ -707,18 +744,18 @@ async def translate_and_relay_brokerd_events(
# some unexpected failure - something we need to think more # some unexpected failure - something we need to think more
# about. In most default situations, with composed orders # about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy. # (ex. brackets), most brokers seem to use a oca policy.
ems_client_order_stream = router.dialogues[oid]
status_msg.resp = 'error' status_msg.resp = 'error'
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
book._active[oid] = status_msg book._active[oid] = status_msg
await ems_client_order_stream.send(status_msg)
await router.client_broadcast(sym, status_msg)
# BrokerdStatus # BrokerdStatus
case { case {
'name': 'status', 'name': 'status',
'status': status, 'status': status,
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
} if ( } if (
(oid := book._ems2brokerd_ids.inverse.get(reqid)) (oid := book._ems2brokerd_ids.inverse.get(reqid))
and status in ( and status in (
@ -732,16 +769,16 @@ async def translate_and_relay_brokerd_events(
# TODO: maybe pack this into a composite type that # TODO: maybe pack this into a composite type that
# contains both the IPC stream as well the # contains both the IPC stream as well the
# msg-chain/dialog. # msg-chain/dialog.
ems_client_order_stream = router.dialogues.get(oid) ems_client_order_streams = router.get_subs(oid)
status_msg = book._active.get(oid) status_msg = book._active.get(oid)
if ( if (
not ems_client_order_stream not ems_client_order_streams
or not status_msg or not status_msg
): ):
log.warning( log.warning(
'Received status for unknown dialog {oid}:\n' f'Received status for untracked dialog {oid}:\n'
'{fmsg}' f'{fmsg}'
) )
continue continue
@ -759,7 +796,11 @@ async def translate_and_relay_brokerd_events(
status_msg.reqid = reqid # THIS LINE IS CRITICAL! status_msg.reqid = reqid # THIS LINE IS CRITICAL!
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name'] status_msg.src = msg.broker_details['name']
await ems_client_order_stream.send(status_msg)
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
if status == 'closed': if status == 'closed':
log.info(f'Execution for {oid} is complete!') log.info(f'Execution for {oid} is complete!')
@ -793,8 +834,6 @@ async def translate_and_relay_brokerd_events(
msg = BrokerdFill(**brokerd_msg) msg = BrokerdFill(**brokerd_msg)
log.info(f'Fill for {oid} cleared with:\n{fmsg}') log.info(f'Fill for {oid} cleared with:\n{fmsg}')
ems_client_order_stream = router.dialogues[oid]
# XXX: bleh, a fill can come after 'closed' from `ib`? # XXX: bleh, a fill can come after 'closed' from `ib`?
# only send a late fill event we haven't already closed # only send a late fill event we haven't already closed
# out the dialog status locally. # out the dialog status locally.
@ -803,7 +842,11 @@ async def translate_and_relay_brokerd_events(
status_msg.resp = 'fill' status_msg.resp = 'fill'
status_msg.reqid = reqid status_msg.reqid = reqid
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
# ``Status`` containing an embedded order msg which # ``Status`` containing an embedded order msg which
# should be loaded as a "pre-existing open order" from the # should be loaded as a "pre-existing open order" from the
@ -855,7 +898,10 @@ async def translate_and_relay_brokerd_events(
# fan-out-relay position msgs immediately by # fan-out-relay position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
await router.client_broadcast(status_msg) await router.client_broadcast(
order.symbol,
status_msg,
)
# don't fall through # don't fall through
continue continue
@ -884,7 +930,11 @@ async def translate_and_relay_brokerd_events(
oid = book._ems2brokerd_ids.inverse.get(reqid) oid = book._ems2brokerd_ids.inverse.get(reqid)
msg = f'Unhandled broker status for dialog {reqid}:\n' msg = f'Unhandled broker status for dialog {reqid}:\n'
if oid: if oid:
status_msg = book._active[oid] status_msg = book._active.get(oid)
# status msg may not have been set yet or popped?
# NOTE: have seen a key error here on kraken
# clearable limits..
if status_msg:
msg += ( msg += (
f'last status msg: {pformat(status_msg)}\n\n' f'last status msg: {pformat(status_msg)}\n\n'
f'this msg:{fmsg}\n' f'this msg:{fmsg}\n'
@ -899,26 +949,27 @@ async def translate_and_relay_brokerd_events(
# if status_msg is not None: # if status_msg is not None:
# del status_msg # del status_msg
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?
# router.dialogues.pop(oid)
async def process_client_order_cmds( async def process_client_order_cmds(
client_order_stream: tractor.MsgStream, client_order_stream: tractor.MsgStream,
brokerd_order_stream: tractor.MsgStream, brokerd_order_stream: tractor.MsgStream,
symbol: str, fqsn: str,
feed: Feed, feed: Feed,
dark_book: _DarkBook, dark_book: _DarkBook,
router: Router, router: Router,
) -> None: ) -> None:
'''
Client-dialog request loop: accept order requests and deliver
initial status msg responses to subscribed clients.
client_dialogues = router.dialogues This task-loop handles both management of dark triggered orders and
alerts by inserting them into the "dark book"-table as well as
submitting live orders immediately if requested by the client.
'''
# cmd: dict # cmd: dict
async for cmd in client_order_stream: async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}') log.info(f'Received order cmd:\n{pformat(cmd)}')
@ -926,11 +977,17 @@ async def process_client_order_cmds(
# CAWT DAMN we need struct support! # CAWT DAMN we need struct support!
oid = str(cmd['oid']) oid = str(cmd['oid'])
# register this stream as an active dialogue for this order id # register this stream as an active order dialog (msg flow) for
# such that translated message from the brokerd backend can be # this order id such that translated message from the brokerd
# routed (relayed) to **just** that client stream (and in theory # backend can be routed and relayed to subscribed clients.
# others who are registered for such order affiliated msgs). subs = router.dialogs[oid]
client_dialogues[oid] = client_order_stream
# add all subscribed clients for this fqsn (should eventually be
# a more generalize subscription system) to received order msg
# updates (and thus show stuff in the UI).
subs.add(client_order_stream)
subs.update(router.subscribers[fqsn])
reqid = dark_book._ems2brokerd_ids.inverse.get(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
# any dark/live status which is current # any dark/live status which is current
@ -942,7 +999,7 @@ async def process_client_order_cmds(
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
} if ( } if (
(status := dark_book._active.get(oid)) status
and status.resp in ('open', 'pending') and status.resp in ('open', 'pending')
): ):
reqid = status.reqid reqid = status.reqid
@ -978,11 +1035,11 @@ async def process_client_order_cmds(
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
} if ( } if (
status and status.resp == 'dark_open' status
# or status and status.req and status.resp == 'dark_open'
): ):
# remove from dark book clearing # remove from dark book clearing
entry = dark_book.orders[symbol].pop(oid, None) entry = dark_book.orders[fqsn].pop(oid, None)
if entry: if entry:
( (
pred, pred,
@ -997,13 +1054,18 @@ async def process_client_order_cmds(
status.resp = 'canceled' status.resp = 'canceled'
status.req = cmd status.req = cmd
await client_order_stream.send(status) await router.client_broadcast(
# de-register this client dialogue fqsn,
router.dialogues.pop(oid) status,
)
# de-register this order dialogue from all clients
router.dialogs[oid].clear()
router.dialogs.pop(oid)
dark_book._active.pop(oid) dark_book._active.pop(oid)
else: else:
log.exception(f'No dark order for {symbol}?') log.exception(f'No dark order for {fqsn}?')
# TODO: eventually we should be receiving # TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol # this struct on the wire unpacked in a scoped protocol
@ -1030,8 +1092,8 @@ async def process_client_order_cmds(
if status is not None: if status is not None:
# if we already had a broker order id then # if we already had a broker order id then
# this is likely an order update commmand. # this is likely an order update commmand.
log.info(f"Modifying live {broker} order: {reqid}")
reqid = status.reqid reqid = status.reqid
log.info(f"Modifying live {broker} order: {reqid}")
status.req = req status.req = req
status.resp = 'pending' status.resp = 'pending'
@ -1162,7 +1224,12 @@ async def process_client_order_cmds(
src='dark', src='dark',
) )
dark_book._active[oid] = status dark_book._active[oid] = status
await client_order_stream.send(status)
# broadcast status to all subscribed clients
await router.client_broadcast(
fqsn,
status,
)
@tractor.context @tractor.context
@ -1188,20 +1255,26 @@ async def _emsd_main(
received in a stream from that client actor and then responses are received in a stream from that client actor and then responses are
streamed back up to the original calling task in the same client. streamed back up to the original calling task in the same client.
The primary ``emsd`` task tree is: The primary ``emsd`` task trees are:
- ``_emsd_main()``: - ``_setup_persistent_emsd()``:
sets up brokerd feed, order feed with ems client, trades dialogue with is the ``emsd`` actor's primary root task which sets up an
brokderd trading api. actor-global ``Router`` instance and starts a relay loop task
| which lives until the backend broker is shutdown or the ems is
- ``clear_dark_triggers()``: terminated.
run (dark order) conditions on inputs and trigger brokerd "live"
order submissions.
| |
- (maybe) ``translate_and_relay_brokerd_events()``: - (maybe) ``translate_and_relay_brokerd_events()``:
accept normalized trades responses from brokerd, process and accept normalized trades responses from brokerd, process and
relay to ems client(s); this is a effectively a "trade event relay to ems client(s); this is a effectively a "trade event
reponse" proxy-broker. reponse" proxy-broker.
- ``_emsd_main()``:
attaches a brokerd real-time quote feed and trades dialogue with
brokderd trading api for every connecting client.
|
- ``clear_dark_triggers()``:
run (dark order) conditions on inputs and trigger brokerd "live"
order submissions.
| |
- ``process_client_order_cmds()``: - ``process_client_order_cmds()``:
accepts order cmds from requesting clients, registers dark orders and accepts order cmds from requesting clients, registers dark orders and
@ -1248,11 +1321,10 @@ async def _emsd_main(
loglevel, loglevel,
) as relay, ) as relay,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
brokerd_stream = relay.brokerd_dialogue # .clone() brokerd_stream = relay.brokerd_stream
# signal to client that we're started and deliver # signal to client that we're started and deliver
# all known pps and accounts for this ``brokerd``. # all known pps and accounts for this ``brokerd``.
@ -1264,26 +1336,24 @@ async def _emsd_main(
# establish 2-way stream with requesting order-client and # establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates # begin handling inbound order requests and updates
async with ems_ctx.open_stream() as ems_client_order_stream: async with ems_ctx.open_stream() as client_stream:
# register the client side before startingn the # register the client side before starting the
# brokerd-side relay task to ensure the client is # brokerd-side relay task to ensure the client is
# delivered all exisiting open orders on startup. # delivered all exisiting open orders on startup.
_router.clients.add(ems_client_order_stream) _router.clients.add(client_stream)
n.start_soon( # TODO: instead of by fqsn we need a subscription
translate_and_relay_brokerd_events, # system/schema here to limit what each new client is
broker, # allowed to see in terms of broadcasted order flow
brokerd_stream, # updates per dialog.
_router, _router.subscribers[fqsn].add(client_stream)
)
# trigger scan and exec loop # trigger scan and exec loop
n.start_soon( n.start_soon(
clear_dark_triggers, clear_dark_triggers,
_router,
brokerd_stream, brokerd_stream,
ems_client_order_stream,
quote_stream, quote_stream,
broker, broker,
fqsn, # form: <name>.<venue>.<suffix>.<broker> fqsn, # form: <name>.<venue>.<suffix>.<broker>
@ -1291,16 +1361,11 @@ async def _emsd_main(
) )
# start inbound (from attached client) order request processing # start inbound (from attached client) order request processing
try:
# main entrypoint, run here until cancelled. # main entrypoint, run here until cancelled.
try:
await process_client_order_cmds( await process_client_order_cmds(
client_stream,
ems_client_order_stream,
# relay.brokerd_dialogue,
brokerd_stream, brokerd_stream,
fqsn, fqsn,
feed, feed,
dark_book, dark_book,
@ -1310,28 +1375,27 @@ async def _emsd_main(
finally: finally:
# try to remove client from "registry" # try to remove client from "registry"
try: try:
_router.clients.remove(ems_client_order_stream) _router.clients.remove(client_stream)
except KeyError: except KeyError:
log.warning( log.warning(
f'Stream {ems_client_order_stream._ctx.chan.uid}' f'Stream {client_stream._ctx.chan.uid}'
' was already dropped?' ' was already dropped?'
) )
dialogues = _router.dialogues _router.subscribers[fqsn].remove(client_stream)
dialogs = _router.dialogs
for oid, client_streams in dialogs.items():
if client_stream in client_streams:
client_streams.remove(client_stream)
for oid, client_stream in dialogues.copy().items(): # TODO: for order dialogs left "alive" in
if client_stream == ems_client_order_stream:
log.warning(
f'client dialogue is being abandoned:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)
dialogues.pop(oid)
# TODO: for order dialogues left "alive" in
# the ems this is where we should allow some # the ems this is where we should allow some
# system to take over management. Likely we # system to take over management. Likely we
# want to allow the user to choose what kind # want to allow the user to choose what kind
# of policy to use (eg. cancel all orders # of policy to use (eg. cancel all orders
# from client, run some algo, etc.) # from client, run some algo, etc.)
if not client_streams:
log.warning(
f'Order dialog is being unmonitored:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)

View File

@ -261,10 +261,10 @@ class BrokerdFill(Struct):
time_ns: int time_ns: int
# order exeuction related # order exeuction related
action: str
size: float size: float
price: float price: float
action: Optional[str] = None
broker_details: dict = {} # meta-data (eg. commisions etc.) broker_details: dict = {} # meta-data (eg. commisions etc.)
# brokerd timestamp required for order mode arrow placement on x-axis # brokerd timestamp required for order mode arrow placement on x-axis

View File

@ -61,13 +61,13 @@ log = get_logger(__name__)
class PaperBoi(Struct): class PaperBoi(Struct):
""" '''
Emulates a broker order client providing the same API and Emulates a broker order client providing approximately the same API
delivering an order-event response stream but with methods for and delivering an order-event response stream but with methods for
triggering desired events based on forward testing engine triggering desired events based on forward testing engine
requirements. requirements (eg open, closed, fill msgs).
""" '''
broker: str broker: str
ems_trades_stream: tractor.MsgStream ems_trades_stream: tractor.MsgStream
@ -207,9 +207,10 @@ class PaperBoi(Struct):
remaining: float = 0, remaining: float = 0,
) -> None: ) -> None:
"""Pretend to fill a broker order @ price and size. '''
Pretend to fill a broker order @ price and size.
""" '''
# TODO: net latency model # TODO: net latency model
await trio.sleep(0.05) await trio.sleep(0.05)
fill_time_ns = time.time_ns() fill_time_ns = time.time_ns()
@ -230,6 +231,7 @@ class PaperBoi(Struct):
'name': self.broker + '_paper', 'name': self.broker + '_paper',
}, },
) )
log.info(f'Fake filling order:\n{fill_msg}')
await self.ems_trades_stream.send(fill_msg) await self.ems_trades_stream.send(fill_msg)
self._trade_ledger.update(fill_msg.to_dict()) self._trade_ledger.update(fill_msg.to_dict())
@ -336,9 +338,10 @@ async def simulate_fills(
return tick_price >= our_price return tick_price >= our_price
match tick: match tick:
# on an ask queue tick, only clear buy entries
case { case {
'price': tick_price, 'price': tick_price,
# 'type': ('ask' | 'trade' | 'last'),
'type': 'ask', 'type': 'ask',
}: }:
client.last_ask = ( client.last_ask = (
@ -351,9 +354,9 @@ async def simulate_fills(
itertools.repeat(buy_on_ask) itertools.repeat(buy_on_ask)
) )
# on a bid queue tick, only clear sell entries
case { case {
'price': tick_price, 'price': tick_price,
# 'type': ('bid' | 'trade' | 'last'),
'type': 'bid', 'type': 'bid',
}: }:
client.last_bid = ( client.last_bid = (
@ -366,6 +369,10 @@ async def simulate_fills(
itertools.repeat(sell_on_bid) itertools.repeat(sell_on_bid)
) )
# TODO: fix this block, though it definitely
# costs a lot more CPU-wise
# - doesn't seem like clears are happening still on
# "resting" limit orders?
case { case {
'price': tick_price, 'price': tick_price,
'type': ('trade' | 'last'), 'type': ('trade' | 'last'),
@ -390,11 +397,19 @@ async def simulate_fills(
iter_entries = interleave() iter_entries = interleave()
# NOTE: all other (non-clearable) tick event types
# - we don't want to sping the simulated clear loop
# below unecessarily and further don't want to pop
# simulated live orders prematurely.
case _:
continue
# iterate all potentially clearable book prices # iterate all potentially clearable book prices
# in FIFO order per side. # in FIFO order per side.
for order_info, pred in iter_entries: for order_info, pred in iter_entries:
(our_price, size, reqid, action) = order_info (our_price, size, reqid, action) = order_info
# print(order_info)
clearable = pred(our_price) clearable = pred(our_price)
if clearable: if clearable:
# pop and retreive order info # pop and retreive order info
@ -469,8 +484,8 @@ async def handle_order_requests(
# counter - collision prone..) # counter - collision prone..)
reqid=reqid, reqid=reqid,
) )
log.info(f'Submitted paper LIMIT {reqid}:\n{order}')
# elif action == 'cancel':
case {'action': 'cancel'}: case {'action': 'cancel'}:
msg = BrokerdCancel(**request_msg) msg = BrokerdCancel(**request_msg)
await client.submit_cancel( await client.submit_cancel(
@ -532,7 +547,10 @@ async def trades_dialogue(
# TODO: load paper positions per broker from .toml config file # TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]`` # and pass as symbol to position data mapping: ``dict[str, dict]``
await ctx.started((pp_msgs, ['paper'])) await ctx.started((
pp_msgs,
['paper'],
))
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,

View File

@ -152,11 +152,11 @@ class OrderMode:
A callback applied for each level change to the line A callback applied for each level change to the line
which will recompute the order size based on allocator which will recompute the order size based on allocator
settings. this is assigned inside settings. this is assigned inside
``OrderMode.line_from_order()`` ``OrderMode.new_line_from_order()``
''' '''
# NOTE: the ``Order.account`` is set at order stage time inside # NOTE: the ``Order.account`` is set at order stage time inside
# ``OrderMode.line_from_order()`` or is inside ``Order`` msg # ``OrderMode.new_line_from_order()`` or is inside ``Order`` msg
# field for loaded orders. # field for loaded orders.
order_info = tracker.alloc.next_order_info( order_info = tracker.alloc.next_order_info(
startup_pp=tracker.startup_pp, startup_pp=tracker.startup_pp,
@ -174,7 +174,7 @@ class OrderMode:
# reflect the corresponding account and pos info. # reflect the corresponding account and pos info.
self.pane.on_ui_settings_change('account', order.account) self.pane.on_ui_settings_change('account', order.account)
def line_from_order( def new_line_from_order(
self, self,
order: Order, order: Order,
chart: Optional[ChartPlotWidget] = None, chart: Optional[ChartPlotWidget] = None,
@ -240,7 +240,7 @@ class OrderMode:
(self.hist_chart, {'only_show_markers_on_hover': True}), (self.hist_chart, {'only_show_markers_on_hover': True}),
]: ]:
kwargs.update(line_kwargs) kwargs.update(line_kwargs)
line = self.line_from_order( line = self.new_line_from_order(
order=order, order=order,
chart=chart, chart=chart,
**kwargs, **kwargs,
@ -300,7 +300,7 @@ class OrderMode:
# `LineEditor.unstage_line()` on all staged lines.. # `LineEditor.unstage_line()` on all staged lines..
# lines = self.lines_from_order( # lines = self.lines_from_order(
line = self.line_from_order( line = self.new_line_from_order(
order, order,
chart=chart, chart=chart,
show_markers=True, show_markers=True,
@ -428,10 +428,11 @@ class OrderMode:
) -> None: ) -> None:
level = line.value() level = line.value()
# updated by level change callback set in ``.line_from_order()`` # updated by level change callback set in ``.new_line_from_order()``
dialog = line.dialog dialog = line.dialog
size = dialog.order.size size = dialog.order.size
# NOTE: sends modified order msg to EMS
self.book.update( self.book.update(
uuid=line.dialog.uuid, uuid=line.dialog.uuid,
price=level, price=level,
@ -447,7 +448,8 @@ class OrderMode:
# EMS response msg handlers # EMS response msg handlers
def on_submit( def on_submit(
self, self,
uuid: str uuid: str,
order: Optional[Order] = None,
) -> Dialog: ) -> Dialog:
''' '''
@ -464,6 +466,20 @@ class OrderMode:
dialog.last_status_close() dialog.last_status_close()
for line in lines: for line in lines:
# if an order msg is provided update the line
# **from** that msg.
if order:
line.set_level(order.price)
self.on_level_change_update_next_order_info(
level=order.price,
line=line,
order=order,
# use the corresponding position tracker for the
# order's account.
tracker=self.trackers[order.account],
)
# hide any lines not currently moused-over # hide any lines not currently moused-over
if not line.get_cursor(): if not line.get_cursor():
line.hide_labels() line.hide_labels()
@ -980,9 +996,11 @@ async def process_trade_msg(
match msg: match msg:
case Status(resp='dark_open' | 'open'): case Status(resp='dark_open' | 'open'):
order = Order(**msg.req)
if dialog is not None: if dialog is not None:
# show line label once order is live # show line label once order is live
mode.on_submit(oid) mode.on_submit(oid, order=order)
else: else:
log.warning( log.warning(
@ -992,7 +1010,6 @@ async def process_trade_msg(
sym = mode.chart.linked.symbol sym = mode.chart.linked.symbol
fqsn = sym.front_fqsn() fqsn = sym.front_fqsn()
order = Order(**msg.req)
if ( if (
((order.symbol + f'.{msg.src}') == fqsn) ((order.symbol + f'.{msg.src}') == fqsn)
@ -1009,7 +1026,6 @@ async def process_trade_msg(
msg.req = order msg.req = order
dialog = mode.load_unknown_dialog_from_msg(msg) dialog = mode.load_unknown_dialog_from_msg(msg)
mode.on_submit(oid) mode.on_submit(oid)
# return dialog, msg
case Status(resp='error'): case Status(resp='error'):
# delete level line from view # delete level line from view