piker/piker/brokers/binance/broker.py

187 lines
5.8 KiB
Python
Raw Normal View History

# piker: trading gear for hackers
# Copyright (C)
# Guillermo Rodriguez (aka ze jefe)
# Tyler Goodlet
# (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Live order control B)
'''
from __future__ import annotations
from typing import (
Any,
AsyncIterator,
)
import time
import tractor
import trio
from piker.brokers._util import (
get_logger,
)
from piker.data._web_bs import (
open_autorecon_ws,
NoBsWs,
)
Always expand FQMEs with .venue and .expiry values Since there are indeed multiple futures (perp swaps) contracts including a set with expiry, we need a way to distinguish through search and `FutesPair` lookup which contract we're requesting. To solve this extend the `FutesPair` and `SpotPair` to include a `.bs_fqme` field similar to `MktPair` and key the `Client._pairs: ChainMap`'s backing tables with these expanded fqmes. For example the perp swap now expands to `btcusdt.usdtm.perp` which fills in the venue as `'usdtm'` (the usd-margined fututes market) and the expiry as `'perp'` (as before). This allows distinguishing explicitly from, for ex., coin-margined contracts which could instead (since we haven't added the support yet) fqmes of the sort `btcusdt.<coin>m.perp.binance` thus making it explicit and obvious which contract is which B) Further we interpolate the venue token to `spot` for spot markets going forward, which again makes cex spot markets explicit in symbology; we'll need to add this as well to other cex backends ;) Other misc detalles: - change USD-M futes `MarketType` key to `'usdtm_futes'`. - add `Pair.bs_fqme: str` for all pair subtypes with particular special contract handling for futes including quarterlies, perps and the weird "DEFI" ones.. - drop `OHLC.bar_wap` since it's no longer in the default time-series schema and we weren't filling it in here anyway.. - `Client._pairs: ChainMap` is now a read-only fqme-re-keyed view into the underlying pairs tables (which themselves are ideally keyed identically cross-venue) which we populate inside `Client.exch_info()` which itself now does concurrent pairs info fetching via a new `._cache_pairs()` using a `trio` task per API-venue. - support klines history query across all venues using same `Client.mkt_mode_req[Client.mkt_mode]` style as we're doing for `.exch_info()` B) - use the venue specific klines history query limits where documented. - handle new FQME venue / expiry fields inside `get_mkt_info()` ep such that again the correct `Client.mkt_mode` is selected based on parsing the desired spot vs. derivative contract. - do venue-specific-WSS-addr lookup based on output from `get_mkt_info()`; use usdtm venue WSS addr if a `FutesPair` is loaded. - set `topic: str` to the `.bs_fqme` value in live feed quotes! - use `Pair.bs_fqme: str` values for fuzzy-search input set.
2023-06-14 17:16:13 +00:00
from piker.brokers import (
open_cached_client,
)
from piker.clearing._messages import (
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdPosition,
BrokerdFill,
BrokerdCancel,
# BrokerdError,
)
log = get_logger('piker.brokers.binance')
async def handle_order_requests(
ems_order_stream: tractor.MsgStream
) -> None:
async with open_cached_client('binance') as client:
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
reqid = await client.submit_limit(
order.symbol,
order.action,
order.size,
order.price,
oid=order.oid
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
await client.submit_cancel(msg.symbol, msg.reqid)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None
) -> AsyncIterator[dict[str, Any]]:
async with open_cached_client('binance') as client:
if not client.api_key:
await ctx.started('paper')
return
# table: PpTable
# ledger: TransactionLedger
# TODO: load pps and accounts using accounting apis!
positions: list[BrokerdPosition] = []
accounts: list[str] = ['binance.default']
await ctx.started((positions, accounts))
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
open_cached_client('binance') as client,
client.manage_listen_key() as listen_key,
):
n.start_soon(handle_order_requests, ems_stream)
# await trio.sleep_forever()
ws: NoBsWs
async with open_autorecon_ws(
f'wss://stream.binance.com:9443/ws/{listen_key}',
) as ws:
event = await ws.recv_msg()
# https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
if event.get('e') == 'executionReport':
oid: str = event.get('c')
side: str = event.get('S').lower()
status: str = event.get('X')
order_qty: float = float(event.get('q'))
filled_qty: float = float(event.get('z'))
cum_transacted_qty: float = float(event.get('Z'))
price_avg: float = cum_transacted_qty / filled_qty
broker_time: float = float(event.get('T'))
commission_amount: float = float(event.get('n'))
commission_asset: float = event.get('N')
if status == 'TRADE':
if order_qty == filled_qty:
msg = BrokerdFill(
reqid=oid,
time_ns=time.time_ns(),
action=side,
price=price_avg,
broker_details={
'name': 'binance',
'commissions': {
'amount': commission_amount,
'asset': commission_asset
},
'broker_time': broker_time
},
broker_time=broker_time
)
else:
if status == 'NEW':
status = 'submitted'
elif status == 'CANCELED':
status = 'cancelled'
msg = BrokerdStatus(
reqid=oid,
time_ns=time.time_ns(),
status=status,
filled=filled_qty,
remaining=order_qty - filled_qty,
broker_details={'name': 'binance'}
)
else:
# XXX: temporary, to catch unhandled msgs
breakpoint()
await ems_stream.send(msg.dict())