piker/examples/basic_order_bot.py

264 lines
7.5 KiB
Python
Raw Permalink Normal View History

# from pprint import pformat
from functools import partial
from decimal import Decimal
from typing import Callable
import tractor
import trio
from uuid import uuid4
from piker.service import maybe_open_pikerd
from piker.accounting import dec_digits
from piker.clearing import (
open_ems,
OrderClient,
)
# TODO: we should probably expose these top level in this subsys?
from piker.clearing._messages import (
Order,
Status,
BrokerdPosition,
)
from piker.data import (
iterticks,
Flume,
open_feed,
Feed,
# ShmArray,
)
# TODO: handle other statuses:
# - fills, errors, and position tracking
async def wait_for_order_status(
trades_stream: tractor.MsgStream,
oid: str,
expect_status: str,
) -> tuple[
list[Status],
list[BrokerdPosition],
]:
'''
Wait for a specific order status for a given dialog, return msg flow
up to that msg and any position update msgs in a tuple.
'''
# Wait for position message before moving on to verify flow(s)
# for the multi-order position entry/exit.
status_msgs: list[Status] = []
pp_msgs: list[BrokerdPosition] = []
async for msg in trades_stream:
match msg:
case {'name': 'position'}:
ppmsg = BrokerdPosition(**msg)
pp_msgs.append(ppmsg)
case {
'name': 'status',
}:
msg = Status(**msg)
status_msgs.append(msg)
# if we get the status we expect then return all
# collected msgs from the brokerd dialog up to the
# exected msg B)
if (
msg.resp == expect_status
and msg.oid == oid
):
return status_msgs, pp_msgs
async def bot_main():
    '''
    Boot the piker runtime, open an ems connection, submit
    and process orders statuses in real-time.

    Steps, in order:
    - open (or attach to) ``pikerd``, an EMS order client in paper
      mode, a live quote feed and a ``trio`` nursery,
    - read the first ask tick from the quote stream and submit
      a live buy limit order priced ``clear_margin`` times that ask,
    - spawn a background "trailer" task which re-prices the order
      from each subsequent quote,
    - sleep until ``KeyboardInterrupt``, then cancel the order and
      wait for the 'canceled' status before re-raising.

    Raises ``RuntimeError`` if the quote stream ends before any ask
    tick was received.

    '''
    ll: str = 'info'

    # open an order ctl client, live data feed, trio nursery for
    # spawning an order trailer task
    client: OrderClient
    trades_stream: tractor.MsgStream
    feed: Feed
    accounts: list[str]

    fqme: str = 'btcusdt.usdtm.perp.binance'

    async with (

        # TODO: do this implicitly inside `open_ems()` ep below?
        # init and sync actor-service runtime
        maybe_open_pikerd(
            loglevel=ll,
            debug_mode=True,
        ),
        open_ems(
            fqme,
            mode='paper',  # {'live', 'paper'}
            # mode='live',  # for real-brokerd submissions
            loglevel=ll,

        ) as (
            client,  # OrderClient
            trades_stream,  # tractor.MsgStream startup_pps,
            _,  # positions
            accounts,
            _,  # dialogs
        ),

        open_feed(
            fqmes=[fqme],
            loglevel=ll,

            # TODO: if you want to throttle via downsampling
            # how many tick updates your feed received on
            # quote streams B)
            # tick_throttle=10,
        ) as feed,

        trio.open_nursery() as tn,
    ):
        assert accounts
        print(f'Loaded binance accounts: {accounts}')

        # build a rounding helper which quantizes prices to the
        # number of decimal digits in the market's price tick.
        flume: Flume = feed.flumes[fqme]
        min_tick = Decimal(flume.mkt.price_tick)
        min_tick_digits: int = dec_digits(min_tick)
        price_round: Callable = partial(
            round,
            ndigits=min_tick_digits,
        )

        quote_stream: trio.abc.ReceiveChannel = feed.streams['binance']

        # always keep live limit 0.03% below last
        # clearing price (1 - 0.9997 == 0.0003)
        clear_margin: float = 0.9997

        async def trailer(
            order: Order,
        ):
            '''
            Re-price ``order`` from the live quote stream.

            For each quote msg, take the first tick yielded by
            ``iterticks(quote, reverse=True)``, send an update
            setting the order price to ``clear_margin`` times that
            tick's price and wait for the 'open' status before
            moving on.

            '''
            # ref shm OHLCV array history, if you want
            # s_shm: ShmArray = flume.rt_shm
            # m_shm: ShmArray = flume.hist_shm

            # NOTE: if you wanted to frame ticks by type like the
            # the quote throttler does.. and this is probably
            # faster in terms of getting the latest tick type
            # embedded value of interest?
            # from piker.data._sampling import frame_ticks

            async for quotes in quote_stream:
                # only the quote payloads are needed, not their keys
                for quote in quotes.values():
                    # print(
                    #     f'{quote["symbol"]} -> {quote["ticks"]}\n'
                    #     f'last 1s OHLC:\n{s_shm.array[-1]}\n'
                    #     f'last 1m OHLC:\n{m_shm.array[-1]}\n'
                    # )

                    for tick in iterticks(
                        quote,
                        reverse=True,
                        # types=('trade', 'dark_trade'),  # defaults
                    ):
                        await client.update(
                            uuid=order.oid,
                            price=price_round(
                                clear_margin
                                *
                                tick['price']
                            ),
                        )
                        msgs, pps = await wait_for_order_status(
                            trades_stream,
                            order.oid,
                            'open'
                        )
                        # if multiple clears per quote just
                        # skip to the next quote?
                        break

        # get first live quote to be sure we submit the initial
        # live buy limit low enough that it doesn't clear due to
        # a stale initial price from the data feed layer!
        first_ask_price: float | None = None
        async for quotes in quote_stream:

            # NOTE: iterate values only so the loop does NOT rebind
            # the outer `fqme` which is used for the order's symbol
            # further below.
            for quote in quotes.values():
                # print(quote['symbol'])

                # NOTE: the trailing comma matters, `('ask')` is just
                # the string 'ask' and not a 1-tuple of tick types.
                for tick in iterticks(quote, types=('ask',)):
                    first_ask_price = tick['price']
                    break

            if first_ask_price:
                break

        if not first_ask_price:
            # the quote stream ended before delivering an ask tick;
            # fail loudly instead of multiplying by `None` below.
            raise RuntimeError(
                f'No ask tick received for {fqme}, '
                'unable to price the initial order!'
            )

        # setup order dialog via first msg
        price: float = price_round(
            clear_margin
            *
            first_ask_price,
        )

        # compute a 1k USD sized pos
        size: float = round(1e3/price, ndigits=3)

        order = Order(

            # docs on how this all works, bc even i'm not entirely
            # clear XD. also we probably want to figure out how to
            # offer both the paper engine running and the brokerd
            # order ctl tasks with the ems choosing which stream to
            # route msgs on given the account value!
            account='paper',  # use built-in paper clearing engine and .accounting
            # account='binance.usdtm',  # for live binance futes

            oid=str(uuid4()),
            exec_mode='live',  # {'dark', 'live', 'alert'}

            action='buy',  # TODO: remove this from our schema?

            size=size,
            symbol=fqme,
            price=price,
            brokers=['binance'],
        )
        await client.send(order)

        # block until the ems reports the order as open; no position
        # msgs are expected before that point.
        msgs, pps = await wait_for_order_status(
            trades_stream,
            order.oid,
            'open',
        )

        assert not pps
        assert msgs[-1].oid == order.oid

        # start "trailer task" which tracks rt quote stream
        tn.start_soon(trailer, order)

        try:
            # wait for ctl-c from user..
            await trio.sleep_forever()
        except KeyboardInterrupt:
            # cancel the open order
            await client.cancel(order.oid)

            msgs, pps = await wait_for_order_status(
                trades_stream,
                order.oid,
                'canceled'
            )
            raise
# script entrypoint: only when executed directly (not on import),
# hand the `bot_main()` coroutine function to `trio.run()`.
if __name__ == '__main__':
    trio.run(bot_main)