Update ledger from api immediately, cruft cleaning

kraken_ws_orders
Tyler Goodlet 2022-07-19 11:02:09 -04:00
parent aa7f24b6db
commit b1419c850d
1 changed files with 23 additions and 39 deletions
piker/brokers/kraken

View File

@ -31,7 +31,6 @@ import time
from typing import (
Any,
AsyncIterator,
# Optional,
Union,
)
@ -46,7 +45,6 @@ from piker.pp import (
Position,
PpTable,
Transaction,
# update_pps_conf,
open_trade_ledger,
open_pps,
)
@ -391,6 +389,7 @@ async def trades_dialogue(
# most recent 50 trades and assume that by ordering we
# already have those records in the ledger.
tids2trades = await client.get_trades()
ledger_dict.update(tids2trades)
api_trans = norm_trade_records(tids2trades)
# retrieve kraken reported balances
@ -448,12 +447,13 @@ async def trades_dialogue(
ppmsgs = trades2pps(
table,
acctid,
# new_trans,
)
await ctx.started((ppmsgs, [acc_name]))
# XXX: not fucking clue but putting this finally block
# will suppress errors inside the direct await below!?!
# likely something to do with the exist stack inside
# the nobsws stuff...
# try:
# Get websocket token for authenticated data stream
@ -494,26 +494,19 @@ async def trades_dialogue(
)
# enter relay loop
# try:
try:
await handle_order_updates(
ws,
stream,
ems_stream,
apiflows,
ids,
reqids2txids,
table,
api_trans,
acctid,
acc_name,
token,
)
# except:
# await tractor.breakpoint()
finally:
# always update ledger on exit
ledger_dict.update(tids2trades)
await handle_order_updates(
ws,
stream,
ems_stream,
apiflows,
ids,
reqids2txids,
table,
api_trans,
acctid,
acc_name,
token,
)
async def handle_order_updates(
@ -561,9 +554,13 @@ async def handle_order_updates(
f'ownTrades update_{seq}:\n'
f'{pformat(trades_msgs)}'
)
# XXX: a fix / todo
# see the comment in the caller about weird error
# suppression around a commented `try:`
# assert 0
# format as tid -> trade event map
# eg. msg
# eg. received msg format,
# [{'TOKWHY-SMTUB-G5DOI6': {'cost': '95.29047',
# 'fee': '0.24776',
# 'margin': '0.00000',
@ -579,15 +576,10 @@ async def handle_order_updates(
tid: trade
for entry in trades_msgs
for (tid, trade) in entry.items()
# don't re-process datums we've already seen
if tid not in ledger_trans
}
# if tid in ledger_trans:
# # skip already seen transactions
# log.info(f'Skipping already seen trade {trade}')
# continue
# await tractor.breakpoint()
for tid, trade in trades.items():
txid = trade['ordertxid']
@ -642,11 +634,6 @@ async def handle_order_updates(
)
await ems_stream.send(filled_msg)
# if not trades:
# # skip pp emissions if we have already
# # processed all trades in this msg.
# continue
new_trans = norm_trade_records(trades)
ppmsgs = trades2pps(
table,
@ -897,9 +884,6 @@ async def handle_order_updates(
chain = apiflows[reqid]
chain.maps.append(event)
# pretxid = chain['txid']
# print(f'pretxid: {pretxid}')
resps, errored = process_status(
event,
oid,