diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 43dab81c..6c56a3e7 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -31,7 +31,6 @@ import time from typing import ( Any, AsyncIterator, - # Optional, Union, ) @@ -46,7 +45,6 @@ from piker.pp import ( Position, PpTable, Transaction, - # update_pps_conf, open_trade_ledger, open_pps, ) @@ -391,6 +389,7 @@ async def trades_dialogue( # most recent 50 trades and assume that by ordering we # already have those records in the ledger. tids2trades = await client.get_trades() + ledger_dict.update(tids2trades) api_trans = norm_trade_records(tids2trades) # retrieve kraken reported balances @@ -448,12 +447,13 @@ async def trades_dialogue( ppmsgs = trades2pps( table, acctid, - # new_trans, ) await ctx.started((ppmsgs, [acc_name])) # XXX: not fucking clue but putting this finally block # will suppress errors inside the direct await below!?! + # likely something to do with the exist stack inside + # the nobsws stuff... # try: # Get websocket token for authenticated data stream @@ -494,26 +494,19 @@ async def trades_dialogue( ) # enter relay loop - # try: - try: - await handle_order_updates( - ws, - stream, - ems_stream, - apiflows, - ids, - reqids2txids, - table, - api_trans, - acctid, - acc_name, - token, - ) - # except: - # await tractor.breakpoint() - finally: - # always update ledger on exit - ledger_dict.update(tids2trades) + await handle_order_updates( + ws, + stream, + ems_stream, + apiflows, + ids, + reqids2txids, + table, + api_trans, + acctid, + acc_name, + token, + ) async def handle_order_updates( @@ -561,9 +554,13 @@ async def handle_order_updates( f'ownTrades update_{seq}:\n' f'{pformat(trades_msgs)}' ) + # XXX: a fix / todo + # see the comment in the caller about weird error + # suppression around a commented `try:` # assert 0 + # format as tid -> trade event map - # eg. msg + # eg. received msg format, # [{'TOKWHY-SMTUB-G5DOI6': {'cost': '95.29047', # 'fee': '0.24776', # 'margin': '0.00000', @@ -579,15 +576,10 @@ async def handle_order_updates( tid: trade for entry in trades_msgs for (tid, trade) in entry.items() + + # don't re-process datums we've already seen if tid not in ledger_trans } - - # if tid in ledger_trans: - # # skip already seen transactions - # log.info(f'Skipping already seen trade {trade}') - # continue - - # await tractor.breakpoint() for tid, trade in trades.items(): txid = trade['ordertxid'] @@ -642,11 +634,6 @@ async def handle_order_updates( ) await ems_stream.send(filled_msg) - # if not trades: - # # skip pp emissions if we have already - # # processed all trades in this msg. - # continue - new_trans = norm_trade_records(trades) ppmsgs = trades2pps( table, @@ -897,9 +884,6 @@ async def handle_order_updates( chain = apiflows[reqid] chain.maps.append(event) - # pretxid = chain['txid'] - # print(f'pretxid: {pretxid}') - resps, errored = process_status( event, oid,