binance: breakout into `feed` and `broker` mods like other backends

basic_buy_bot
Tyler Goodlet 2023-06-09 16:45:02 -04:00
parent ed0c2555fc
commit ae1c5a0db0
4 changed files with 609 additions and 520 deletions

View File

@ -23,16 +23,15 @@ binancial secs on the floor, in the office, behind the dumpster.
"""
from .api import (
get_client,
# )
# from .feed import (
)
from .feed import (
get_mkt_info,
open_history_client,
open_symbol_search,
stream_quotes,
# )
# from .broker import (
)
from .broker import (
trades_dialogue,
# norm_trade_records,
)
@ -43,13 +42,12 @@ __all__ = [
'open_history_client',
'open_symbol_search',
'stream_quotes',
# 'norm_trade_records',
]
# tractor RPC enable arg
# `brokerd` modules
__enable_modules__: list[str] = [
'api',
# 'feed',
# 'broker',
'feed',
'broker',
]

View File

@ -25,66 +25,32 @@ from __future__ import annotations
from collections import OrderedDict
from contextlib import (
asynccontextmanager as acm,
aclosing,
)
from datetime import datetime
from decimal import Decimal
import itertools
from typing import (
Any,
Union,
AsyncIterator,
AsyncGenerator,
Callable,
)
import hmac
import time
import hashlib
from pathlib import Path
import trio
from trio_typing import TaskStatus
from pendulum import (
now,
from_timestamp,
)
import asks
from fuzzywuzzy import process as fuzzy
import numpy as np
import tractor
from piker import config
from piker._cacheables import (
async_lifo_cache,
open_cached_client,
)
from piker.accounting._mktinfo import (
Asset,
MktPair,
digits_to_dec,
)
from piker.data.types import Struct
from piker.data.validate import FeedInit
from piker.data import def_iohlcv_fields
from piker.data._web_bs import (
open_autorecon_ws,
NoBsWs,
)
from piker.clearing._messages import (
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdPosition,
BrokerdFill,
BrokerdCancel,
# BrokerdError,
)
from piker.brokers._util import (
resproc,
SymbolNotFound,
DataUnavailable,
get_logger,
get_console_log,
)
log = get_logger('piker.brokers.binance')
@ -211,18 +177,6 @@ class OHLC(Struct):
bar_wap: float = 0.0
class L1(Struct):
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
update_id: int
sym: str
bid: float
bsize: float
ask: float
asize: float
# convert datetime obj timestamp to unixtime in milliseconds
def binance_timestamp(
when: datetime
@ -644,468 +598,3 @@ async def get_client() -> Client:
log.info('Caching exchange infos..')
await client.exch_info()
yield client
# validation type
class AggTrade(Struct, frozen=True):
e: str # Event type
E: int # Event time
s: str # Symbol
a: int # Aggregate trade ID
p: float # Price
q: float # Quantity
f: int # First trade ID
l: int # noqa Last trade ID
T: int # Trade time
m: bool # Is the buyer the market maker?
M: bool # Ignore
async def stream_messages(
ws: NoBsWs,
) -> AsyncGenerator[NoBsWs, dict]:
# TODO: match syntax here!
msg: dict[str, Any]
async for msg in ws:
match msg:
# for l1 streams binance doesn't add an event type field so
# identify those messages by matching keys
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
case {
# NOTE: this is never an old value it seems, so
# they are always sending real L1 spread updates.
'u': upid, # update id
's': sym,
'b': bid,
'B': bsize,
'a': ask,
'A': asize,
}:
# TODO: it would be super nice to have a `L1` piker type
# which "renders" incremental tick updates from a packed
# msg-struct:
# - backend msgs after packed into the type such that we
# can reduce IPC usage but without each backend having
# to do that incremental update logic manually B)
# - would it maybe be more efficient to use this instead?
# https://binance-docs.github.io/apidocs/spot/en/#diff-depth-stream
l1 = L1(
update_id=upid,
sym=sym,
bid=bid,
bsize=bsize,
ask=ask,
asize=asize,
)
l1.typecast()
# repack into piker's tick-quote format
yield 'l1', {
'symbol': l1.sym,
'ticks': [
{
'type': 'bid',
'price': l1.bid,
'size': l1.bsize,
},
{
'type': 'bsize',
'price': l1.bid,
'size': l1.bsize,
},
{
'type': 'ask',
'price': l1.ask,
'size': l1.asize,
},
{
'type': 'asize',
'price': l1.ask,
'size': l1.asize,
}
]
}
# https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
case {
'e': 'aggTrade',
}:
# NOTE: this is purely for a definition,
# ``msgspec.Struct`` does not runtime-validate until you
# decode/encode, see:
# https://jcristharif.com/msgspec/structs.html#type-validation
msg = AggTrade(**msg) # TODO: should we .copy() ?
piker_quote: dict = {
'symbol': msg.s,
'last': float(msg.p),
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': float(msg.p),
'size': float(msg.q),
'broker_ts': msg.T,
}],
}
yield 'trade', piker_quote
def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
'''
Create a request subscription packet dict.
- spot:
https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams
- futes:
https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams
'''
return {
'method': 'SUBSCRIBE',
'params': [
f'{pair.lower()}@{sub_name}'
for pair in pairs
],
'id': uid
}
@acm
async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
symbol: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer.
async with open_cached_client('binance') as client:
async def get_ohlc(
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.bars(
symbol,
start_dt=start_dt,
end_dt=end_dt,
)
times = array['time']
if (
end_dt is None
):
inow = round(time.time())
if (inow - times[-1]) > 60:
await tractor.breakpoint()
start_dt = from_timestamp(times[0])
end_dt = from_timestamp(times[-1])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair]:
async with open_cached_client('binance') as client:
pair: Pair = await client.exch_info(fqme.upper())
mkt = MktPair(
dst=Asset(
name=pair.baseAsset,
atype='crypto',
tx_tick=digits_to_dec(pair.baseAssetPrecision),
),
src=Asset(
name=pair.quoteAsset,
atype='crypto',
tx_tick=digits_to_dec(pair.quoteAssetPrecision),
),
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
broker='binance',
)
both = mkt, pair
return both
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
async with (
send_chan as send_chan,
):
init_msgs: list[FeedInit] = []
for sym in symbols:
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(mkt_info=mkt)
)
iter_subids = itertools.count()
@acm
async def subscribe(ws: NoBsWs):
# setup subs
subid: int = next(iter_subids)
# trade data (aka L1)
# https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker
l1_sub = make_sub(symbols, 'bookTicker', subid)
await ws.send_msg(l1_sub)
# aggregate (each order clear by taker **not** by maker)
# trades data:
# https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
agg_trades_sub = make_sub(symbols, 'aggTrade', subid)
await ws.send_msg(agg_trades_sub)
# might get ack from ws server, or maybe some
# other msg still in transit..
res = await ws.recv_msg()
subid: str | None = res.get('id')
if subid:
assert res['id'] == subid
yield
subs = []
for sym in symbols:
subs.append("{sym}@aggTrade")
subs.append("{sym}@bookTicker")
# unsub from all pairs on teardown
if ws.connected():
await ws.send_msg({
"method": "UNSUBSCRIBE",
"params": subs,
"id": subid,
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
async with (
open_autorecon_ws(
# XXX: see api docs which show diff addr?
# https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information
# 'wss://ws-api.binance.com:443/ws-api/v3',
'wss://stream.binance.com/ws',
fixture=subscribe,
) as ws,
# avoid stream-gen closure from breaking trio..
aclosing(stream_messages(ws)) as msg_gen,
):
typ, quote = await anext(msg_gen)
# pull a first quote and deliver
while typ != 'trade':
typ, quote = await anext(msg_gen)
task_status.started((init_msgs, quote))
# signal to caller feed is ready for consumption
feed_is_live.set()
# import time
# last = time.time()
# start streaming
async for typ, msg in msg_gen:
# period = time.time() - last
# hz = 1/period if period else float('inf')
# if hz > 60:
# log.info(f'Binance quotez : {hz}')
topic = msg['symbol'].lower()
await send_chan.send({topic: msg})
# last = time.time()
async def handle_order_requests(
ems_order_stream: tractor.MsgStream
) -> None:
async with open_cached_client('binance') as client:
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
reqid = await client.submit_limit(
order.symbol,
order.action,
order.size,
order.price,
oid=order.oid
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
await client.submit_cancel(msg.symbol, msg.reqid)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None
) -> AsyncIterator[dict[str, Any]]:
async with open_cached_client('binance') as client:
if not client.api_key:
await ctx.started('paper')
return
# table: PpTable
# ledger: TransactionLedger
# TODO: load pps and accounts using accounting apis!
positions: list[BrokerdPosition] = []
accounts: list[str] = ['binance.default']
await ctx.started((positions, accounts))
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
open_cached_client('binance') as client,
client.manage_listen_key() as listen_key,
):
n.start_soon(handle_order_requests, ems_stream)
# await trio.sleep_forever()
async with open_autorecon_ws(
f'wss://stream.binance.com:9443/ws/{listen_key}',
) as ws:
event = await ws.recv_msg()
# https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
if event.get('e') == 'executionReport':
oid: str = event.get('c')
side: str = event.get('S').lower()
status: str = event.get('X')
order_qty: float = float(event.get('q'))
filled_qty: float = float(event.get('z'))
cum_transacted_qty: float = float(event.get('Z'))
price_avg: float = cum_transacted_qty / filled_qty
broker_time: float = float(event.get('T'))
commission_amount: float = float(event.get('n'))
commission_asset: float = event.get('N')
if status == 'TRADE':
if order_qty == filled_qty:
msg = BrokerdFill(
reqid=oid,
time_ns=time.time_ns(),
action=side,
price=price_avg,
broker_details={
'name': 'binance',
'commissions': {
'amount': commission_amount,
'asset': commission_asset
},
'broker_time': broker_time
},
broker_time=broker_time
)
else:
if status == 'NEW':
status = 'submitted'
elif status == 'CANCELED':
status = 'cancelled'
msg = BrokerdStatus(
reqid=oid,
time_ns=time.time_ns(),
status=status,
filled=filled_qty,
remaining=order_qty - filled_qty,
broker_details={'name': 'binance'}
)
else:
# XXX: temporary, to catch unhandled msgs
breakpoint()
await ems_stream.send(msg.dict())
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> Client:
async with open_cached_client('binance') as client:
# load all symbols locally for fast search
cache = await client.exch_info()
await ctx.started()
async with ctx.open_stream() as stream:
async for pattern in stream:
# results = await client.exch_info(sym=pattern.upper())
matches = fuzzy.extractBests(
pattern,
cache,
score_cutoff=50,
)
# repack in dict form
await stream.send({
item[0].symbol: item[0]
for item in matches
})

View File

@ -0,0 +1,188 @@
# piker: trading gear for hackers
# Copyright (C)
# Guillermo Rodriguez (aka ze jefe)
# Tyler Goodlet
# (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Live order control B)
'''
from __future__ import annotations
from typing import (
Any,
AsyncIterator,
)
import time
import tractor
import trio
from piker.brokers._util import (
get_logger,
)
from piker.data._web_bs import (
open_autorecon_ws,
NoBsWs,
)
from piker._cacheables import (
open_cached_client,
)
from piker.clearing._messages import (
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdPosition,
BrokerdFill,
BrokerdCancel,
# BrokerdError,
)
log = get_logger('piker.brokers.binance')
async def handle_order_requests(
ems_order_stream: tractor.MsgStream
) -> None:
async with open_cached_client('binance') as client:
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
reqid = await client.submit_limit(
order.symbol,
order.action,
order.size,
order.price,
oid=order.oid
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
await client.submit_cancel(msg.symbol, msg.reqid)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None
) -> AsyncIterator[dict[str, Any]]:
async with open_cached_client('binance') as client:
if not client.api_key:
await ctx.started('paper')
return
# table: PpTable
# ledger: TransactionLedger
# TODO: load pps and accounts using accounting apis!
positions: list[BrokerdPosition] = []
accounts: list[str] = ['binance.default']
await ctx.started((positions, accounts))
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
open_cached_client('binance') as client,
client.manage_listen_key() as listen_key,
):
n.start_soon(handle_order_requests, ems_stream)
# await trio.sleep_forever()
ws: NoBsWs
async with open_autorecon_ws(
f'wss://stream.binance.com:9443/ws/{listen_key}',
) as ws:
event = await ws.recv_msg()
# https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
if event.get('e') == 'executionReport':
oid: str = event.get('c')
side: str = event.get('S').lower()
status: str = event.get('X')
order_qty: float = float(event.get('q'))
filled_qty: float = float(event.get('z'))
cum_transacted_qty: float = float(event.get('Z'))
price_avg: float = cum_transacted_qty / filled_qty
broker_time: float = float(event.get('T'))
commission_amount: float = float(event.get('n'))
commission_asset: float = event.get('N')
if status == 'TRADE':
if order_qty == filled_qty:
msg = BrokerdFill(
reqid=oid,
time_ns=time.time_ns(),
action=side,
price=price_avg,
broker_details={
'name': 'binance',
'commissions': {
'amount': commission_amount,
'asset': commission_asset
},
'broker_time': broker_time
},
broker_time=broker_time
)
else:
if status == 'NEW':
status = 'submitted'
elif status == 'CANCELED':
status = 'cancelled'
msg = BrokerdStatus(
reqid=oid,
time_ns=time.time_ns(),
status=status,
filled=filled_qty,
remaining=order_qty - filled_qty,
broker_details={'name': 'binance'}
)
else:
# XXX: temporary, to catch unhandled msgs
breakpoint()
await ems_stream.send(msg.dict())

View File

@ -0,0 +1,414 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Real-time and historical data feed endpoints.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
aclosing,
)
from datetime import datetime
import itertools
from typing import (
Any,
AsyncGenerator,
Callable,
)
import time
import trio
from trio_typing import TaskStatus
from pendulum import (
from_timestamp,
)
from fuzzywuzzy import process as fuzzy
import numpy as np
import tractor
from piker._cacheables import (
async_lifo_cache,
open_cached_client,
)
from piker.accounting._mktinfo import (
Asset,
MktPair,
digits_to_dec,
)
from piker.data.types import Struct
from piker.data.validate import FeedInit
from piker.data._web_bs import (
open_autorecon_ws,
NoBsWs,
)
from piker.brokers._util import (
DataUnavailable,
get_logger,
get_console_log,
)
from .api import (
Client,
Pair,
)
log = get_logger('piker.brokers.binance')
class L1(Struct):
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
update_id: int
sym: str
bid: float
bsize: float
ask: float
asize: float
# validation type
class AggTrade(Struct, frozen=True):
e: str # Event type
E: int # Event time
s: str # Symbol
a: int # Aggregate trade ID
p: float # Price
q: float # Quantity
f: int # First trade ID
l: int # noqa Last trade ID
T: int # Trade time
m: bool # Is the buyer the market maker?
M: bool # Ignore
async def stream_messages(
ws: NoBsWs,
) -> AsyncGenerator[NoBsWs, dict]:
# TODO: match syntax here!
msg: dict[str, Any]
async for msg in ws:
match msg:
# for l1 streams binance doesn't add an event type field so
# identify those messages by matching keys
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
case {
# NOTE: this is never an old value it seems, so
# they are always sending real L1 spread updates.
'u': upid, # update id
's': sym,
'b': bid,
'B': bsize,
'a': ask,
'A': asize,
}:
# TODO: it would be super nice to have a `L1` piker type
# which "renders" incremental tick updates from a packed
# msg-struct:
# - backend msgs after packed into the type such that we
# can reduce IPC usage but without each backend having
# to do that incremental update logic manually B)
# - would it maybe be more efficient to use this instead?
# https://binance-docs.github.io/apidocs/spot/en/#diff-depth-stream
l1 = L1(
update_id=upid,
sym=sym,
bid=bid,
bsize=bsize,
ask=ask,
asize=asize,
)
l1.typecast()
# repack into piker's tick-quote format
yield 'l1', {
'symbol': l1.sym,
'ticks': [
{
'type': 'bid',
'price': l1.bid,
'size': l1.bsize,
},
{
'type': 'bsize',
'price': l1.bid,
'size': l1.bsize,
},
{
'type': 'ask',
'price': l1.ask,
'size': l1.asize,
},
{
'type': 'asize',
'price': l1.ask,
'size': l1.asize,
}
]
}
# https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
case {
'e': 'aggTrade',
}:
# NOTE: this is purely for a definition,
# ``msgspec.Struct`` does not runtime-validate until you
# decode/encode, see:
# https://jcristharif.com/msgspec/structs.html#type-validation
msg = AggTrade(**msg) # TODO: should we .copy() ?
piker_quote: dict = {
'symbol': msg.s,
'last': float(msg.p),
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': float(msg.p),
'size': float(msg.q),
'broker_ts': msg.T,
}],
}
yield 'trade', piker_quote
def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
'''
Create a request subscription packet dict.
- spot:
https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams
- futes:
https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams
'''
return {
'method': 'SUBSCRIBE',
'params': [
f'{pair.lower()}@{sub_name}'
for pair in pairs
],
'id': uid
}
@acm
async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
symbol: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer.
async with open_cached_client('binance') as client:
async def get_ohlc(
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.bars(
symbol,
start_dt=start_dt,
end_dt=end_dt,
)
times = array['time']
if (
end_dt is None
):
inow = round(time.time())
if (inow - times[-1]) > 60:
await tractor.breakpoint()
start_dt = from_timestamp(times[0])
end_dt = from_timestamp(times[-1])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair]:
async with open_cached_client('binance') as client:
pair: Pair = await client.exch_info(fqme.upper())
mkt = MktPair(
dst=Asset(
name=pair.baseAsset,
atype='crypto',
tx_tick=digits_to_dec(pair.baseAssetPrecision),
),
src=Asset(
name=pair.quoteAsset,
atype='crypto',
tx_tick=digits_to_dec(pair.quoteAssetPrecision),
),
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
broker='binance',
)
both = mkt, pair
return both
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
async with (
send_chan as send_chan,
):
init_msgs: list[FeedInit] = []
for sym in symbols:
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(mkt_info=mkt)
)
iter_subids = itertools.count()
@acm
async def subscribe(ws: NoBsWs):
# setup subs
subid: int = next(iter_subids)
# trade data (aka L1)
# https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker
l1_sub = make_sub(symbols, 'bookTicker', subid)
await ws.send_msg(l1_sub)
# aggregate (each order clear by taker **not** by maker)
# trades data:
# https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
agg_trades_sub = make_sub(symbols, 'aggTrade', subid)
await ws.send_msg(agg_trades_sub)
# might get ack from ws server, or maybe some
# other msg still in transit..
res = await ws.recv_msg()
subid: str | None = res.get('id')
if subid:
assert res['id'] == subid
yield
subs = []
for sym in symbols:
subs.append("{sym}@aggTrade")
subs.append("{sym}@bookTicker")
# unsub from all pairs on teardown
if ws.connected():
await ws.send_msg({
"method": "UNSUBSCRIBE",
"params": subs,
"id": subid,
})
# XXX: do we need to ack the unsub?
# await ws.recv_msg()
async with (
open_autorecon_ws(
# XXX: see api docs which show diff addr?
# https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information
# 'wss://ws-api.binance.com:443/ws-api/v3',
'wss://stream.binance.com/ws',
fixture=subscribe,
) as ws,
# avoid stream-gen closure from breaking trio..
aclosing(stream_messages(ws)) as msg_gen,
):
typ, quote = await anext(msg_gen)
# pull a first quote and deliver
while typ != 'trade':
typ, quote = await anext(msg_gen)
task_status.started((init_msgs, quote))
# signal to caller feed is ready for consumption
feed_is_live.set()
# import time
# last = time.time()
# start streaming
async for typ, msg in msg_gen:
# period = time.time() - last
# hz = 1/period if period else float('inf')
# if hz > 60:
# log.info(f'Binance quotez : {hz}')
topic = msg['symbol'].lower()
await send_chan.send({topic: msg})
# last = time.time()
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> Client:
async with open_cached_client('binance') as client:
# load all symbols locally for fast search
cache = await client.exch_info()
await ctx.started()
async with ctx.open_stream() as stream:
async for pattern in stream:
# results = await client.exch_info(sym=pattern.upper())
matches = fuzzy.extractBests(
pattern,
cache,
score_cutoff=50,
)
# repack in dict form
await stream.send({
item[0].symbol: item[0]
for item in matches
})