2023-06-26 23:31:19 +00:00
|
|
|
# from pprint import pformat
|
2023-06-26 19:22:51 +00:00
|
|
|
from functools import partial
|
|
|
|
from decimal import Decimal
|
|
|
|
from typing import Callable
|
|
|
|
|
2023-06-26 17:43:59 +00:00
|
|
|
import tractor
|
|
|
|
import trio
|
|
|
|
from uuid import uuid4
|
|
|
|
|
2023-06-26 23:31:19 +00:00
|
|
|
from piker.service import maybe_open_pikerd
|
2023-06-26 19:22:51 +00:00
|
|
|
from piker.accounting import dec_digits
|
2023-06-26 17:43:59 +00:00
|
|
|
from piker.clearing import (
|
|
|
|
open_ems,
|
|
|
|
OrderClient,
|
|
|
|
)
|
|
|
|
# TODO: we should probably expose these top level in this subsys?
|
|
|
|
from piker.clearing._messages import (
|
|
|
|
Order,
|
|
|
|
Status,
|
|
|
|
BrokerdPosition,
|
|
|
|
)
|
2023-06-26 19:22:51 +00:00
|
|
|
from piker.data import (
|
2023-06-26 23:31:19 +00:00
|
|
|
iterticks,
|
2023-06-26 19:22:51 +00:00
|
|
|
Flume,
|
|
|
|
open_feed,
|
|
|
|
Feed,
|
2023-06-26 20:26:33 +00:00
|
|
|
# ShmArray,
|
2023-06-26 19:22:51 +00:00
|
|
|
)
|
2023-06-26 17:43:59 +00:00
|
|
|
|
|
|
|
|
2023-06-26 20:26:33 +00:00
|
|
|
# TODO: handle other statuses:
|
2023-06-27 17:22:54 +00:00
|
|
|
# - fills, errors, and position tracking
|
2023-06-26 17:43:59 +00:00
|
|
|
async def wait_for_order_status(
|
|
|
|
trades_stream: tractor.MsgStream,
|
|
|
|
oid: str,
|
|
|
|
expect_status: str,
|
|
|
|
|
|
|
|
) -> tuple[
|
|
|
|
list[Status],
|
|
|
|
list[BrokerdPosition],
|
|
|
|
]:
|
|
|
|
'''
|
|
|
|
Wait for a specific order status for a given dialog, return msg flow
|
|
|
|
up to that msg and any position update msgs in a tuple.
|
|
|
|
|
|
|
|
'''
|
|
|
|
# Wait for position message before moving on to verify flow(s)
|
|
|
|
# for the multi-order position entry/exit.
|
|
|
|
status_msgs: list[Status] = []
|
|
|
|
pp_msgs: list[BrokerdPosition] = []
|
|
|
|
|
|
|
|
async for msg in trades_stream:
|
|
|
|
match msg:
|
|
|
|
case {'name': 'position'}:
|
|
|
|
ppmsg = BrokerdPosition(**msg)
|
|
|
|
pp_msgs.append(ppmsg)
|
|
|
|
|
|
|
|
case {
|
|
|
|
'name': 'status',
|
|
|
|
}:
|
|
|
|
msg = Status(**msg)
|
|
|
|
status_msgs.append(msg)
|
|
|
|
|
|
|
|
# if we get the status we expect then return all
|
|
|
|
# collected msgs from the brokerd dialog up to the
|
|
|
|
# exected msg B)
|
|
|
|
if (
|
|
|
|
msg.resp == expect_status
|
|
|
|
and msg.oid == oid
|
|
|
|
):
|
|
|
|
return status_msgs, pp_msgs
|
|
|
|
|
|
|
|
|
|
|
|
async def bot_main():
    '''
    Boot the piker runtime, open an ems connection, submit
    and process orders statuses in real-time.

    Sequence, all for the single hard-coded ``fqme`` below:

    - open the actor runtime, a paper-mode ems order client, a live
      quote feed and a trio nursery,
    - wait for the first live 'ask' tick and submit a buy limit
      priced at ``clear_margin`` times that ask,
    - spawn the ``trailer()`` task which keeps re-pricing that
      limit from subsequent live ticks,
    - sleep until ``KeyboardInterrupt``, then cancel the order, wait
      for its 'canceled' status and re-raise the interrupt.

    '''
    ll: str = 'info'

    # open an order ctl client, live data feed, trio nursery for
    # spawning an order trailer task
    client: OrderClient
    trades_stream: tractor.MsgStream
    feed: Feed
    accounts: list[str]

    fqme: str = 'btcusdt.usdtm.perp.binance'

    async with (

        # TODO: do this implicitly inside `open_ems()` ep below?
        # init and sync actor-service runtime
        maybe_open_pikerd(
            loglevel=ll,
            debug_mode=True,
        ),
        open_ems(
            fqme,
            mode='paper',  # {'live', 'paper'}
            # mode='live',  # for real-brokerd submissions
            loglevel=ll,
        ) as (
            client,  # OrderClient
            trades_stream,  # tractor.MsgStream
            _,  # positions
            accounts,
            _,  # dialogs
        ),

        open_feed(
            fqmes=[fqme],
            loglevel=ll,

            # TODO: if you want to throttle via downsampling
            # how many tick updates your feed received on
            # quote streams B)
            # tick_throttle=10,
        ) as feed,

        trio.open_nursery() as tn,
    ):
        assert accounts
        print(f'Loaded binance accounts: {accounts}')

        # build a rounding func which quantizes any computed price
        # to the number of decimal digits in the market's min tick.
        flume: Flume = feed.flumes[fqme]
        min_tick = Decimal(flume.mkt.price_tick)
        min_tick_digits: int = dec_digits(min_tick)
        price_round: Callable = partial(
            round,
            ndigits=min_tick_digits,
        )

        quote_stream: trio.abc.ReceiveChannel = feed.streams['binance']

        # always keep live limit 0.03% below last
        # clearing price (1 - 0.9997 == 0.0003)
        clear_margin: float = 0.9997

        async def trailer(
            order: Order,
        ):
            '''
            Trail the live limit ``order`` behind the market.

            For every quote delivered by the feed, take the first
            tick yielded by ``iterticks(quote, reverse=True)``,
            request a price update to ``clear_margin`` times that
            tick's price and then block until an 'open' status msg
            for the order is received.

            '''
            # ref shm OHLCV array history, if you want
            # s_shm: ShmArray = flume.rt_shm
            # m_shm: ShmArray = flume.hist_shm

            # NOTE: if you wanted to frame ticks by type like the
            # the quote throttler does.. and this is probably
            # faster in terms of getting the latest tick type
            # embedded value of interest?
            # from piker.data._sampling import frame_ticks

            async for quotes in quote_stream:
                # the per-quote key is unused here so only walk the
                # values.
                for quote in quotes.values():
                    # print(
                    #     f'{quote["symbol"]} -> {quote["ticks"]}\n'
                    #     f'last 1s OHLC:\n{s_shm.array[-1]}\n'
                    #     f'last 1m OHLC:\n{m_shm.array[-1]}\n'
                    # )

                    for tick in iterticks(
                        quote,
                        reverse=True,
                        # types=('trade', 'dark_trade'),  # defaults
                    ):
                        await client.update(
                            uuid=order.oid,
                            price=price_round(
                                clear_margin * tick['price']
                            ),
                        )
                        await wait_for_order_status(
                            trades_stream,
                            order.oid,
                            'open',
                        )
                        # if multiple clears per quote just
                        # skip to the next quote?
                        break

        # get first live quote to be sure we submit the initial
        # live buy limit low enough that it doesn't clear due to
        # a stale initial price from the data feed layer!
        first_ask_price: float | None = None
        async for quotes in quote_stream:
            # NOTE: iterate values only; unpacking the key into
            # `fqme` here would rebind the market key which is
            # passed as `Order.symbol` further down.
            for quote in quotes.values():
                # print(quote['symbol'])

                # NOTE: `('ask',)` with the trailing comma is a
                # 1-tuple; a bare `('ask')` is just the str 'ask'.
                for tick in iterticks(quote, types=('ask',)):
                    first_ask_price = tick['price']
                    break

            # truthiness on purpose: a missing *or* zero price both
            # mean "keep waiting" (a zero would blow up the size
            # division below).
            if first_ask_price:
                break

        # setup order dialog via first msg
        price: float = price_round(clear_margin * first_ask_price)

        # compute a 1k USD sized pos
        size: float = round(1e3/price, ndigits=3)

        order = Order(

            # TODO: docs on how this all works, bc even i'm not entirely
            # clear XD. also we probably want to figure out how to
            # offer both the paper engine running and the brokerd
            # order ctl tasks with the ems choosing which stream to
            # route msgs on given the account value!
            account='paper',  # use built-in paper clearing engine and .accounting
            # account='binance.usdtm',  # for live binance futes

            oid=str(uuid4()),
            exec_mode='live',  # {'dark', 'live', 'alert'}

            action='buy',  # TODO: remove this from our schema?

            size=size,
            symbol=fqme,
            price=price,
            brokers=['binance'],
        )
        await client.send(order)

        msgs, pps = await wait_for_order_status(
            trades_stream,
            order.oid,
            'open',
        )

        # the limit was priced below the ask so no position update
        # should have been seen before the 'open' status, and the
        # last status collected must belong to our order.
        assert not pps
        assert msgs[-1].oid == order.oid

        # start "trailer task" which tracks rt quote stream
        tn.start_soon(trailer, order)

        try:
            # wait for ctl-c from user..
            await trio.sleep_forever()
        except KeyboardInterrupt:
            # cancel the open order
            await client.cancel(order.oid)

            # NOTE(review): the trailer task may still be reading
            # from this same `trades_stream` at this point; confirm
            # that the 'canceled' status can't be consumed by it
            # instead of by this wait.
            await wait_for_order_status(
                trades_stream,
                order.oid,
                'canceled',
            )
            raise
|
2023-06-26 17:43:59 +00:00
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # script entrypoint: hand the bot's top level async function to
    # trio which runs it to completion (or until interrupted).
    trio.run(bot_main)
|