Error log brokerd msgs that have `.reqid == None`
Relates to the bug discovered in #310, this should avoid out-of-order msgs which do not have a `.reqid` set to be error logged to console. Further, add `pformat()` to kraken logging of ems msging.incremental_update_paths
parent
4f36743f64
commit
47cf4aa4f7
|
@ -21,6 +21,7 @@ Kraken backend.
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from dataclasses import asdict, field
|
from dataclasses import asdict, field
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from pprint import pformat
|
||||||
from typing import Any, Optional, AsyncIterator, Callable, Union
|
from typing import Any, Optional, AsyncIterator, Callable, Union
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
@ -569,7 +570,10 @@ async def handle_order_requests(
|
||||||
order: BrokerdOrder
|
order: BrokerdOrder
|
||||||
|
|
||||||
async for request_msg in ems_order_stream:
|
async for request_msg in ems_order_stream:
|
||||||
log.info(f'Received order request {request_msg}')
|
log.info(
|
||||||
|
'Received order request:\n'
|
||||||
|
f'{pformat(request_msg)}'
|
||||||
|
)
|
||||||
|
|
||||||
action = request_msg['action']
|
action = request_msg['action']
|
||||||
|
|
||||||
|
@ -628,6 +632,7 @@ async def handle_order_requests(
|
||||||
# update the internal pairing of oid to krakens
|
# update the internal pairing of oid to krakens
|
||||||
# txid with the new txid that is returned on edit
|
# txid with the new txid that is returned on edit
|
||||||
reqid = resp['result']['txid']
|
reqid = resp['result']['txid']
|
||||||
|
|
||||||
# deliver ack that order has been submitted to broker routing
|
# deliver ack that order has been submitted to broker routing
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
BrokerdOrderAck(
|
BrokerdOrderAck(
|
||||||
|
@ -788,7 +793,10 @@ async def trades_dialogue(
|
||||||
# Get websocket token for authenticated data stream
|
# Get websocket token for authenticated data stream
|
||||||
# Assert that a token was actually received.
|
# Assert that a token was actually received.
|
||||||
resp = await client.endpoint('GetWebSocketsToken', {})
|
resp = await client.endpoint('GetWebSocketsToken', {})
|
||||||
|
|
||||||
|
# lol wtf is this..
|
||||||
assert resp['error'] == []
|
assert resp['error'] == []
|
||||||
|
|
||||||
token = resp['result']['token']
|
token = resp['result']['token']
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
|
|
|
@ -561,7 +561,10 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
name = brokerd_msg['name']
|
name = brokerd_msg['name']
|
||||||
|
|
||||||
log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}')
|
log.info(
|
||||||
|
f'Received broker trade event:\n'
|
||||||
|
f'{pformat(brokerd_msg)}'
|
||||||
|
)
|
||||||
|
|
||||||
if name == 'position':
|
if name == 'position':
|
||||||
|
|
||||||
|
@ -613,19 +616,28 @@ async def translate_and_relay_brokerd_events(
|
||||||
# packed at submission since we already know it ahead of
|
# packed at submission since we already know it ahead of
|
||||||
# time
|
# time
|
||||||
paper = brokerd_msg['broker_details'].get('paper_info')
|
paper = brokerd_msg['broker_details'].get('paper_info')
|
||||||
|
ext = brokerd_msg['broker_details'].get('external')
|
||||||
if paper:
|
if paper:
|
||||||
# paperboi keeps the ems id up front
|
# paperboi keeps the ems id up front
|
||||||
oid = paper['oid']
|
oid = paper['oid']
|
||||||
|
|
||||||
else:
|
elif ext:
|
||||||
# may be an order msg specified as "external" to the
|
# may be an order msg specified as "external" to the
|
||||||
# piker ems flow (i.e. generated by some other
|
# piker ems flow (i.e. generated by some other
|
||||||
# external broker backend client (like tws for ib)
|
# external broker backend client (like tws for ib)
|
||||||
ext = brokerd_msg['broker_details'].get('external')
|
log.error(f"External trade event {ext}")
|
||||||
if ext:
|
|
||||||
log.error(f"External trade event {ext}")
|
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
else:
|
||||||
|
# something is out of order, we don't have an oid for
|
||||||
|
# this broker-side message.
|
||||||
|
log.error(
|
||||||
|
'Unknown oid:{oid} for msg:\n'
|
||||||
|
f'{pformat(brokerd_msg)}'
|
||||||
|
'Unable to relay message to client side!?'
|
||||||
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# check for existing live flow entry
|
# check for existing live flow entry
|
||||||
entry = book._ems_entries.get(oid)
|
entry = book._ems_entries.get(oid)
|
||||||
|
@ -823,7 +835,9 @@ async def process_client_order_cmds(
|
||||||
if reqid:
|
if reqid:
|
||||||
|
|
||||||
# send cancel to brokerd immediately!
|
# send cancel to brokerd immediately!
|
||||||
log.info("Submitting cancel for live order {reqid}")
|
log.info(
|
||||||
|
f'Submitting cancel for live order {reqid}'
|
||||||
|
)
|
||||||
|
|
||||||
await brokerd_order_stream.send(msg.dict())
|
await brokerd_order_stream.send(msg.dict())
|
||||||
|
|
||||||
|
|
|
@ -873,7 +873,9 @@ async def process_trades_and_update_ui(
|
||||||
mode.lines.remove_line(uuid=oid)
|
mode.lines.remove_line(uuid=oid)
|
||||||
|
|
||||||
# each clearing tick is responded individually
|
# each clearing tick is responded individually
|
||||||
elif resp in ('broker_filled',):
|
elif resp in (
|
||||||
|
'broker_filled',
|
||||||
|
):
|
||||||
|
|
||||||
known_order = book._sent_orders.get(oid)
|
known_order = book._sent_orders.get(oid)
|
||||||
if not known_order:
|
if not known_order:
|
||||||
|
|
Loading…
Reference in New Issue