Error log brokerd msgs that have `.reqid == None`

Relates to the bug discovered in #310, this should avoid out-of-order
msgs which do not have a `.reqid` set to be error logged to console.
Further, add `pformat()` to kraken logging of ems msging.
pre_flow
Tyler Goodlet 2022-05-10 09:22:46 -04:00
parent af61eac389
commit e1e42e4208
3 changed files with 32 additions and 8 deletions

View File

@ -21,6 +21,7 @@ Kraken backend.
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from dataclasses import asdict, field from dataclasses import asdict, field
from datetime import datetime from datetime import datetime
from pprint import pformat
from typing import Any, Optional, AsyncIterator, Callable, Union from typing import Any, Optional, AsyncIterator, Callable, Union
import time import time
@ -569,7 +570,10 @@ async def handle_order_requests(
order: BrokerdOrder order: BrokerdOrder
async for request_msg in ems_order_stream: async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}') log.info(
'Received order request:\n'
f'{pformat(request_msg)}'
)
action = request_msg['action'] action = request_msg['action']
@ -628,6 +632,7 @@ async def handle_order_requests(
# update the internal pairing of oid to krakens # update the internal pairing of oid to krakens
# txid with the new txid that is returned on edit # txid with the new txid that is returned on edit
reqid = resp['result']['txid'] reqid = resp['result']['txid']
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
BrokerdOrderAck( BrokerdOrderAck(
@ -788,7 +793,10 @@ async def trades_dialogue(
# Get websocket token for authenticated data stream # Get websocket token for authenticated data stream
# Assert that a token was actually received. # Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {}) resp = await client.endpoint('GetWebSocketsToken', {})
# lol wtf is this..
assert resp['error'] == [] assert resp['error'] == []
token = resp['result']['token'] token = resp['result']['token']
async with ( async with (

View File

@ -561,7 +561,10 @@ async def translate_and_relay_brokerd_events(
name = brokerd_msg['name'] name = brokerd_msg['name']
log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}') log.info(
f'Received broker trade event:\n'
f'{pformat(brokerd_msg)}'
)
if name == 'position': if name == 'position':
@ -613,19 +616,28 @@ async def translate_and_relay_brokerd_events(
# packed at submission since we already know it ahead of # packed at submission since we already know it ahead of
# time # time
paper = brokerd_msg['broker_details'].get('paper_info') paper = brokerd_msg['broker_details'].get('paper_info')
ext = brokerd_msg['broker_details'].get('external')
if paper: if paper:
# paperboi keeps the ems id up front # paperboi keeps the ems id up front
oid = paper['oid'] oid = paper['oid']
else: elif ext:
# may be an order msg specified as "external" to the # may be an order msg specified as "external" to the
# piker ems flow (i.e. generated by some other # piker ems flow (i.e. generated by some other
# external broker backend client (like tws for ib) # external broker backend client (like tws for ib)
ext = brokerd_msg['broker_details'].get('external')
if ext:
log.error(f"External trade event {ext}") log.error(f"External trade event {ext}")
continue continue
else:
# something is out of order, we don't have an oid for
# this broker-side message.
log.error(
'Unknown oid:{oid} for msg:\n'
f'{pformat(brokerd_msg)}'
'Unable to relay message to client side!?'
)
else: else:
# check for existing live flow entry # check for existing live flow entry
entry = book._ems_entries.get(oid) entry = book._ems_entries.get(oid)
@ -823,7 +835,9 @@ async def process_client_order_cmds(
if reqid: if reqid:
# send cancel to brokerd immediately! # send cancel to brokerd immediately!
log.info("Submitting cancel for live order {reqid}") log.info(
f'Submitting cancel for live order {reqid}'
)
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg.dict())

View File

@ -907,7 +907,9 @@ async def process_trades_and_update_ui(
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually # each clearing tick is responded individually
elif resp in ('broker_filled',): elif resp in (
'broker_filled',
):
known_order = book._sent_orders.get(oid) known_order = book._sent_orders.get(oid)
if not known_order: if not known_order: