diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 30e57b9e..670eed6f 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -21,6 +21,7 @@ Kraken backend. from contextlib import asynccontextmanager as acm from dataclasses import asdict, field from datetime import datetime +from pprint import pformat from typing import Any, Optional, AsyncIterator, Callable, Union import time @@ -569,7 +570,10 @@ async def handle_order_requests( order: BrokerdOrder async for request_msg in ems_order_stream: - log.info(f'Received order request {request_msg}') + log.info( + 'Received order request:\n' + f'{pformat(request_msg)}' + ) action = request_msg['action'] @@ -628,6 +632,7 @@ async def handle_order_requests( # update the internal pairing of oid to krakens # txid with the new txid that is returned on edit reqid = resp['result']['txid'] + # deliver ack that order has been submitted to broker routing await ems_order_stream.send( BrokerdOrderAck( @@ -788,7 +793,10 @@ async def trades_dialogue( # Get websocket token for authenticated data stream # Assert that a token was actually received. resp = await client.endpoint('GetWebSocketsToken', {}) + + # lol wtf is this.. assert resp['error'] == [] + token = resp['result']['token'] async with ( diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index a5d04f0c..e00676f2 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -561,7 +561,10 @@ async def translate_and_relay_brokerd_events( name = brokerd_msg['name'] - log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}') + log.info( + f'Received broker trade event:\n' + f'{pformat(brokerd_msg)}' + ) if name == 'position': @@ -613,19 +616,28 @@ async def translate_and_relay_brokerd_events( # packed at submission since we already know it ahead of # time paper = brokerd_msg['broker_details'].get('paper_info') + ext = brokerd_msg['broker_details'].get('external') if paper: # paperboi keeps the ems id up front oid = paper['oid'] - else: + elif ext: # may be an order msg specified as "external" to the # piker ems flow (i.e. generated by some other # external broker backend client (like tws for ib) - ext = brokerd_msg['broker_details'].get('external') - if ext: - log.error(f"External trade event {ext}") + log.error(f"External trade event {ext}") continue + + else: + # something is out of order, we don't have an oid for + # this broker-side message. + log.error( + 'Unknown oid:{oid} for msg:\n' + f'{pformat(brokerd_msg)}' + 'Unable to relay message to client side!?' + ) + else: # check for existing live flow entry entry = book._ems_entries.get(oid) @@ -823,7 +835,9 @@ async def process_client_order_cmds( if reqid: # send cancel to brokerd immediately! - log.info("Submitting cancel for live order {reqid}") + log.info( + f'Submitting cancel for live order {reqid}' + ) await brokerd_order_stream.send(msg.dict()) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 3e230b71..a86fe816 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -873,7 +873,9 @@ async def process_trades_and_update_ui( mode.lines.remove_line(uuid=oid) # each clearing tick is responded individually - elif resp in ('broker_filled',): + elif resp in ( + 'broker_filled', + ): known_order = book._sent_orders.get(oid) if not known_order: