Hard code futes venue(s) for now in `brokerd`..

basic_buy_bot
Tyler Goodlet 2023-06-19 19:20:41 -04:00
parent 676b00592d
commit e4c1003aba
1 changed files with 44 additions and 18 deletions

View File

@ -61,7 +61,11 @@ from piker.clearing._messages import (
Status,
Order,
)
from .venues import Pair
from .venues import (
Pair,
_futes_ws,
_testnet_futes_ws,
)
from .api import Client
log = get_logger('piker.brokers.binance')
@ -213,33 +217,41 @@ async def open_trade_dialog(
) -> AsyncIterator[dict[str, Any]]:
# TODO: how do we set this from the EMS such that
# positions are loaded from the correct venue on the user
# stream at startup? (that is in an attempt to support both
# spot and futes markets?)
# - I guess we just want to instead start 2 separate user
# stream tasks right? unless we want another actor pool?
# XXX: see issue: <urlhere>
venue_name: str = 'futes'
venue_mode: str = 'usdtm_futes'
account_name: str = 'usdtm'
use_testnet: bool = False
async with open_cached_client('binance') as client:
for key, subconf in client.conf.items():
if subconf.get('api_key'):
break
subconf: dict = client.conf[venue_name]
use_testnet = subconf.get('use_testnet', False)
# XXX: if no futes.api_key or spot.api_key has been set we
# always fall back to the paper engine!
else:
if not subconf.get('api_key'):
await ctx.started('paper')
return
async with (
open_cached_client('binance') as client,
):
client.mkt_mode: str = 'usdtm_futes'
client.mkt_mode: str = venue_mode
# if client.
venue: str = client.mkt_mode
# TODO: map these wss urls depending on spot or futes
# setting passed when this task is spawned?
wss_url: str = _futes_ws if not use_testnet else _testnet_futes_ws
wss: NoBsWs
async with (
client.manage_listen_key() as listen_key,
open_autorecon_ws(
f'wss://stream.binancefuture.com/ws/{listen_key}',
# f'wss://stream.binance.com:9443/ws/{listen_key}',
) as wss,
open_autorecon_ws(f'{wss_url}/ws/{listen_key}') as wss,
):
nsid: int = time_ns()
await wss.send_msg({
@ -270,7 +282,7 @@ async def open_trade_dialog(
positions: list[BrokerdPosition] = []
for resp_dict in msg['result']:
resp = resp_dict['res']
resp: dict = resp_dict['res']
req: str = resp_dict['req']
# @account response should be something like:
@ -329,7 +341,9 @@ async def open_trade_dialog(
bs_mktid: str = entry['symbol']
entry_size: float = float(entry['positionAmt'])
pair: Pair | None = client._venue2pairs[venue].get(bs_mktid)
pair: Pair | None = client._venue2pairs[
venue_mode
].get(bs_mktid)
if (
pair
and entry_size > 0
@ -338,7 +352,7 @@ async def open_trade_dialog(
ppmsg = BrokerdPosition(
broker='binance',
account='binance.usdtm',
account=f'binance.{account_name}',
# TODO: maybe we should be passing back
# a `MktPair` here?
@ -357,6 +371,13 @@ async def open_trade_dialog(
await ctx.started((positions, list(accounts)))
# TODO: package more state tracking into the dialogs API?
# - hmm maybe we could include `OrderDialogs.dids:
# bidict` as part of the interface and then ask for
# a reqid field to be passed at init?
# |-> `OrderDialog(reqid_field='orderId')` kinda thing?
# - also maybe bundle in some kind of dialog to account
# table?
dialogs = OrderDialogs()
dids: dict[str, int] = bidict()
@ -404,7 +425,8 @@ async def open_trade_dialog(
)
tn.start_soon(
handle_order_updates,
venue,
venue_mode,
account_name,
client,
ems_stream,
wss,
@ -417,6 +439,7 @@ async def open_trade_dialog(
async def handle_order_updates(
venue: str,
account_name: str,
client: Client,
ems_stream: tractor.MsgStream,
wss: NoBsWs,
@ -574,6 +597,9 @@ async def handle_order_updates(
time_ns=time_ns(),
# reqid=reqid,
reqid=oid,
# TODO: i feel like we don't need to make the
# ems and upstream clients aware of this?
# account='binance.usdtm',
status=status,
@ -622,7 +648,7 @@ async def handle_order_updates(
pair: Pair | None = client._venue2pairs[venue].get(bs_mktid)
ppmsg = BrokerdPosition(
broker='binance',
account='binance.usdtm',
account=f'binance.{account_name}',
# TODO: maybe we should be passing back
# a `MktPair` here?