connect to krakens openOrders websocket
parent
03d2eddce3
commit
0c905920e2
|
@ -475,6 +475,21 @@ def normalize_symbol(
|
|||
return ticker.lower()
|
||||
|
||||
|
||||
def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]:
|
||||
"""Create a request subscription packet dict.
|
||||
|
||||
## TODO: point to the auth urls
|
||||
https://docs.kraken.com/websockets/#message-subscribe
|
||||
|
||||
"""
|
||||
# eg. specific logic for this in kraken's sync client:
|
||||
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
|
||||
return {
|
||||
'event': 'subscribe',
|
||||
'subscription': data,
|
||||
}
|
||||
|
||||
|
||||
async def handle_order_requests(
|
||||
|
||||
client: Client,
|
||||
|
@ -556,30 +571,35 @@ async def handle_order_requests(
|
|||
elif action == 'cancel':
|
||||
msg = BrokerdCancel(**request_msg)
|
||||
|
||||
# Send order cancellation to kraken
|
||||
resp = await client.submit_cancel(
|
||||
reqid=msg.reqid
|
||||
)
|
||||
|
||||
# Check to make sure there was no error returned by
|
||||
# the kraken endpoint. Assert one order was cancelled
|
||||
assert resp['error'] == []
|
||||
assert resp['result']['count'] == 1
|
||||
|
||||
try:
|
||||
pending = resp['result']['pending']
|
||||
# Check to make sure the cancellation is NOT pending,
|
||||
# then send the confirmation to the ems order stream
|
||||
except KeyError:
|
||||
await ems_order_stream.send(
|
||||
BrokerdStatus(
|
||||
reqid=msg.reqid,
|
||||
account=msg.account,
|
||||
time_ns=time.time_ns(),
|
||||
status='cancelled',
|
||||
reason='Order cancelled',
|
||||
broker_details={'name': 'kraken'}
|
||||
).dict()
|
||||
)
|
||||
# Check to make sure there was no error returned by
|
||||
# the kraken endpoint. Assert one order was cancelled
|
||||
assert resp['error'] == []
|
||||
assert resp['result']['count'] == 1
|
||||
|
||||
## TODO: Change this code using .get
|
||||
try:
|
||||
pending = resp['result']['pending']
|
||||
# Check to make sure the cancellation is NOT pending,
|
||||
# then send the confirmation to the ems order stream
|
||||
except KeyError:
|
||||
await ems_order_stream.send(
|
||||
BrokerdStatus(
|
||||
reqid=msg.reqid,
|
||||
account=msg.account,
|
||||
time_ns=time.time_ns(),
|
||||
status='cancelled',
|
||||
reason='Order cancelled',
|
||||
broker_details={'name': 'kraken'}
|
||||
).dict()
|
||||
)
|
||||
except AssertionError:
|
||||
log.error(f'Order cancel was not successful')
|
||||
|
||||
else:
|
||||
log.error(f'Unknown order command: {request_msg}')
|
||||
|
@ -594,6 +614,46 @@ async def trades_dialogue(
|
|||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||
|
||||
# Generate
|
||||
|
||||
@asynccontextmanager
|
||||
async def subscribe(ws: wsproto.WSConnection, token: str):
|
||||
## TODO: Fix docs and points to right urls
|
||||
# XXX: setup subs
|
||||
# https://docs.kraken.com/websockets/#message-subscribe
|
||||
# specific logic for this in kraken's shitty sync client:
|
||||
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
|
||||
trades_sub = make_auth_sub(
|
||||
{'name': 'openOrders', 'token': token}
|
||||
)
|
||||
|
||||
# TODO: we want to eventually allow unsubs which should
|
||||
# be completely fine to request from a separate task
|
||||
# since internally the ws methods appear to be FIFO
|
||||
# locked.
|
||||
await ws.send_msg(trades_sub)
|
||||
|
||||
## trade data (aka L1)
|
||||
#l1_sub = make_sub(
|
||||
# list(ws_pairs.values()),
|
||||
# {'name': 'spread'} # 'depth': 10}
|
||||
#)
|
||||
|
||||
## pull a first quote and deliver
|
||||
#await ws.send_msg(l1_sub)
|
||||
|
||||
yield
|
||||
|
||||
# unsub from all pairs on teardown
|
||||
await ws.send_msg({
|
||||
'event': 'unsubscribe',
|
||||
'subscription': ['openOrders'],
|
||||
})
|
||||
|
||||
# XXX: do we need to ack the unsub?
|
||||
# await ws.recv_msg()
|
||||
|
||||
|
||||
# Authenticated block
|
||||
async with get_client() as client:
|
||||
acc_name = 'kraken.' + client._name
|
||||
|
@ -606,7 +666,7 @@ async def trades_dialogue(
|
|||
if float(vols[ticker]) != 0:
|
||||
msg = pack_position(acc_name, norm_sym, pos, vols[ticker])
|
||||
all_positions.append(msg.dict())
|
||||
|
||||
|
||||
open_orders = await client.kraken_endpoint('OpenOrders', {})
|
||||
#await tractor.breakpoint()
|
||||
|
||||
|
@ -614,13 +674,85 @@ async def trades_dialogue(
|
|||
|
||||
#await trio.sleep_forever()
|
||||
|
||||
# Get websocket token for authenticated data stream
|
||||
# Assert that a token was actually received
|
||||
resp = await client.kraken_endpoint('GetWebSocketsToken', {})
|
||||
assert resp['error'] == []
|
||||
token = resp['result']['token']
|
||||
|
||||
async with (
|
||||
ctx.open_stream() as ems_stream,
|
||||
trio.open_nursery() as n,
|
||||
):
|
||||
## TODO: maybe add multiple accounts
|
||||
n.start_soon(handle_order_requests, client, ems_stream)
|
||||
|
||||
async with open_autorecon_ws(
|
||||
'wss://ws-auth.kraken.com/',
|
||||
fixture=subscribe,
|
||||
token=token,
|
||||
) as ws:
|
||||
|
||||
while True:
|
||||
with trio.move_on_after(5) as cs:
|
||||
msg = await ws.recv_msg()
|
||||
print(msg)
|
||||
|
||||
## pull a first quote and deliver
|
||||
#msg_gen = stream_messages(ws)
|
||||
|
||||
## TODO: use ``anext()`` when it lands in 3.10!
|
||||
#typ, ohlc_last = await msg_gen.__anext__()
|
||||
|
||||
#topic, quote = normalize(ohlc_last)
|
||||
|
||||
#first_quote = {topic: quote}
|
||||
#task_status.started((init_msgs, first_quote))
|
||||
|
||||
## lol, only "closes" when they're margin squeezing clients ;P
|
||||
#feed_is_live.set()
|
||||
|
||||
## keep start of last interval for volume tracking
|
||||
#last_interval_start = ohlc_last.etime
|
||||
|
||||
## start streaming
|
||||
#async for typ, ohlc in msg_gen:
|
||||
|
||||
# if typ == 'ohlc':
|
||||
|
||||
# # TODO: can get rid of all this by using
|
||||
# # ``trades`` subscription...
|
||||
|
||||
# # generate tick values to match time & sales pane:
|
||||
# # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
|
||||
# volume = ohlc.volume
|
||||
|
||||
# # new OHLC sample interval
|
||||
# if ohlc.etime > last_interval_start:
|
||||
# last_interval_start = ohlc.etime
|
||||
# tick_volume = volume
|
||||
|
||||
# else:
|
||||
# # this is the tick volume *within the interval*
|
||||
# tick_volume = volume - ohlc_last.volume
|
||||
|
||||
# ohlc_last = ohlc
|
||||
# last = ohlc.close
|
||||
|
||||
# if tick_volume:
|
||||
# ohlc.ticks.append({
|
||||
# 'type': 'trade',
|
||||
# 'price': last,
|
||||
# 'size': tick_volume,
|
||||
# })
|
||||
|
||||
# topic, quote = normalize(ohlc)
|
||||
|
||||
# elif typ == 'l1':
|
||||
# quote = ohlc
|
||||
# topic = quote['symbol'].lower()
|
||||
|
||||
# await send_chan.send({topic: quote})
|
||||
|
||||
|
||||
async def stream_messages(ws):
|
||||
|
||||
|
@ -832,7 +964,7 @@ async def stream_quotes(
|
|||
# XXX: do we need to ack the unsub?
|
||||
# await ws.recv_msg()
|
||||
|
||||
# see the tips on reonnection logic:
|
||||
# see the tips on reconnection logic:
|
||||
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
|
||||
async with open_autorecon_ws(
|
||||
'wss://ws.kraken.com/',
|
||||
|
|
|
@ -53,11 +53,13 @@ class NoBsWs:
|
|||
def __init__(
|
||||
self,
|
||||
url: str,
|
||||
token: str,
|
||||
stack: AsyncExitStack,
|
||||
fixture: Callable,
|
||||
serializer: ModuleType = json,
|
||||
):
|
||||
self.url = url
|
||||
self.token = token
|
||||
self.fixture = fixture
|
||||
self._stack = stack
|
||||
self._ws: 'WebSocketConnection' = None # noqa
|
||||
|
@ -81,9 +83,15 @@ class NoBsWs:
|
|||
trio_websocket.open_websocket_url(self.url)
|
||||
)
|
||||
# rerun user code fixture
|
||||
ret = await self._stack.enter_async_context(
|
||||
self.fixture(self)
|
||||
)
|
||||
if self.token == '':
|
||||
ret = await self._stack.enter_async_context(
|
||||
self.fixture(self)
|
||||
)
|
||||
else:
|
||||
ret = await self._stack.enter_async_context(
|
||||
self.fixture(self, self.token)
|
||||
)
|
||||
|
||||
assert ret is None
|
||||
|
||||
log.info(f'Connection success: {self.url}')
|
||||
|
@ -127,12 +135,14 @@ async def open_autorecon_ws(
|
|||
|
||||
# TODO: proper type annot smh
|
||||
fixture: Callable,
|
||||
# used for authenticated websockets
|
||||
token: str = '',
|
||||
) -> AsyncGenerator[tuple[...], NoBsWs]:
|
||||
"""Apparently we can QoS for all sorts of reasons..so catch em.
|
||||
|
||||
"""
|
||||
async with AsyncExitStack() as stack:
|
||||
ws = NoBsWs(url, stack, fixture=fixture)
|
||||
ws = NoBsWs(url, token, stack, fixture=fixture)
|
||||
await ws._connect()
|
||||
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue