`kraken`: write `pps.toml` on updates for now

pre_overruns_ctxcancelled
Tyler Goodlet 2023-03-14 20:55:37 -04:00
parent 4a2696f0ab
commit 786372618c
1 changed files with 30 additions and 4 deletions

View File

@ -50,6 +50,7 @@ from piker.accounting import (
) )
from piker.accounting._mktinfo import ( from piker.accounting._mktinfo import (
Symbol, Symbol,
MktPair,
digits_to_dec, digits_to_dec,
) )
from piker.clearing._messages import ( from piker.clearing._messages import (
@ -370,6 +371,8 @@ def trades2pps(
acctid: str, acctid: str,
new_trans: dict[str, Transaction] = {}, new_trans: dict[str, Transaction] = {},
write_storage: bool = True,
) -> tuple[ ) -> tuple[
list[BrokerdPosition], list[BrokerdPosition],
list[Transaction], list[Transaction],
@ -400,13 +403,20 @@ def trades2pps(
# right since `.broker` is already # right since `.broker` is already
# included? # included?
account='kraken.' + acctid, account='kraken.' + acctid,
symbol=p.symbol.front_fqsn(), symbol=p.symbol.fqme,
size=p.size, size=p.size,
avg_price=p.ppu, avg_price=p.ppu,
currency='', currency='',
) )
position_msgs.append(msg) position_msgs.append(msg)
if write_storage:
# TODO: ideally this blocks the this task
# as little as possible. we need to either do
# these writes in another actor, or try out `trio`'s
# async file IO api?
table.write_config()
return position_msgs return position_msgs
@ -639,6 +649,12 @@ async def trades_dialogue(
) )
await ctx.started((ppmsgs, [acc_name])) await ctx.started((ppmsgs, [acc_name]))
# TODO: ideally this blocks the this task
# as little as possible. we need to either do
# these writes in another actor, or try out `trio`'s
# async file IO api?
table.write_config()
# Get websocket token for authenticated data stream # Get websocket token for authenticated data stream
# Assert that a token was actually received. # Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {}) resp = await client.endpoint('GetWebSocketsToken', {})
@ -814,8 +830,6 @@ async def handle_order_updates(
for pp_msg in ppmsgs: for pp_msg in ppmsgs:
await ems_stream.send(pp_msg) await ems_stream.send(pp_msg)
ledger_trans.update(new_trans)
# process and relay order state change events # process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders # https://docs.kraken.com/websockets/#message-openOrders
case [ case [
@ -1184,9 +1198,21 @@ def norm_trade_records(
}[record['type']] }[record['type']]
# we normalize to kraken's `altname` always.. # we normalize to kraken's `altname` always..
bsuid, pair_info = Client.normalize_symbol(record['pair']) bsuid, pair_info = Client.normalize_symbol(
record['pair']
)
fqsn = f'{bsuid}.kraken' fqsn = f'{bsuid}.kraken'
dst, src = pair_info.wsname.lower().split('/')
# mkpair = MktPair(
# src=src,
# dst=dst,
# price_tick=digits_to_dec(pair_info.pair_decimals),
# size_tick=digits_to_dec(pair_info.lot_decimals),
# dst_type='crypto_currency',
# )
# breakpoint()
mktpair = Symbol.from_fqsn( mktpair = Symbol.from_fqsn(
fqsn, fqsn,
info={ info={