Flatten the brokerd-dialog relay loop using `match:`

the_ems_flattening
Tyler Goodlet 2022-08-04 15:21:15 -04:00
parent 46c51b55f7
commit 2309e7ab05
1 changed files with 156 additions and 148 deletions

View File

@ -23,7 +23,11 @@ from dataclasses import dataclass, field
from math import isnan
from pprint import pformat
import time
from typing import AsyncIterator, Callable
from typing import (
AsyncIterator,
Any,
Callable,
)
from bidict import bidict
import trio
@ -572,98 +576,57 @@ async def translate_and_relay_brokerd_events(
assert relay.brokerd_dialogue == brokerd_trades_stream
brokerd_msg: dict[str, Any]
async for brokerd_msg in brokerd_trades_stream:
name = brokerd_msg['name']
log.info(
f'Received broker trade event:\n'
f'{pformat(brokerd_msg)}'
)
match brokerd_msg:
if name == 'position':
# BrokerdPosition
case {
'name': 'position',
'symbol': sym,
'broker': broker,
}:
pos_msg = BrokerdPosition(**brokerd_msg)
pos_msg = BrokerdPosition(**brokerd_msg)
# XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
# sym, broker = pos_msg.symbol, pos_msg.broker
# XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg.symbol, pos_msg.broker
relay.positions.setdefault(
# NOTE: translate to a FQSN!
(broker, sym),
[]
).append(pos_msg)
relay.positions.setdefault(
# NOTE: translate to a FQSN!
(broker, sym),
[]
).append(pos_msg)
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(pos_msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
continue
# Get the broker (order) request id, this **must** be normalized
# into messaging provided by the broker backend
reqid = brokerd_msg['reqid']
# all piker originated requests will have an ems generated oid field
oid = brokerd_msg.get(
'oid',
book._ems2brokerd_ids.inverse.get(reqid)
)
if oid is None:
# XXX: paper clearing special cases
# paper engine race case: ``Client.submit_limit()`` hasn't
# returned yet and provided an output reqid to register
# locally, so we need to retreive the oid that was already
# packed at submission since we already know it ahead of
# time
paper = brokerd_msg['broker_details'].get('paper_info')
ext = brokerd_msg['broker_details'].get('external')
if paper:
# paperboi keeps the ems id up front
oid = paper['oid']
elif ext:
# may be an order msg specified as "external" to the
# piker ems flow (i.e. generated by some other
# external broker backend client (like tws for ib)
log.error(f"External trade event {ext}")
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(pos_msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
continue
else:
# something is out of order, we don't have an oid for
# this broker-side message.
log.error(
f'Unknown oid: {oid} for msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
else:
# check for existing live flow entry
entry = book._ems_entries.get(oid)
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# initial response to brokerd order request
if name == 'ack':
# BrokerdOrderAck
case {
'name': 'ack',
'reqid': reqid, # brokerd generated order-request id
'oid': oid, # ems order-dialog id
} if (
entry := book._ems_entries.get(oid)
):
# initial response to brokerd order request
# if name == 'ack':
# register the brokerd request id (that was generated
# / created internally by the broker backend) with our
@ -697,91 +660,136 @@ async def translate_and_relay_brokerd_events(
# update the flow with the ack msg
book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg)
# no msg to client necessary
continue
# a live flow now exists
oid = entry.oid
# BrokerdOrderError
case {
'name': 'error',
'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id
'symbol': sym,
'broker_details': details,
# 'reason': reason,
}:
msg = BrokerdError(**brokerd_msg)
resp = 'broker_errored'
log.error(pformat(msg)) # XXX make one when it's blank?
# TODO: instead this should be our status set.
# ack, open, fill, closed, cancelled'
# TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders?
# This looks like a supervision policy for pending orders on
# some unexpected failure - something we need to think more
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
resp = None
broker_details = {}
if name in (
'error',
):
# TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders?
# BrokerdStatus
case {
'name': 'status',
'status': status,
'reqid': reqid, # brokerd generated order-request id
# TODO: feels like the wrong msg for this field?
'remaining': remaining,
# This looks like a supervision policy for pending orders on
# some unexpected failure - something we need to think more
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
msg = BrokerdStatus(**brokerd_msg)
msg = BrokerdError(**brokerd_msg)
# TODO: should we flatten out these cases and/or should
# they maybe even eventually be separate messages?
if status == 'cancelled':
log.info(f'Cancellation for {oid} is complete!')
# XXX should we make one when it's blank?
log.error(pformat(msg))
if status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not remaining:
# TODO: getting this bs, prolly need to handle status messages
# 'Market data farm connection is OK:usfarm.nj'
resp = 'broker_executed'
# another stupid ib error to handle
# if 10147 in message: cancel
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
log.info(f'Execution for {oid} is complete!')
resp = 'broker_errored'
broker_details = msg
# just log it
else:
log.info(f'{broker} filled {msg}')
# don't relay message to order requester client
# continue
elif name in (
'status',
):
msg = BrokerdStatus(**brokerd_msg)
if msg.status == 'cancelled':
log.info(f'Cancellation for {oid} is complete!')
if msg.status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not msg.remaining:
resp = 'broker_executed'
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
log.info(f'Execution for {oid} is complete!')
# just log it
else:
log.info(f'{broker} filled {msg}')
# one of {submitted, cancelled}
resp = 'broker_' + msg.status
else:
# one of {submitted, cancelled}
resp = 'broker_' + msg.status
# BrokerdFill
case {
'name': 'fill',
'reqid': reqid, # brokerd generated order-request id
# 'symbol': sym, # paper engine doesn't have this, nbd?
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# proxy through the "fill" result(s)
msg = BrokerdFill(**brokerd_msg)
resp = 'broker_filled'
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
# pass the BrokerdStatus msg inside the broker details field
broker_details = msg
# unknown valid message case?
case {
'name': name,
'symbol': sym,
'reqid': reqid, # brokerd generated order-request id
# 'oid': oid, # ems order-dialog id
'broker_details': details,
elif name in (
'fill',
):
msg = BrokerdFill(**brokerd_msg)
} if (
book._ems2brokerd_ids.inverse.get(reqid) is None
):
# TODO: pretty sure we can drop this now?
# XXX: paper clearing special cases
# paper engine race case: ``Client.submit_limit()`` hasn't
# returned yet and provided an output reqid to register
# locally, so we need to retreive the oid that was already
# packed at submission since we already know it ahead of
# time
paper = details.get('paper_info')
ext = details.get('external')
# proxy through the "fill" result(s)
resp = 'broker_filled'
broker_details = msg
if paper:
# paperboi keeps the ems id up front
oid = paper['oid']
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
elif ext:
# may be an order msg specified as "external" to the
# piker ems flow (i.e. generated by some other
# external broker backend client (like tws for ib)
log.error(f"External trade event {name}@{ext}")
else:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
else:
# something is out of order, we don't have an oid for
# this broker-side message.
log.error(
f'Unknown oid: {oid} for msg {name}:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
continue
case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# retrieve existing live flow
entry = book._ems_entries[oid]
assert entry.oid == oid
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# Create and relay response status message
# to requesting EMS client
@ -793,7 +801,7 @@ async def translate_and_relay_brokerd_events(
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
brokerd_msg=msg,
)
)
except KeyError: