From d3caad6e116e4066e62608901ca80995bbe00064 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 29 Jun 2022 13:48:01 -0400 Subject: [PATCH] Factor data feeds endpoints into new sub-mod --- piker/brokers/kraken/__init__.py | 11 +- piker/brokers/kraken/api.py | 431 +--------------------------- piker/brokers/kraken/broker.py | 4 +- piker/brokers/kraken/feed.py | 464 +++++++++++++++++++++++++++++++ 4 files changed, 479 insertions(+), 431 deletions(-) create mode 100644 piker/brokers/kraken/feed.py diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py index 5dbe709b..47128f52 100644 --- a/piker/brokers/kraken/__init__.py +++ b/piker/brokers/kraken/__init__.py @@ -27,16 +27,16 @@ Sub-modules within break into the core functionalities: ''' from .api import ( get_client, +) +from .feed import ( open_history_client, open_symbol_search, stream_quotes, ) -# TODO: -# from .feed import ( -# ) from .broker import ( trades_dialogue, - # TODO: + + # TODO: part of pps/ledger work # norm_trade_records, ) @@ -52,7 +52,6 @@ __all__ = [ # tractor RPC enable arg __enable_modules__: list[str] = [ 'api', - # TODO: - # 'feed', + 'feed', 'broker', ] diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index db803cf1..2435b235 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -19,38 +19,34 @@ Kraken web API wrapping. ''' from contextlib import asynccontextmanager as acm -from dataclasses import asdict, field +from dataclasses import field from datetime import datetime -from typing import Any, Optional, Callable, Union +from typing import ( + Any, + Optional, + Union, +) import time -from trio_typing import TaskStatus import trio import pendulum import asks from fuzzywuzzy import process as fuzzy import numpy as np -import tractor from pydantic.dataclasses import dataclass -from pydantic import BaseModel -import wsproto import urllib.parse import hashlib import hmac import base64 from piker import config -from piker._cacheables import open_cached_client from piker.brokers._util import ( resproc, SymbolNotFound, BrokerError, DataThrottle, - DataUnavailable, ) -from piker.log import get_logger, get_console_log -from piker.data import ShmArray -from piker.data._web_bs import open_autorecon_ws, NoBsWs +from piker.log import get_logger log = get_logger(__name__) @@ -76,47 +72,11 @@ _ohlc_dtype = [ ohlc_dtype = np.dtype(_ohlc_dtype) _show_wap_in_history = True - - _symbol_info_translation: dict[str, str] = { 'tick_decimals': 'pair_decimals', } -# https://www.kraken.com/features/api#get-tradable-pairs -class Pair(BaseModel): - altname: str # alternate pair name - wsname: str # WebSocket pair name (if available) - aclass_base: str # asset class of base component - base: str # asset id of base component - aclass_quote: str # asset class of quote component - quote: str # asset id of quote component - lot: str # volume lot size - - pair_decimals: int # scaling decimal places for pair - lot_decimals: int # scaling decimal places for volume - - # amount to multiply lot volume by to get currency volume - lot_multiplier: float - - # array of leverage amounts available when buying - leverage_buy: list[int] - # array of leverage amounts available when selling - leverage_sell: list[int] - - # fee schedule array in [volume, percent fee] tuples - fees: list[tuple[int, float]] - - # maker fee schedule array in [volume, percent fee] tuples (if on - # maker/taker) - fees_maker: list[tuple[int, float]] - - fee_volume_currency: str # volume discount currency - margin_call: str # margin call level - margin_stop: str # stop-out/liquidation margin level - ordermin: float # minimum order volume for pair - - @dataclass class OHLC: ''' @@ -485,380 +445,3 @@ def normalize_symbol( if sym in ticker: ticker = ticker.replace(sym, sym[1:]) return ticker.lower() - - -async def stream_messages( - ws: NoBsWs, -): - ''' - Message stream parser and heartbeat handler. - - Deliver ws subscription messages as well as handle heartbeat logic - though a single async generator. - - ''' - too_slow_count = last_hb = 0 - - while True: - - with trio.move_on_after(5) as cs: - msg = await ws.recv_msg() - - # trigger reconnection if heartbeat is laggy - if cs.cancelled_caught: - - too_slow_count += 1 - - if too_slow_count > 20: - log.warning( - "Heartbeat is too slow, resetting ws connection") - - await ws._connect() - too_slow_count = 0 - continue - - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - - now = time.time() - delay = now - last_hb - last_hb = now - - # XXX: why tf is this not printing without --tl flag? - log.debug(f"Heartbeat after {delay}") - # print(f"Heartbeat after {delay}") - - continue - - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - yield msg - - -async def process_data_feed_msgs( - ws: NoBsWs, -): - ''' - Parse and pack data feed messages. - - ''' - async for msg in stream_messages(ws): - - chan_id, *payload_array, chan_name, pair = msg - - if 'ohlc' in chan_name: - - yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) - - elif 'spread' in chan_name: - - bid, ask, ts, bsize, asize = map(float, payload_array[0]) - - # TODO: really makes you think IB has a horrible API... - quote = { - 'symbol': pair.replace('/', ''), - 'ticks': [ - {'type': 'bid', 'price': bid, 'size': bsize}, - {'type': 'bsize', 'price': bid, 'size': bsize}, - - {'type': 'ask', 'price': ask, 'size': asize}, - {'type': 'asize', 'price': ask, 'size': asize}, - ], - } - yield 'l1', quote - - # elif 'book' in msg[-2]: - # chan_id, *payload_array, chan_name, pair = msg - # print(msg) - - else: - print(f'UNHANDLED MSG: {msg}') - yield msg - - -def normalize( - ohlc: OHLC, - -) -> dict: - quote = asdict(ohlc) - quote['broker_ts'] = quote['time'] - quote['brokerd_ts'] = time.time() - quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') - quote['last'] = quote['close'] - quote['bar_wap'] = ohlc.vwap - - # seriously eh? what's with this non-symmetry everywhere - # in subscription systems... - # XXX: piker style is always lowercases symbols. - topic = quote['pair'].replace('/', '').lower() - - # print(quote) - return topic, quote - - -def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: - ''' - Create a request subscription packet dict. - - https://docs.kraken.com/websockets/#message-subscribe - - ''' - # eg. specific logic for this in kraken's sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - return { - 'pair': pairs, - 'event': 'subscribe', - 'subscription': data, - } - - -@acm -async def open_history_client( - symbol: str, - -) -> tuple[Callable, int]: - - # TODO implement history getter for the new storage layer. - async with open_cached_client('kraken') as client: - - # lol, kraken won't send any more then the "last" - # 720 1m bars.. so we have to just ignore further - # requests of this type.. - queries: int = 0 - - async def get_ohlc( - end_dt: Optional[datetime] = None, - start_dt: Optional[datetime] = None, - - ) -> tuple[ - np.ndarray, - datetime, # start - datetime, # end - ]: - - nonlocal queries - if queries > 0: - raise DataUnavailable - - count = 0 - while count <= 3: - try: - array = await client.bars( - symbol, - since=end_dt, - ) - count += 1 - queries += 1 - break - except DataThrottle: - log.warning(f'kraken OHLC throttle for {symbol}') - await trio.sleep(1) - - start_dt = pendulum.from_timestamp(array[0]['time']) - end_dt = pendulum.from_timestamp(array[-1]['time']) - return array, start_dt, end_dt - - yield get_ohlc, {'erlangs': 1, 'rate': 1} - - -async def backfill_bars( - - sym: str, - shm: ShmArray, # type: ignore # noqa - count: int = 10, # NOTE: any more and we'll overrun the underlying buffer - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Fill historical bars into shared mem / storage afap. - ''' - with trio.CancelScope() as cs: - async with open_cached_client('kraken') as client: - bars = await client.bars(symbol=sym) - shm.push(bars) - task_status.started(cs) - - -async def stream_quotes( - - send_chan: trio.abc.SendChannel, - symbols: list[str], - feed_is_live: trio.Event, - loglevel: str = None, - - # backend specific - sub_type: str = 'ohlc', - - # startup sync - task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Subscribe for ohlc stream of quotes for ``pairs``. - - ``pairs`` must be formatted /. - - ''' - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - - ws_pairs = {} - sym_infos = {} - - async with open_cached_client('kraken') as client, send_chan as send_chan: - - # keep client cached for real-time section - for sym in symbols: - - # transform to upper since piker style is always lower - sym = sym.upper() - - si = Pair(**await client.symbol_info(sym)) # validation - syminfo = si.dict() - syminfo['price_tick_size'] = 1 / 10**si.pair_decimals - syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals - syminfo['asset_type'] = 'crypto' - sym_infos[sym] = syminfo - ws_pairs[sym] = si.wsname - - symbol = symbols[0].lower() - - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - symbol: { - 'symbol_info': sym_infos[sym], - 'shm_write_opts': {'sum_tick_vml': False}, - 'fqsn': sym, - }, - } - - @acm - async def subscribe(ws: wsproto.WSConnection): - # XXX: setup subs - # https://docs.kraken.com/websockets/#message-subscribe - # specific logic for this in kraken's shitty sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - ohlc_sub = make_sub( - list(ws_pairs.values()), - {'name': 'ohlc', 'interval': 1} - ) - - # TODO: we want to eventually allow unsubs which should - # be completely fine to request from a separate task - # since internally the ws methods appear to be FIFO - # locked. - await ws.send_msg(ohlc_sub) - - # trade data (aka L1) - l1_sub = make_sub( - list(ws_pairs.values()), - {'name': 'spread'} # 'depth': 10} - ) - - # pull a first quote and deliver - await ws.send_msg(l1_sub) - - yield - - # unsub from all pairs on teardown - await ws.send_msg({ - 'pair': list(ws_pairs.values()), - 'event': 'unsubscribe', - 'subscription': ['ohlc', 'spread'], - }) - - # XXX: do we need to ack the unsub? - # await ws.recv_msg() - - # see the tips on reconnection logic: - # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds - ws: NoBsWs - async with open_autorecon_ws( - 'wss://ws.kraken.com/', - fixture=subscribe, - ) as ws: - - # pull a first quote and deliver - msg_gen = process_data_feed_msgs(ws) - - # TODO: use ``anext()`` when it lands in 3.10! - typ, ohlc_last = await msg_gen.__anext__() - - topic, quote = normalize(ohlc_last) - - task_status.started((init_msgs, quote)) - - # lol, only "closes" when they're margin squeezing clients ;P - feed_is_live.set() - - # keep start of last interval for volume tracking - last_interval_start = ohlc_last.etime - - # start streaming - async for typ, ohlc in msg_gen: - - if typ == 'ohlc': - - # TODO: can get rid of all this by using - # ``trades`` subscription... - - # generate tick values to match time & sales pane: - # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - volume = ohlc.volume - - # new OHLC sample interval - if ohlc.etime > last_interval_start: - last_interval_start = ohlc.etime - tick_volume = volume - - else: - # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume - - ohlc_last = ohlc - last = ohlc.close - - if tick_volume: - ohlc.ticks.append({ - 'type': 'trade', - 'price': last, - 'size': tick_volume, - }) - - topic, quote = normalize(ohlc) - - elif typ == 'l1': - quote = ohlc - topic = quote['symbol'].lower() - - await send_chan.send({topic: quote}) - - -@tractor.context -async def open_symbol_search( - ctx: tractor.Context, - -) -> Client: - async with open_cached_client('kraken') as client: - - # load all symbols locally for fast search - cache = await client.cache_symbols() - await ctx.started(cache) - - async with ctx.open_stream() as stream: - - async for pattern in stream: - - matches = fuzzy.extractBests( - pattern, - cache, - score_cutoff=50, - ) - # repack in dict form - await stream.send( - {item[0]['altname']: item[0] - for item in matches} - ) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index e115da64..c17d9daa 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -44,9 +44,11 @@ from .api import ( Client, BrokerError, get_client, - get_console_log, log, normalize_symbol, +) +from .feed import ( + get_console_log, open_autorecon_ws, NoBsWs, stream_messages, diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py new file mode 100644 index 00000000..5742bcb1 --- /dev/null +++ b/piker/brokers/kraken/feed.py @@ -0,0 +1,464 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Real-time and historical data feed endpoints. + +''' +from contextlib import asynccontextmanager as acm +from dataclasses import asdict +from datetime import datetime +from typing import ( + Any, + Optional, + Callable, +) +import time + +from fuzzywuzzy import process as fuzzy +import numpy as np +import pendulum +from pydantic import BaseModel +from trio_typing import TaskStatus +import tractor +import trio +import wsproto + +from piker._cacheables import open_cached_client +from piker.brokers._util import ( + BrokerError, + DataThrottle, + DataUnavailable, +) +from piker.log import get_console_log +from piker.data import ShmArray +from piker.data._web_bs import open_autorecon_ws, NoBsWs +from .api import ( + Client, + log, + OHLC, +) + + +# https://www.kraken.com/features/api#get-tradable-pairs +class Pair(BaseModel): + altname: str # alternate pair name + wsname: str # WebSocket pair name (if available) + aclass_base: str # asset class of base component + base: str # asset id of base component + aclass_quote: str # asset class of quote component + quote: str # asset id of quote component + lot: str # volume lot size + + pair_decimals: int # scaling decimal places for pair + lot_decimals: int # scaling decimal places for volume + + # amount to multiply lot volume by to get currency volume + lot_multiplier: float + + # array of leverage amounts available when buying + leverage_buy: list[int] + # array of leverage amounts available when selling + leverage_sell: list[int] + + # fee schedule array in [volume, percent fee] tuples + fees: list[tuple[int, float]] + + # maker fee schedule array in [volume, percent fee] tuples (if on + # maker/taker) + fees_maker: list[tuple[int, float]] + + fee_volume_currency: str # volume discount currency + margin_call: str # margin call level + margin_stop: str # stop-out/liquidation margin level + ordermin: float # minimum order volume for pair + + +async def stream_messages( + ws: NoBsWs, +): + ''' + Message stream parser and heartbeat handler. + + Deliver ws subscription messages as well as handle heartbeat logic + though a single async generator. + + ''' + too_slow_count = last_hb = 0 + + while True: + + with trio.move_on_after(5) as cs: + msg = await ws.recv_msg() + + # trigger reconnection if heartbeat is laggy + if cs.cancelled_caught: + + too_slow_count += 1 + + if too_slow_count > 20: + log.warning( + "Heartbeat is too slow, resetting ws connection") + + await ws._connect() + too_slow_count = 0 + continue + + if isinstance(msg, dict): + if msg.get('event') == 'heartbeat': + + now = time.time() + delay = now - last_hb + last_hb = now + + # XXX: why tf is this not printing without --tl flag? + log.debug(f"Heartbeat after {delay}") + # print(f"Heartbeat after {delay}") + + continue + + err = msg.get('errorMessage') + if err: + raise BrokerError(err) + else: + yield msg + + +async def process_data_feed_msgs( + ws: NoBsWs, +): + ''' + Parse and pack data feed messages. + + ''' + async for msg in stream_messages(ws): + + chan_id, *payload_array, chan_name, pair = msg + + if 'ohlc' in chan_name: + + yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) + + elif 'spread' in chan_name: + + bid, ask, ts, bsize, asize = map(float, payload_array[0]) + + # TODO: really makes you think IB has a horrible API... + quote = { + 'symbol': pair.replace('/', ''), + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize}, + ], + } + yield 'l1', quote + + # elif 'book' in msg[-2]: + # chan_id, *payload_array, chan_name, pair = msg + # print(msg) + + else: + print(f'UNHANDLED MSG: {msg}') + yield msg + + +def normalize( + ohlc: OHLC, + +) -> dict: + quote = asdict(ohlc) + quote['broker_ts'] = quote['time'] + quote['brokerd_ts'] = time.time() + quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') + quote['last'] = quote['close'] + quote['bar_wap'] = ohlc.vwap + + # seriously eh? what's with this non-symmetry everywhere + # in subscription systems... + # XXX: piker style is always lowercases symbols. + topic = quote['pair'].replace('/', '').lower() + + # print(quote) + return topic, quote + + +def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: + ''' + Create a request subscription packet dict. + + https://docs.kraken.com/websockets/#message-subscribe + + ''' + # eg. specific logic for this in kraken's sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + return { + 'pair': pairs, + 'event': 'subscribe', + 'subscription': data, + } + + +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + + # TODO implement history getter for the new storage layer. + async with open_cached_client('kraken') as client: + + # lol, kraken won't send any more then the "last" + # 720 1m bars.. so we have to just ignore further + # requests of this type.. + queries: int = 0 + + async def get_ohlc( + end_dt: Optional[datetime] = None, + start_dt: Optional[datetime] = None, + + ) -> tuple[ + np.ndarray, + datetime, # start + datetime, # end + ]: + + nonlocal queries + if queries > 0: + raise DataUnavailable + + count = 0 + while count <= 3: + try: + array = await client.bars( + symbol, + since=end_dt, + ) + count += 1 + queries += 1 + break + except DataThrottle: + log.warning(f'kraken OHLC throttle for {symbol}') + await trio.sleep(1) + + start_dt = pendulum.from_timestamp(array[0]['time']) + end_dt = pendulum.from_timestamp(array[-1]['time']) + return array, start_dt, end_dt + + yield get_ohlc, {'erlangs': 1, 'rate': 1} + + +async def backfill_bars( + + sym: str, + shm: ShmArray, # type: ignore # noqa + count: int = 10, # NOTE: any more and we'll overrun the underlying buffer + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Fill historical bars into shared mem / storage afap. + ''' + with trio.CancelScope() as cs: + async with open_cached_client('kraken') as client: + bars = await client.bars(symbol=sym) + shm.push(bars) + task_status.started(cs) + + +async def stream_quotes( + + send_chan: trio.abc.SendChannel, + symbols: list[str], + feed_is_live: trio.Event, + loglevel: str = None, + + # backend specific + sub_type: str = 'ohlc', + + # startup sync + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Subscribe for ohlc stream of quotes for ``pairs``. + + ``pairs`` must be formatted /. + + ''' + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + ws_pairs = {} + sym_infos = {} + + async with open_cached_client('kraken') as client, send_chan as send_chan: + + # keep client cached for real-time section + for sym in symbols: + + # transform to upper since piker style is always lower + sym = sym.upper() + + si = Pair(**await client.symbol_info(sym)) # validation + syminfo = si.dict() + syminfo['price_tick_size'] = 1 / 10**si.pair_decimals + syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals + syminfo['asset_type'] = 'crypto' + sym_infos[sym] = syminfo + ws_pairs[sym] = si.wsname + + symbol = symbols[0].lower() + + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + symbol: { + 'symbol_info': sym_infos[sym], + 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, + }, + } + + @acm + async def subscribe(ws: wsproto.WSConnection): + # XXX: setup subs + # https://docs.kraken.com/websockets/#message-subscribe + # specific logic for this in kraken's shitty sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + ohlc_sub = make_sub( + list(ws_pairs.values()), + {'name': 'ohlc', 'interval': 1} + ) + + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_msg(ohlc_sub) + + # trade data (aka L1) + l1_sub = make_sub( + list(ws_pairs.values()), + {'name': 'spread'} # 'depth': 10} + ) + + # pull a first quote and deliver + await ws.send_msg(l1_sub) + + yield + + # unsub from all pairs on teardown + await ws.send_msg({ + 'pair': list(ws_pairs.values()), + 'event': 'unsubscribe', + 'subscription': ['ohlc', 'spread'], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + # see the tips on reconnection logic: + # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds + ws: NoBsWs + async with open_autorecon_ws( + 'wss://ws.kraken.com/', + fixture=subscribe, + ) as ws: + + # pull a first quote and deliver + msg_gen = process_data_feed_msgs(ws) + + # TODO: use ``anext()`` when it lands in 3.10! + typ, ohlc_last = await msg_gen.__anext__() + + topic, quote = normalize(ohlc_last) + + task_status.started((init_msgs, quote)) + + # lol, only "closes" when they're margin squeezing clients ;P + feed_is_live.set() + + # keep start of last interval for volume tracking + last_interval_start = ohlc_last.etime + + # start streaming + async for typ, ohlc in msg_gen: + + if typ == 'ohlc': + + # TODO: can get rid of all this by using + # ``trades`` subscription... + + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + volume = ohlc.volume + + # new OHLC sample interval + if ohlc.etime > last_interval_start: + last_interval_start = ohlc.etime + tick_volume = volume + + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume + + ohlc_last = ohlc + last = ohlc.close + + if tick_volume: + ohlc.ticks.append({ + 'type': 'trade', + 'price': last, + 'size': tick_volume, + }) + + topic, quote = normalize(ohlc) + + elif typ == 'l1': + quote = ohlc + topic = quote['symbol'].lower() + + await send_chan.send({topic: quote}) + + +@tractor.context +async def open_symbol_search( + ctx: tractor.Context, + +) -> Client: + async with open_cached_client('kraken') as client: + + # load all symbols locally for fast search + cache = await client.cache_symbols() + await ctx.started(cache) + + async with ctx.open_stream() as stream: + + async for pattern in stream: + + matches = fuzzy.extractBests( + pattern, + cache, + score_cutoff=50, + ) + # repack in dict form + await stream.send( + {item[0]['altname']: item[0] + for item in matches} + )