piker/piker/brokers/kraken/broker.py

1162 lines
42 KiB
Python
Raw Normal View History

# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Order api and machinery
'''
from collections import ChainMap, defaultdict
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
from functools import partial
from itertools import count
import math
from pprint import pformat
import time
from typing import (
Any,
AsyncIterator,
Iterable,
Union,
)
2022-07-08 21:17:28 +00:00
from async_generator import aclosing
First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296
2022-07-05 02:00:56 +00:00
from bidict import bidict
import pendulum
import trio
import tractor
from piker.pp import (
Position,
PpTable,
Transaction,
open_trade_ledger,
open_pps,
)
from piker.clearing._messages import (
Order,
Status,
BrokerdCancel,
BrokerdError,
BrokerdFill,
BrokerdOrder,
BrokerdOrderAck,
BrokerdPosition,
BrokerdStatus,
)
from . import log
from .api import (
Client,
BrokerError,
get_client,
)
from .feed import (
get_console_log,
open_autorecon_ws,
NoBsWs,
stream_messages,
)
MsgUnion = Union[
BrokerdCancel,
BrokerdError,
BrokerdFill,
BrokerdOrder,
BrokerdOrderAck,
BrokerdPosition,
BrokerdStatus,
]
class TooFastEdit(Exception):
'Edit requests faster then api submissions'
# TODO: make this wrap the `Client` and `ws` instances
# and give it methods to submit cancel vs. add vs. edit
# requests?
class BrokerClient:
'''
Actor global, client-unique order manager API.
For now provides unique ``brokerd`` defined "request ids"
and "user reference" values to track ``kraken`` ws api order
dialogs.
'''
counter: Iterable = count(1)
_table: set[int] = set()
@classmethod
def new_reqid(cls) -> int:
for reqid in cls.counter:
if reqid not in cls._table:
cls._table.add(reqid)
return reqid
@classmethod
def add_reqid(cls, reqid: int) -> None:
cls._table.add(reqid)
async def handle_order_requests(
First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296
2022-07-05 02:00:56 +00:00
ws: NoBsWs,
client: Client,
ems_order_stream: tractor.MsgStream,
First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296
2022-07-05 02:00:56 +00:00
token: str,
apiflows: dict[int, ChainMap[dict[str, dict]]],
First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296
2022-07-05 02:00:56 +00:00
ids: bidict[str, int],
reqids2txids: dict[int, str],
) -> None:
First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296
2022-07-05 02:00:56 +00:00
'''
Process new order submission requests from the EMS
and deliver acks or errors.
First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296
2022-07-05 02:00:56 +00:00
'''
# XXX: UGH, let's unify this.. with ``msgspec``.
msg: dict[str, Any]
order: BrokerdOrder
async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}')
match msg:
case {
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
reqid = ids[cancel.oid]
try:
txid = reqids2txids[reqid]
except KeyError:
# XXX: not sure if this block ever gets hit now?
log.error('TOO FAST CANCEL/EDIT')
reqids2txids[reqid] = TooFastEdit(reqid)
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
f'Edit too fast:{reqid}, cancelling..'
),
)
)
else:
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
case {
'account': 'kraken.spot' as account,
'action': action,
} if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**msg)
# logic from old `Client.submit_limit()`
if order.oid in ids:
ep = 'editOrder'
reqid = ids[order.oid] # integer not txid
try:
txid = reqids2txids[reqid]
except KeyError:
# XXX: not sure if this block ever gets hit now?
log.error('TOO FAST EDIT')
reqids2txids[reqid] = TooFastEdit(reqid)
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
f'TooFastEdit reqid:{reqid}, cancelling..'
),
)
)
else:
extra = {
'orderid': txid, # txid
'newuserref': str(reqid),
}
else:
ep = 'addOrder'
reqid = BrokerClient.new_reqid()
ids[order.oid] = reqid
log.debug(
f"Adding order {reqid}\n"
f'{ids}'
)
extra = {
'ordertype': 'limit',
'type': order.action,
}
psym = order.symbol.upper()
pair = f'{psym[:3]}/{psym[3:]}'
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
# XXX: ACK the request **immediately** before sending
# the api side request to ensure the ems maps the oid ->
# reqid correctly!
resp = BrokerdOrderAck(
oid=order.oid, # ems order request id
reqid=reqid, # our custom int mapping
account=account, # piker account
)
await ems_order_stream.send(resp)
First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296
2022-07-05 02:00:56 +00:00
# call ws api to submit the order:
# https://docs.kraken.com/websockets/#message-addOrder
req = {
First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296
2022-07-05 02:00:56 +00:00
'event': ep,
'token': token,
'reqid': reqid, # remapped-to-int uid from ems
# XXX: we set these to the same value since for us
# a request dialog and an order's state-liftime are
# treated the same. Also this used to not work, the
# values used to be mutex for some odd reason until
# we dealt with support about it, and then they
# fixed it and pretended like we were crazy and the
# issue was never there lmao... coorps bro.
# 'userref': str(reqid),
'userref': str(reqid),
First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296
2022-07-05 02:00:56 +00:00
'pair': pair,
'price': str(order.price),
'volume': str(order.size),
# validate: 'true', # validity check, nothing more
} | extra
log.info(f'Submitting WS order request:\n{pformat(req)}')
await ws.send_msg(req)
# placehold for sanity checking in relay loop
apiflows[reqid].maps.append(msg)
case _:
account = msg.get('account')
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
'Invalid request msg:\n{msg}'
))
)
@acm
async def subscribe(
ws: NoBsWs,
token: str,
subs: list[tuple[str, dict]] = [
('ownTrades', {
# don't send first 50 trades on startup,
# we already pull this manually from the rest endpoint.
'snapshot': False,
},),
('openOrders', {
# include rate limit counters
'ratecounter': True,
},),
],
):
'''
Setup ws api subscriptions:
https://docs.kraken.com/websockets/#message-subscribe
By default we sign up for trade and order update events.
'''
# more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
assert token
subnames: set[str] = set()
for name, sub_opts in subs:
msg = {
'event': 'subscribe',
'subscription': {
'name': name,
'token': token,
**sub_opts,
}
}
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_msg(msg)
subnames.add(name)
# wait on subscriptionn acks
with trio.move_on_after(5):
while True:
match (msg := await ws.recv_msg()):
case {
'event': 'subscriptionStatus',
'status': 'subscribed',
'subscription': sub_opts,
} as msg:
log.info(
f'Sucessful subscribe for {sub_opts}:\n'
f'{pformat(msg)}'
)
subnames.remove(sub_opts['name'])
if not subnames:
break
case {
'event': 'subscriptionStatus',
'status': 'error',
'errorMessage': errmsg,
} as msg:
raise RuntimeError(
f'{errmsg}\n\n'
f'{pformat(msg)}'
)
yield
for sub in subs:
# unsub from all pairs on teardown
await ws.send_msg({
'event': 'unsubscribe',
'subscription': [sub],
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
def trades2pps(
table: PpTable,
acctid: str,
new_trans: dict[str, Transaction] = {},
) -> tuple[
list[BrokerdPosition],
list[Transaction],
]:
if new_trans:
updated = table.update_from_trans(
new_trans,
)
log.info(f'Updated pps:\n{pformat(updated)}')
pp_entries, closed_pp_objs = table.dump_active()
pp_objs: dict[Union[str, int], Position] = table.pps
pps: dict[int, Position]
position_msgs: list[dict] = []
for pps in [pp_objs, closed_pp_objs]:
for tid, p in pps.items():
msg = BrokerdPosition(
broker='kraken',
# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
account='kraken.' + acctid,
symbol=p.symbol.front_fqsn(),
size=p.size,
2022-07-27 16:28:22 +00:00
avg_price=p.ppu,
currency='',
)
position_msgs.append(msg)
return position_msgs
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
async with get_client() as client:
# TODO: make ems flip to paper mode via
# some returned signal if the user only wants to use
# the data feed or we return this?
# await ctx.started(({}, ['paper']))
if not client._api_key:
raise RuntimeError(
'Missing Kraken API key in `brokers.toml`!?!?')
# auth required block
acctid = client._name
acc_name = 'kraken.' + acctid
# task local msg dialog tracking
apiflows: defaultdict[
int,
ChainMap[dict[str, dict]],
] = defaultdict(ChainMap)
# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
reqids2txids: bidict[int, str] = bidict()
# NOTE: testing code for making sure the rt incremental update
# of positions, via newly generated msgs works. In order to test
# this,
# - delete the *ABSOLUTE LAST* entry from accont's corresponding
# trade ledgers file (NOTE this MUST be the last record
# delivered from the
# api ledger),
# - open you ``pps.toml`` and find that same tid and delete it
# from the pp's clears table,
# - set this flag to `True`
#
# You should see an update come in after the order mode
# boots up which shows your latest correct asset
# balance size after the "previously unknown simulating a live
# fill" update comes in on the relay loop even though the fill
# will be ignored by the ems (no known reqid) the pp msg should
# update things correctly.
simulate_pp_update: bool = False
with (
open_pps(
'kraken',
acctid,
) as table,
open_trade_ledger(
'kraken',
acctid,
) as ledger_dict,
):
# transaction-ify the ledger entries
ledger_trans = norm_trade_records(ledger_dict)
# TODO: eventually probably only load
# as far back as it seems is not deliverd in the
# most recent 50 trades and assume that by ordering we
# already have those records in the ledger.
tids2trades = await client.get_trades()
ledger_dict.update(tids2trades)
api_trans = norm_trade_records(tids2trades)
# retrieve kraken reported balances
# and do diff with ledger to determine
# what amount of trades-transactions need
# to be reloaded.
sizes = await client.get_balances()
for dst, size in sizes.items():
# we don't care about tracking positions
# in the user's source fiat currency.
if dst == client.conf['src_fiat']:
continue
def has_pp(dst: str) -> Position | bool:
pps_dst_assets = {bsuid[:3]: bsuid for bsuid in table.pps}
pair = pps_dst_assets.get(dst)
pp = table.pps.get(pair)
if (
not pair or not pp
or not math.isclose(pp.size, size)
):
return False
return pp
pos = has_pp(dst)
if not pos:
# we have a balance for which there is no pp
# entry? so we have to likely update from the
# ledger.
updated = table.update_from_trans(ledger_trans)
log.info(f'Updated pps from ledger:\n{pformat(updated)}')
pos = has_pp(dst)
if not pos and not simulate_pp_update:
# try reloading from API
table.update_from_trans(api_trans)
pos = has_pp(dst)
if not pos:
# get transfers to make sense of abs balances.
# NOTE: we do this after ledger and API
# loading since we might not have an entry
# in the ``pps.toml`` for the necessary pair
# yet and thus this likely pair grabber will
# likely fail.
likely_pair = {
bsuid[:3]: bsuid
for bsuid in table.pps
}.get(dst)
if not likely_pair:
raise ValueError(
'Could not find a position pair in '
'ledger for likely widthdrawal '
f'candidate: {dst}'
)
if likely_pair:
# this was likely pp that had a withdrawal
# from the dst asset out of the account.
xfer_trans = await client.get_xfers(
dst,
src_asset=likely_pair[3:],
)
if xfer_trans:
updated = table.update_from_trans(
xfer_trans,
cost_scalar=1,
)
log.info(
'Updated {dst} from transfers:\n'
f'{pformat(updated)}'
)
if not has_pp(dst):
raise ValueError(
'Could not reproduce balance:\n'
f'dst: {dst}, {size}\n'
)
# only for simulate-testing a "new fill" since
# otherwise we have to actually conduct a live clear.
if simulate_pp_update:
tid = list(tids2trades)[0]
last_trade_dict = tids2trades[tid]
# stage a first reqid of `0`
reqids2txids[0] = last_trade_dict['ordertxid']
ppmsgs = trades2pps(
table,
acctid,
)
await ctx.started((ppmsgs, [acc_name]))
# Get websocket token for authenticated data stream
# Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {})
err = resp.get('error')
if err:
raise BrokerError(err)
token = resp['result']['token']
ws: NoBsWs
async with (
ctx.open_stream() as ems_stream,
open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=partial(
subscribe,
token=token,
),
) as ws,
aclosing(stream_messages(ws)) as stream,
trio.open_nursery() as nurse,
):
stream = stream_messages(ws)
# task for processing inbound requests from ems
nurse.start_soon(
handle_order_requests,
ws,
client,
ems_stream,
token,
apiflows,
ids,
reqids2txids,
)
# enter relay loop
await handle_order_updates(
ws,
stream,
ems_stream,
apiflows,
ids,
reqids2txids,
table,
api_trans,
acctid,
acc_name,
token,
)
async def handle_order_updates(
ws: NoBsWs,
ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream,
apiflows: dict[int, ChainMap[dict[str, dict]]],
ids: bidict[str, int],
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
reqids2txids: bidict[int, str],
table: PpTable,
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
ledger_trans: dict[str, Transaction],
acctid: str,
acc_name: str,
token: str,
) -> None:
'''
Main msg handling loop for all things order management.
This code is broken out to make the context explicit and state variables
defined in the signature clear to the reader.
'''
2022-07-08 21:17:28 +00:00
async for msg in ws_stream:
match msg:
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
# TODO: turns out you get the fill events from the
# `openOrders` before you get this, so it might be better
# to do all fill/status/pp updates in that sub and just use
# this one for ledger syncs?
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
# For eg. we could take the "last 50 trades" and do a diff
# with the ledger and then only do a re-sync if something
# seems amiss?
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
# format as tid -> trade event map
# eg. received msg format,
# [{'TOKWHY-SMTUB-G5DOI6': {
# 'cost': '95.29047',
# 'fee': '0.24776',
# 'margin': '0.00000',
# 'ordertxid': 'OKSUXM-3OLSB-L7TN72',
# 'ordertype': 'limit',
# 'pair': 'XBT/EUR',
# 'postxid': 'TKH2SE-M7IF5-CFI7LT',
# 'price': '21268.20000',
# 'time': '1657990947.640891',
# 'type': 'buy',
# 'vol': '0.00448042'
# }}]
case [
trades_msgs,
'ownTrades',
{'sequence': seq},
]:
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
log.info(
f'ownTrades update_{seq}:\n'
f'{pformat(trades_msgs)}'
)
trades = {
tid: trade
for entry in trades_msgs
for (tid, trade) in entry.items()
# don't re-process datums we've already seen
# if tid not in ledger_trans
}
for tid, trade in trades.items():
assert tid not in ledger_trans
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
txid = trade['ordertxid']
reqid = trade.get('userref')
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
if not reqid:
# NOTE: yet again, here we don't have any ref to the
# reqid that's generated by us (as the client) and
# sent in the order request, so we have to look it
# up from our own registry...
reqid = reqids2txids.inverse[txid]
if not reqid:
log.warning(f'Unknown trade dialog: {txid}')
action = trade['type']
price = float(trade['price'])
size = float(trade['vol'])
broker_time = float(trade['time'])
# TODO: we can emit this on the "closed" state in
# the `openOrders` sub-block below.
status_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(status_msg)
new_trans = norm_trade_records(trades)
ppmsgs = trades2pps(
table,
acctid,
new_trans,
)
for pp_msg in ppmsgs:
await ems_stream.send(pp_msg)
ledger_trans.update(new_trans)
# process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders
case [
order_msgs,
'openOrders',
{'sequence': seq},
]:
for order_msg in order_msgs:
log.info(
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
f'`openOrders` msg update_{seq}:\n'
f'{pformat(order_msg)}'
)
txid, update_msg = list(order_msg.items())[0]
# XXX: eg. of full msg schema:
# {'avg_price': _,
# 'cost': _,
# 'descr': {
# 'close': None,
# 'leverage': None,
# 'order': descr,
# 'ordertype': 'limit',
# 'pair': 'XMR/EUR',
# 'price': '74.94000000',
# 'price2': '0.00000000',
# 'type': 'buy'
# },
# 'expiretm': None,
# 'fee': '0.00000000',
# 'limitprice': '0.00000000',
# 'misc': '',
# 'oflags': 'fciq',
# 'opentm': '1656966131.337344',
# 'refid': None,
# 'starttm': None,
# 'stopprice': '0.00000000',
# 'timeinforce': 'GTC',
# 'vol': submit_vlm, # '13.34400854',
# 'vol_exec': exec_vlm} # 0.0000
match update_msg:
# EMS-unknown live order that needs to be
# delivered and loaded on the client-side.
case {
'userref': reqid,
'descr': {
'pair': pair,
'price': price,
'type': action,
},
'vol': vol,
# during a fill this field is **not**
# provided! but, it is always avail on
# actual status updates.. see case above.
'status': status,
**rest,
} if (
ids.inverse.get(reqid) is None
):
# parse out existing live order
fqsn = pair.replace('/', '').lower()
price = float(price)
size = float(vol)
# register the userref value from
# kraken (usually an `int` staring
# at 1?) as our reqid.
reqids2txids[reqid] = txid
oid = str(reqid)
ids[oid] = reqid # NOTE!: str -> int
# ensure wtv reqid they give us we don't re-use on
# new order submissions to this actor's client.
BrokerClient.add_reqid(reqid)
# fill out ``Status`` + boxed ``Order``
status_msg = Status(
time_ns=time.time_ns(),
resp='open',
oid=oid,
reqid=reqid,
# embedded order info
req=Order(
action=action,
exec_mode='live',
oid=oid,
symbol=fqsn,
account=acc_name,
price=price,
size=size,
),
src='kraken',
)
apiflows[reqid].maps.append(status_msg)
await ems_stream.send(status_msg)
continue
case {
'userref': reqid,
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
# during a fill this field is **not**
# provided! but, it is always avail on
# actual status updates.. see case above.
'status': status,
**rest,
}:
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
# TODO: store this in a ChainMap instance
# per order dialog.
# submit_vlm = rest.get('vol', 0)
# fee = rest.get('fee', 0)
if status == 'closed':
vlm = 0
else:
vlm = rest.get('vol_exec', 0)
if status == 'canceled':
reqids2txids.pop(reqid)
# we specially ignore internal order
# updates triggered by kraken's "edit"
# endpoint.
if rest['cancel_reason'] == 'Order replaced':
# TODO:
# - put the edit order status update
# code here?
# - send open order status msg.
log.info(
f'Order replaced: {txid}@reqid={reqid}'
)
# we don't do normal msg emission on
# a replacement cancel since it's
# the result of an "edited order"
# and thus we mask the kraken
# backend cancel then create details
# from the ems side.
continue
else:
# XXX: keep kraken engine's ``txid`` synced
# with the ems dialog's ``reqid``.
reqids2txids[reqid] = txid
ourreqid = reqids2txids.inverse.get(txid)
if ourreqid is None:
log.info(
'Mapping new txid to our reqid:\n'
f'{reqid} -> {txid}'
)
oid = ids.inverse.get(reqid)
# XXX: too fast edit handled by the
# request handler task: this
# scenario occurs when ems side
# requests are coming in too quickly
# such that there is no known txid
# yet established for the ems
# dialog's last reqid when the
# request handler task is already
# receceiving a new update for that
# reqid. In this case we simply mark
# the reqid as being "too fast" and
# then when we get the next txid
# update from kraken's backend, and
# thus the new txid, we simply
# cancel the order for now.
# TODO: Ideally we eventually
# instead make the client side of
# the ems block until a submission
# is confirmed by the backend
# instead of this hacky throttle
# style approach and avoid requests
# coming in too quickly on the other
# side of the ems, aka the client
# <-> ems dialog.
if (
status == 'open'
and isinstance(
reqids2txids.get(reqid),
TooFastEdit
)
):
# TODO: don't even allow this case
# by not moving the client side line
# until an edit confirmation
# arrives...
log.cancel(
f'Received too fast edit {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid or 0,
'txid': [txid],
})
continue
# send BrokerdStatus messages for all
# order state updates
resp = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(), # cuz why not
account=f'kraken.{acctid}',
# everyone doin camel case..
status=status, # force lower case
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
filled=vlm,
reason='', # why held?
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
remaining=vlm,
# TODO: need to extract the submit vlm
# from a prior msg update..
# (
# float(submit_vlm)
# -
# float(exec_vlm)
# ),
broker_details=dict(
{'name': 'kraken'}, **update_msg
),
First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296
2022-07-05 02:00:56 +00:00
)
apiflows[reqid].maps.append(update_msg)
2022-07-09 16:59:09 +00:00
await ems_stream.send(resp)
First draft, working WS based order management Move to using the websocket API for all order control ops and dropping the sync rest api approach which resulted in a bunch of buggy races. Further this gets us must faster (batch) order cancellation for free and a simpler ems request handler loop. We now heavily leverage the new py3.10 `match:` syntax for all kraken-side API msg parsing and processing and handle both the `openOrders` and `ownTrades` subscription streams. We also block "order editing" (by immediate cancellation) for now since the EMS isn't entirely yet equipped to handle brokerd side `.reqid` changes (which is how kraken implements so called order "updates" or "edits") for a given order-request dialog and we may want to even consider just implementing "updates" ourselves via independent cancel and submit requests? Definitely something to ponder. Alternatively we can "masquerade" such updates behind the count-style `.oid` remapping we had to implement anyway (kraken's limitation) and maybe everything will just work? Further details in this patch: - create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable local lookup of ems uids to piker-backend-client-side request ids and received order messages. - add `openOrders` sub support which more or less directly relays to equivalent `BrokerdStatus` updates and calc the `.filled` and `.remaining` values based on cleared vlm updates. - add handler blocks for `[add/edit/cancel]OrderStatus` events including error msg cases. - don't do any order request response processing in `handle_order_requests()` since responses are always received via one (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus such msgs are now handled in the response relay loop. Relates to #290 Resolves #310, #296
2022-07-05 02:00:56 +00:00
# fill msg.
# eg. contents (in total):
# {
# 'vol_exec': '0.84709869',
# 'cost': '101.25370642',
# 'fee': '0.26325964',
# 'avg_price': '119.53000001',
# 'userref': 0,
# }
# NOTE: there is no `status` field
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
case {
'vol_exec': vlm,
'avg_price': price,
'userref': reqid,
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
**rest,
} as msg:
ourreqid = reqids2txids.inverse[txid]
assert reqid == ourreqid
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
log.info(
f'openOrders vlm={vlm} Fill for {reqid}:\n'
f'{update_msg}'
)
fill_msg = BrokerdFill(
time_ns=time.time_ns(),
reqid=reqid,
# just use size value for now?
# action=action,
size=float(vlm),
price=float(price),
# TODO: maybe capture more msg data
# i.e fees?
2022-09-01 18:16:41 +00:00
broker_details={'name': 'kraken'} | order_msg,
broker_time=time.time(),
)
await ems_stream.send(fill_msg)
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
case _:
log.warning(
'Unknown orders msg:\n'
f'{txid}:{order_msg}'
)
# order request status updates
case {
'event': etype,
'status': status,
'reqid': reqid,
**rest,
} as event if (
etype in {
'addOrderStatus',
'editOrderStatus',
'cancelOrderStatus',
}
):
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
log.info(
f'{etype}:\n'
f'{pformat(msg)}'
)
txid = rest.get('txid')
lasttxid = reqids2txids.get(reqid)
Fixes for state updates and clears Turns out the `openOrders` and `ownTrades` subs always return a `reqid` value (the one brokerd sends to the kraken api in order requests) is always set to zero, which seems to be a bug? So this includes patches to work around that as well reliance on the `openOrders` sub to do most `BrokerdStatus` updates since `XOrderStatus` events don't seem to have much data in them at all (they almost look like pure ack events so maybe they aren't affirmative of final state changes anyway..). Other fixes: - respond with a `BrokerdOrderAck` immediately after `requid` generation not after order submission to ensure the ems has a valid `requid` *before* kraken api events are relayed through. - add a `reqids2txids: bidict[int, str]` which maps brokerd genned `requid`s to kraken-side `txid`s since (as mentioned above) the clearing and state endpoints don't relay back this value (it's always 0...) - add log messages for each sub so that (at least for now) we can see exact msg contents coming from kraken. - drop `.remaining` calcs for now since we need to keep record of the order states manually in order to retreive the original submission vlm.. - fix the `openOrders` case for fills, in this case the message includes no `status` field and thus we must catch it in a block *after* the normal state handler to avoid masking. - drop response msg generation from the cancel status case since we can do it again from the `openOrders` handler and sending a double status causes issues on the client side. - add a shite ton of notes around all this missing `requid` stuff.
2022-07-10 20:16:23 +00:00
# TODO: relay these to EMS once it supports
# open order loading.
oid = ids.inverse.get(reqid)
if not oid:
log.warning(
'Unknown order status update?:\n'
f'{event}'
)
continue
# update the msg chain
chain = apiflows[reqid]
chain.maps.append(event)
if status == 'error':
# any of ``{'add', 'edit', 'cancel'}``
action = etype.removesuffix('OrderStatus')
errmsg = rest['errorMessage']
log.error(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
await ems_stream.send(BrokerdError(
oid=oid,
# XXX: use old reqid in case it changed?
reqid=reqid,
symbol=chain.get('symbol', 'N/A'),
reason=f'Failed {action}:\n{errmsg}',
broker_details=event
))
txid = txid or lasttxid
if (
txid
# we throttle too-fast-requests on the ems side
2022-08-06 01:00:54 +00:00
and not isinstance(txid, TooFastEdit)
):
# client was editting too quickly
# so we instead cancel this order
log.cancel(
f'Cancelling {reqid}@{txid} due to:\n {event}')
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid or 0,
'txid': [txid],
})
case _:
log.warning(f'Unhandled trades update msg: {msg}')
def norm_trade_records(
ledger: dict[str, Any],
) -> dict[str, Transaction]:
records: dict[str, Transaction] = {}
for tid, record in ledger.items():
size = float(record.get('vol')) * {
'buy': 1,
'sell': -1,
}[record['type']]
# we normalize to kraken's `altname` always..
bsuid = norm_sym = Client.normalize_symbol(record['pair'])
records[tid] = Transaction(
fqsn=f'{norm_sym}.kraken',
tid=tid,
size=size,
price=float(record['price']),
cost=float(record['fee']),
dt=pendulum.from_timestamp(float(record['time'])),
bsuid=bsuid,
# XXX: there are no derivs on kraken right?
# expiry=expiry,
)
return records
@cm
def open_ledger(
acctid: str,
trade_entries: list[dict[str, Any]],
) -> set[Transaction]:
'''
Write recent session's trades to the user's (local) ledger file.
'''
with open_trade_ledger(
'kraken',
acctid,
) as ledger:
yield ledger
# update on exit
ledger.update(trade_entries)