diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index f98d3314..7d2c3309 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -54,7 +54,6 @@ from .api import ( Client, BrokerError, get_client, - normalize_symbol, ) from .feed import ( get_console_log, @@ -273,7 +272,10 @@ async def trades_dialogue( log.info( f'Loaded {len(trades)} trades from account `{acc_name}`' ) - with open_ledger(acctid, trades) as trans: + with open_ledger( + acctid, + trades, + ) as trans: active, closed = pp.update_pps_conf( 'kraken', acctid, @@ -366,7 +368,7 @@ async def handle_order_updates( emsflow: dict[str, list[MsgUnion]], ids: bidict[str, int], reqids2txids: bidict[int, str], - trans: list[pp.Transaction], + trans: set[pp.Transaction], acctid: str, acc_name: str, token: str, @@ -381,7 +383,7 @@ async def handle_order_updates( ''' # transaction records which will be updated # on new trade clearing events (aka order "fills") - trans: list[pp.Transaction] + trans: set[pp.Transaction] async for msg in ws_stream: match msg: @@ -467,16 +469,27 @@ async def handle_order_updates( ) await ems_stream.send(filled_msg) + if not trades: + # skip pp emissions if we have already + # processed all trades in this msg. + continue + # update ledger and position tracking - with open_ledger(acctid, trades) as trans: - # TODO: ideally we can pass in an existingn + await tractor.breakpoint() + trans: set[pp.Transaction] + with open_ledger( + acctid, + trades, + + ) as trans: + # TODO: ideally we can pass in an existing # pps state to this right? such that we # don't have to do a ledger reload all the # time.. active, closed = pp.update_pps_conf( 'kraken', acctid, - trade_records=trans, + trade_records=list(trans), ledger_reload={}.fromkeys( t.bsuid for t in trans), ) @@ -841,8 +854,9 @@ def norm_trade_records( 'buy': 1, 'sell': -1, }[record['type']] - bsuid = record['pair'] - norm_sym = normalize_symbol(bsuid) + + # we normalize to kraken's `altname` always.. + bsuid = norm_sym = Client.normalize_symbol(record['pair']) records.append( pp.Transaction( @@ -867,7 +881,7 @@ def open_ledger( acctid: str, trade_entries: list[dict[str, Any]], -) -> list[pp.Transaction]: +) -> set[pp.Transaction]: ''' Write recent session's trades to the user's (local) ledger file. @@ -878,8 +892,10 @@ def open_ledger( ) as ledger: # normalize to transaction form + # TODO: cawt damn, we should probably delegate to cryptofeed for + # this insteada of re-hacking kraken's total crap? records = norm_trade_records(trade_entries) - yield records + yield set(records) # update on exit ledger.update(trade_entries)