From 0c905920e211ecf77bc7e2ede0d4dbde2186afef Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Wed, 9 Feb 2022 21:30:39 -0500 Subject: [PATCH] connect to krakens openOrders websocket --- piker/brokers/kraken.py | 176 +++++++++++++++++++++++++++++++++++----- piker/data/_web_bs.py | 18 +++- 2 files changed, 168 insertions(+), 26 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 395d24f1..2098d25e 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -475,6 +475,21 @@ def normalize_symbol( return ticker.lower() +def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]: + """Create a request subscription packet dict. + + ## TODO: point to the auth urls + https://docs.kraken.com/websockets/#message-subscribe + + """ + # eg. specific logic for this in kraken's sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + return { + 'event': 'subscribe', + 'subscription': data, + } + + async def handle_order_requests( client: Client, @@ -556,30 +571,35 @@ async def handle_order_requests( elif action == 'cancel': msg = BrokerdCancel(**request_msg) + # Send order cancellation to kraken resp = await client.submit_cancel( reqid=msg.reqid ) - # Check to make sure there was no error returned by - # the kraken endpoint. Assert one order was cancelled - assert resp['error'] == [] - assert resp['result']['count'] == 1 - try: - pending = resp['result']['pending'] - # Check to make sure the cancellation is NOT pending, - # then send the confirmation to the ems order stream - except KeyError: - await ems_order_stream.send( - BrokerdStatus( - reqid=msg.reqid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ).dict() - ) + # Check to make sure there was no error returned by + # the kraken endpoint. Assert one order was cancelled + assert resp['error'] == [] + assert resp['result']['count'] == 1 + + ## TODO: Change this code using .get + try: + pending = resp['result']['pending'] + # Check to make sure the cancellation is NOT pending, + # then send the confirmation to the ems order stream + except KeyError: + await ems_order_stream.send( + BrokerdStatus( + reqid=msg.reqid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Order cancelled', + broker_details={'name': 'kraken'} + ).dict() + ) + except AssertionError: + log.error(f'Order cancel was not successful') else: log.error(f'Unknown order command: {request_msg}') @@ -594,6 +614,46 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) + # Generate + + @asynccontextmanager + async def subscribe(ws: wsproto.WSConnection, token: str): + ## TODO: Fix docs and points to right urls + # XXX: setup subs + # https://docs.kraken.com/websockets/#message-subscribe + # specific logic for this in kraken's shitty sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + trades_sub = make_auth_sub( + {'name': 'openOrders', 'token': token} + ) + + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_msg(trades_sub) + + ## trade data (aka L1) + #l1_sub = make_sub( + # list(ws_pairs.values()), + # {'name': 'spread'} # 'depth': 10} + #) + + ## pull a first quote and deliver + #await ws.send_msg(l1_sub) + + yield + + # unsub from all pairs on teardown + await ws.send_msg({ + 'event': 'unsubscribe', + 'subscription': ['openOrders'], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + # Authenticated block async with get_client() as client: acc_name = 'kraken.' + client._name @@ -606,7 +666,7 @@ async def trades_dialogue( if float(vols[ticker]) != 0: msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) all_positions.append(msg.dict()) - + open_orders = await client.kraken_endpoint('OpenOrders', {}) #await tractor.breakpoint() @@ -614,13 +674,85 @@ async def trades_dialogue( #await trio.sleep_forever() + # Get websocket token for authenticated data stream + # Assert that a token was actually received + resp = await client.kraken_endpoint('GetWebSocketsToken', {}) + assert resp['error'] == [] + token = resp['result']['token'] + async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): ## TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) - + async with open_autorecon_ws( + 'wss://ws-auth.kraken.com/', + fixture=subscribe, + token=token, + ) as ws: + + while True: + with trio.move_on_after(5) as cs: + msg = await ws.recv_msg() + print(msg) + + ## pull a first quote and deliver + #msg_gen = stream_messages(ws) + + ## TODO: use ``anext()`` when it lands in 3.10! + #typ, ohlc_last = await msg_gen.__anext__() + + #topic, quote = normalize(ohlc_last) + + #first_quote = {topic: quote} + #task_status.started((init_msgs, first_quote)) + + ## lol, only "closes" when they're margin squeezing clients ;P + #feed_is_live.set() + + ## keep start of last interval for volume tracking + #last_interval_start = ohlc_last.etime + + ## start streaming + #async for typ, ohlc in msg_gen: + + # if typ == 'ohlc': + + # # TODO: can get rid of all this by using + # # ``trades`` subscription... + + # # generate tick values to match time & sales pane: + # # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + # volume = ohlc.volume + + # # new OHLC sample interval + # if ohlc.etime > last_interval_start: + # last_interval_start = ohlc.etime + # tick_volume = volume + + # else: + # # this is the tick volume *within the interval* + # tick_volume = volume - ohlc_last.volume + + # ohlc_last = ohlc + # last = ohlc.close + + # if tick_volume: + # ohlc.ticks.append({ + # 'type': 'trade', + # 'price': last, + # 'size': tick_volume, + # }) + + # topic, quote = normalize(ohlc) + + # elif typ == 'l1': + # quote = ohlc + # topic = quote['symbol'].lower() + + # await send_chan.send({topic: quote}) + async def stream_messages(ws): @@ -832,7 +964,7 @@ async def stream_quotes( # XXX: do we need to ack the unsub? # await ws.recv_msg() - # see the tips on reonnection logic: + # see the tips on reconnection logic: # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds async with open_autorecon_ws( 'wss://ws.kraken.com/', diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index d2a15e06..820d5054 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -53,11 +53,13 @@ class NoBsWs: def __init__( self, url: str, + token: str, stack: AsyncExitStack, fixture: Callable, serializer: ModuleType = json, ): self.url = url + self.token = token self.fixture = fixture self._stack = stack self._ws: 'WebSocketConnection' = None # noqa @@ -81,9 +83,15 @@ class NoBsWs: trio_websocket.open_websocket_url(self.url) ) # rerun user code fixture - ret = await self._stack.enter_async_context( - self.fixture(self) - ) + if self.token == '': + ret = await self._stack.enter_async_context( + self.fixture(self) + ) + else: + ret = await self._stack.enter_async_context( + self.fixture(self, self.token) + ) + assert ret is None log.info(f'Connection success: {self.url}') @@ -127,12 +135,14 @@ async def open_autorecon_ws( # TODO: proper type annot smh fixture: Callable, + # used for authenticated websockets + token: str = '', ) -> AsyncGenerator[tuple[...], NoBsWs]: """Apparently we can QoS for all sorts of reasons..so catch em. """ async with AsyncExitStack() as stack: - ws = NoBsWs(url, stack, fixture=fixture) + ws = NoBsWs(url, token, stack, fixture=fixture) await ws._connect() try: