Merge pull request #223 from pikers/account_select

`brokerd`, `emsd` and UI multi-account per broker, order mode support
fsp_feeds
goodboy 2021-09-12 17:32:32 -04:00 committed by GitHub
commit cecba8904d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1131 additions and 753 deletions

View File

@ -14,12 +14,14 @@ private_key = ""
[ib]
host = "127.0.0.1"
[ib.accounts]
margin = ""
registered = ""
paper = ""
ports.gw = 4002
ports.tws = 7497
ports.order = ["gw", "tws",]
[ib.ports]
gw = 4002
tws = 7497
order = [ "gw", "tws",]
accounts.margin = "X0000000"
accounts.ira = "X0000000"
accounts.paper = "XX0000000"
# the order in which accounts will be selected (if found through
# `brokerd`) when a new symbol is loaded
accounts_order = ['paper', 'margin', 'ira']

View File

@ -53,7 +53,7 @@ from ib_insync.client import Client as ib_Client
from fuzzywuzzy import process as fuzzy
import numpy as np
from . import config
from .. import config
from ..log import get_logger, get_console_log
from .._daemon import maybe_spawn_brokerd
from ..data._source import from_df
@ -62,8 +62,7 @@ from ._util import SymbolNotFound, NoData
from ..clearing._messages import (
BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdPosition, BrokerdCancel,
BrokerdFill,
# BrokerdError,
BrokerdFill, BrokerdError,
)
@ -220,15 +219,18 @@ class Client:
Note: this client requires running inside an ``asyncio`` loop.
"""
_contracts: dict[str, Contract] = {}
def __init__(
self,
ib: ibis.IB,
) -> None:
self.ib = ib
self.ib.RaiseRequestErrors = True
# contract cache
self._contracts: dict[str, Contract] = {}
self._feeds: dict[str, trio.abc.SendChannel] = {}
# NOTE: the ib.client here is "throttled" to 45 rps by default
@ -504,7 +506,7 @@ class Client:
return contract, ticker, details
# async to be consistent for the client proxy, and cuz why not.
async def submit_limit(
def submit_limit(
self,
# ignored since ib doesn't support defining your
# own order id
@ -513,7 +515,7 @@ class Client:
price: float,
action: str,
size: int,
account: str = '', # if blank the "default" tws account is used
account: str, # if blank the "default" tws account is used
# XXX: by default 0 tells ``ib_insync`` methods that there is no
# existing order so ask the client to create a new one (which it
@ -536,6 +538,7 @@ class Client:
Order(
orderId=reqid or 0, # stupid api devs..
action=action.upper(), # BUY/SELL
# lookup the literal account number by name here.
account=account,
orderType='LMT',
lmtPrice=price,
@ -552,7 +555,7 @@ class Client:
# their own weird client int counting ids..
return trade.order.orderId
async def submit_cancel(
def submit_cancel(
self,
reqid: str,
) -> None:
@ -569,6 +572,7 @@ class Client:
async def recv_trade_updates(
self,
to_trio: trio.abc.SendChannel,
) -> None:
"""Stream a ticker using the std L1 api.
"""
@ -659,9 +663,10 @@ class Client:
self.ib.errorEvent.connect(push_err)
async def positions(
def positions(
self,
account: str = '',
) -> list[Position]:
"""
Retrieve position info for ``account``.
@ -695,8 +700,11 @@ def get_config() -> dict[str, Any]:
return section
_accounts2clients: dict[str, Client] = {}
@asynccontextmanager
async def _aio_get_client(
async def load_aio_clients(
host: str = '127.0.0.1',
port: int = None,
@ -710,33 +718,12 @@ async def _aio_get_client(
TODO: consider doing this with a ctx mngr eventually?
'''
global _accounts2clients
global _client_cache
conf = get_config()
# first check cache for existing client
try:
if port:
client = _client_cache[(host, port)]
else:
# grab first cached client
client = list(_client_cache.values())[0]
if not client.ib.isConnected():
# we have a stale client to re-allocate
raise KeyError
yield client
except (KeyError, IndexError):
# TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one.
if client_id is None:
# if this is a persistent brokerd, try to allocate a new id for
# each client
client_id = next(_client_ids)
ib = NonShittyIB()
ib = None
client = None
# attempt to get connection info from config; if no .toml entry
# exists, we try to load from a default localhost connection.
@ -753,37 +740,87 @@ async def _aio_get_client(
)
order = ports['order']
accounts_def = config.load_accounts(['ib'])
try_ports = [ports[key] for key in order]
ports = try_ports if port is None else [port]
we_connected = []
# allocate new and/or reload disconnected but cached clients
try:
# TODO: support multiple clients allowing for execution on
# multiple accounts (including a paper instance running on the
# same machine) and switching between accounts in the EMs
_err = None
# (re)load any and all clients that can be found
# from connection details in ``brokers.toml``.
for port in ports:
client = _client_cache.get((host, port))
accounts_found: dict[str, Client] = {}
if not client or not client.ib.isConnected():
try:
ib = NonShittyIB()
# if this is a persistent brokerd, try to allocate
# a new id for each client
client_id = next(_client_ids)
log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id)
break
# create and cache client
client = Client(ib)
# Pre-collect all accounts available for this
# connection and map account names to this client
# instance.
pps = ib.positions()
if pps:
for pp in pps:
accounts_found[
accounts_def.inverse[pp.account]
] = client
# if there are no positions or accounts
# without positions we should still register
# them for this client
for value in ib.accountValues():
acct = value.account
if acct not in accounts_found:
accounts_found[
accounts_def.inverse[acct]
] = client
log.info(
f'Loaded accounts for client @ {host}:{port}\n'
f'{pformat(accounts_found)}'
)
# update all actor-global caches
log.info(f"Caching client for {(host, port)}")
_client_cache[(host, port)] = client
we_connected.append(client)
_accounts2clients.update(accounts_found)
except ConnectionRefusedError as ce:
_err = ce
log.warning(f'Failed to connect on {port}')
else:
if not _client_cache:
raise ConnectionRefusedError(_err)
# create and cache
try:
client = Client(ib)
# retreive first loaded client
clients = list(_client_cache.values())
if clients:
client = clients[0]
_client_cache[(host, port)] = client
log.debug(f"Caching client for {(host, port)}")
yield client
yield client, _client_cache, _accounts2clients
except BaseException:
ib.disconnect()
for client in we_connected:
client.ib.disconnect()
raise
@ -791,10 +828,16 @@ async def _aio_run_client_method(
meth: str,
to_trio=None,
from_trio=None,
client=None,
**kwargs,
) -> None:
async with _aio_get_client() as client:
async with load_aio_clients() as (
_client,
clients,
accts2clients,
):
client = client or _client
async_meth = getattr(client, meth)
# handle streaming methods
@ -808,7 +851,9 @@ async def _aio_run_client_method(
async def _trio_run_client_method(
method: str,
client: Optional[Client] = None,
**kwargs,
) -> None:
"""Asyncio entry point to run tasks against the ``ib_insync`` api.
@ -828,12 +873,12 @@ async def _trio_run_client_method(
):
kwargs['_treat_as_stream'] = True
result = await tractor.to_asyncio.run_task(
return await tractor.to_asyncio.run_task(
_aio_run_client_method,
meth=method,
client=client,
**kwargs
)
return result
class _MethodProxy:
@ -1081,8 +1126,11 @@ async def _setup_quote_stream(
"""
global _quote_streams
async with _aio_get_client() as client:
async with load_aio_clients() as (
client,
clients,
accts2clients,
):
contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@ -1277,8 +1325,6 @@ async def stream_quotes(
calc_price=calc_price
)
# con = quote['contract']
# topic = '.'.join((con['symbol'], suffix)).lower()
quote['symbol'] = topic
await send_chan.send({topic: quote})
@ -1295,12 +1341,21 @@ def pack_position(pos: Position) -> dict[str, Any]:
symbol = con.localSymbol.replace(' ', '')
else:
symbol = con.symbol
symbol = con.symbol.lower()
exch = (con.primaryExchange or con.exchange).lower()
symkey = '.'.join((symbol, exch))
if not exch:
# attempt to lookup the symbol from our
# hacked set..
for sym in _adhoc_futes_set:
if symbol in sym:
symkey = sym
break
# TODO: options contracts into a sane format..
symkey = '.'.join([
symbol.lower(),
(con.primaryExchange or con.exchange).lower(),
])
return BrokerdPosition(
broker='ib',
account=pos.account,
@ -1314,28 +1369,57 @@ def pack_position(pos: Position) -> dict[str, Any]:
async def handle_order_requests(
ems_order_stream: tractor.MsgStream,
accounts_def: dict[str, str],
) -> None:
global _accounts2clients
# request_msg: dict
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
account = request_msg['account']
acct_number = accounts_def.get(account)
if not acct_number:
log.error(
f'An IB account number for name {account} is not found?\n'
'Make sure you have all TWS and GW instances running.'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
).dict())
continue
client = _accounts2clients.get(account)
if not client:
log.error(
f'An IB client for account name {account} is not found.\n'
'Make sure you have all TWS and GW instances running.'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
).dict())
continue
if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
reqid = await _trio_run_client_method(
method='submit_limit',
reqid = client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
account=acct_number,
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
@ -1352,16 +1436,13 @@ async def handle_order_requests(
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
account=account,
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
await _trio_run_client_method(
method='submit_cancel',
reqid=msg.reqid
)
client.submit_cancel(reqid=msg.reqid)
else:
log.error(f'Unknown order command: {request_msg}')
@ -1378,35 +1459,71 @@ async def trades_dialogue(
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
ib_trade_events_stream = await _trio_run_client_method(
method='recv_trade_updates',
)
accounts_def = config.load_accounts(['ib'])
global _accounts2clients
global _client_cache
# deliver positions to subscriber before anything else
positions = await _trio_run_client_method(method='positions')
all_positions = {}
for pos in positions:
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
for account, client in _accounts2clients.items():
# each client to an api endpoint will have it's own event stream
trade_event_stream = await _trio_run_client_method(
method='recv_trade_updates',
client=client,
)
clients.append((client, trade_event_stream))
for client in _client_cache.values():
for pos in client.positions():
msg = pack_position(pos)
all_positions[msg.symbol] = msg.dict()
all_positions.setdefault(
msg.symbol, []
).append(msg.dict())
await ctx.started(all_positions)
action_map = {'BOT': 'buy', 'SLD': 'sell'}
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
# start order request handler **before** local trades event loop
n.start_soon(handle_order_requests, ems_stream)
n.start_soon(handle_order_requests, ems_stream, accounts_def)
# allocate event relay tasks for each client connection
for client, stream in clients:
n.start_soon(
deliver_trade_events,
stream,
ems_stream,
accounts_def
)
# block until cancelled
await trio.sleep_forever()
async def deliver_trade_events(
trade_event_stream: trio.MemoryReceiveChannel,
ems_stream: tractor.MsgStream,
accounts_def: dict[str, str],
) -> None:
'''Format and relay all trade events for a given client to the EMS.
'''
action_map = {'BOT': 'buy', 'SLD': 'sell'}
# TODO: for some reason we can receive a ``None`` here when the
# ib-gw goes down? Not sure exactly how that's happening looking
# at the eventkit code above but we should probably handle it...
async for event_name, item in ib_trade_events_stream:
print(f' ib sending {item}')
async for event_name, item in trade_event_stream:
log.info(f'ib sending {event_name}:\n{pformat(item)}')
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
@ -1446,6 +1563,7 @@ async def trades_dialogue(
reqid=trade.order.orderId,
time_ns=time.time_ns(), # cuz why not
account=accounts_def.inverse[trade.order.account],
# everyone doin camel case..
status=status.status.lower(), # force lower case
@ -1513,7 +1631,8 @@ async def trades_dialogue(
if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}')
# don't forward for now, it's unecessary.. but if we wanted to,
# TODO: what schema for this msg if we're going to make it
# portable across all backends?
# msg = BrokerdError(**err)
continue

View File

@ -43,7 +43,7 @@ import asks
from ..calc import humanize, percent_change
from .._cacheables import open_cached_client, async_lifo_cache
from . import config
from .. import config
from ._util import resproc, BrokerError, SymbolNotFound
from ..log import get_logger, colorize_json, get_console_log
from . import get_brokermod

View File

@ -0,0 +1,328 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Position allocation logic and protocols.
'''
from enum import Enum
from typing import Optional
from bidict import bidict
from pydantic import BaseModel, validator
from ..data._source import Symbol
from ._messages import BrokerdPosition, Status
class Position(BaseModel):
'''Basic pp (personal position) model with attached fills history.
This type should be IPC wire ready?
'''
symbol: Symbol
# last size and avg entry price
size: float
avg_price: float # TODO: contextual pricing
# ordered record of known constituent trade messages
fills: list[Status] = []
def update_from_msg(
self,
msg: BrokerdPosition,
) -> None:
# XXX: better place to do this?
symbol = self.symbol
lot_size_digits = symbol.lot_size_digits
avg_price, size = (
round(msg['avg_price'], ndigits=symbol.tick_size_digits),
round(msg['size'], ndigits=lot_size_digits),
)
self.avg_price = avg_price
self.size = size
_size_units = bidict({
'currency': '$ size',
'units': '# units',
# TODO: but we'll need a `<brokermod>.get_accounts()` or something
# 'percent_of_port': '% of port',
})
SizeUnit = Enum(
'SizeUnit',
_size_units,
)
class Allocator(BaseModel):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
symbol: Symbol
accounts: bidict[str, Optional[str]]
account: Optional[str] = 'paper'
@validator('account', pre=False)
def set_account(cls, v, values):
if v:
return values['accounts'][v]
size_unit: SizeUnit = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@validator('size_unit')
def lookup_key(cls, v):
# apply the corresponding enum key for the text "description" value
return v.name
# TODO: if we ever want ot support non-uniform entry-slot-proportion
# "sizes"
# disti_weight: str = 'uniform'
units_limit: float
currency_limit: float
slots: int
def step_sizes(
self,
) -> (float, float):
'''Return the units size for each unit type as a tuple.
'''
slots = self.slots
return (
self.units_limit / slots,
self.currency_limit / slots,
)
def limit(self) -> float:
if self.size_unit == 'currency':
return self.currency_limit
else:
return self.units_limit
def account_name(self) -> str:
return self.accounts.inverse[self.account]
def next_order_info(
self,
# we only need a startup size for exit calcs, we can the
# determine how large slots should be if the initial pp size was
# larger then the current live one, and the live one is smaller
# then the initial config settings.
startup_pp: Position,
live_pp: Position,
price: float,
action: str,
) -> dict:
'''Generate order request info for the "next" submittable order
depending on position / order entry config.
'''
sym = self.symbol
ld = sym.lot_size_digits
size_unit = self.size_unit
live_size = live_pp.size
abs_live_size = abs(live_size)
abs_startup_size = abs(startup_pp.size)
u_per_slot, currency_per_slot = self.step_sizes()
if size_unit == 'units':
slot_size = u_per_slot
l_sub_pp = self.units_limit - abs_live_size
elif size_unit == 'currency':
live_cost_basis = abs_live_size * live_pp.avg_price
slot_size = currency_per_slot / price
l_sub_pp = (self.currency_limit - live_cost_basis) / price
# an entry (adding-to or starting a pp)
if (
action == 'buy' and live_size > 0 or
action == 'sell' and live_size < 0 or
live_size == 0
):
order_size = min(slot_size, l_sub_pp)
# an exit (removing-from or going to net-zero pp)
else:
# when exiting a pp we always try to slot the position
# in the instrument's units, since doing so in a derived
# size measure (eg. currency value, percent of port) would
# result in a mis-mapping of slots sizes in unit terms
# (i.e. it would take *more* slots to exit at a profit and
# *less* slots to exit at a loss).
pp_size = max(abs_startup_size, abs_live_size)
slotted_pp = pp_size / self.slots
if size_unit == 'currency':
# compute the "projected" limit's worth of units at the
# current pp (weighted) price:
slot_size = currency_per_slot / live_pp.avg_price
else:
slot_size = u_per_slot
# TODO: ensure that the limit can never be set **lower**
# then the current pp size? It should be configured
# correctly at startup right?
# if our position is greater then our limit setting
# we'll want to use slot sizes which are larger then what
# the limit would normally determine.
order_size = max(slotted_pp, slot_size)
if (
abs_live_size < slot_size or
# NOTE: front/back "loading" heurstic:
# if the remaining pp is in between 0-1.5x a slot's
# worth, dump the whole position in this last exit
# therefore conducting so called "back loading" but
# **without** going past a net-zero pp. if the pp is
# > 1.5x a slot size, then front load: exit a slot's and
# expect net-zero to be acquired on the final exit.
slot_size < pp_size < round((1.5*slot_size), ndigits=ld)
):
order_size = abs_live_size
slots_used = 1.0 # the default uniform policy
if order_size < slot_size:
# compute a fractional slots size to display
slots_used = self.slots_used(
Position(symbol=sym, size=order_size, avg_price=price)
)
return {
'size': abs(round(order_size, ndigits=ld)),
'size_digits': ld,
# TODO: incorporate multipliers for relevant derivatives
'fiat_size': round(order_size * price, ndigits=2),
'slots_used': slots_used,
# update line LHS label with account name
'account': self.account_name(),
}
def slots_used(
self,
pp: Position,
) -> float:
'''Calc and return the number of slots used by this ``Position``.
'''
abs_pp_size = abs(pp.size)
if self.size_unit == 'currency':
# live_currency_size = size or (abs_pp_size * pp.avg_price)
live_currency_size = abs_pp_size * pp.avg_price
prop = live_currency_size / self.currency_limit
else:
# return (size or abs_pp_size) / alloc.units_limit
prop = abs_pp_size / self.units_limit
# TODO: REALLY need a way to show partial slots..
# for now we round at the midway point between slots
return round(prop * self.slots)
def mk_allocator(
symbol: Symbol,
accounts: dict[str, str],
startup_pp: Position,
# default allocation settings
defaults: dict[str, float] = {
'account': None, # select paper by default
'size_unit': _size_units['currency'],
'units_limit': 400,
'currency_limit': 5e3,
'slots': 4,
},
**kwargs,
) -> Allocator:
if kwargs:
defaults.update(kwargs)
# load and retreive user settings for default allocations
# ``config.toml``
user_def = {
'currency_limit': 5e3,
'slots': 4,
}
defaults.update(user_def)
alloc = Allocator(
symbol=symbol,
accounts=accounts,
**defaults,
)
asset_type = symbol.type_key
# specific configs by asset class / type
if asset_type in ('future', 'option', 'futures_option'):
# since it's harder to know how currency "applies" in this case
# given leverage properties
alloc.size_unit = '# units'
# set units limit to slots size thus making make the next
# entry step 1.0
alloc.units_limit = alloc.slots
# if the current position is already greater then the limit
# settings, increase the limit to the current position
if alloc.size_unit == 'currency':
startup_size = startup_pp.size * startup_pp.avg_price
if startup_size > alloc.currency_limit:
alloc.currency_limit = round(startup_size, ndigits=2)
else:
startup_size = startup_pp.size
if startup_size > alloc.units_limit:
alloc.units_limit = startup_size
return alloc

View File

@ -201,6 +201,7 @@ async def clear_dark_triggers(
msg = BrokerdOrder(
action=cmd['action'],
oid=oid,
account=cmd['account'],
time_ns=time.time_ns(),
# this **creates** new order request for the
@ -259,8 +260,15 @@ async def clear_dark_triggers(
@dataclass
class TradesRelay:
# for now we keep only a single connection open with
# each ``brokerd`` for simplicity.
brokerd_dialogue: tractor.MsgStream
positions: dict[str, float]
# map of symbols to dicts of accounts to pp msgs
positions: dict[str, dict[str, BrokerdPosition]]
# count of connected ems clients for this ``brokerd``
consumers: int = 0
@ -513,10 +521,13 @@ async def translate_and_relay_brokerd_events(
pos_msg = BrokerdPosition(**brokerd_msg).dict()
# keep up to date locally in ``emsd``
relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg)
# XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
relay.positions.setdefault(pos_msg['symbol'], {}).setdefault(
pos_msg['account'], {}
).update(pos_msg)
# relay through position msgs immediately by
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients:
await client_stream.send(pos_msg)
@ -621,8 +632,11 @@ async def translate_and_relay_brokerd_events(
# another stupid ib error to handle
# if 10147 in message: cancel
resp = 'broker_errored'
broker_details = msg.dict()
# don't relay message to order requester client
continue
# continue
elif name in (
'status',
@ -741,6 +755,7 @@ async def process_client_order_cmds(
oid=oid,
reqid=reqid,
time_ns=time.time_ns(),
account=live_entry.account,
)
# NOTE: cancel response will be relayed back in messages
@ -814,6 +829,7 @@ async def process_client_order_cmds(
action=action,
price=trigger_price,
size=size,
account=msg.account,
)
# send request to backend
@ -994,7 +1010,10 @@ async def _emsd_main(
# signal to client that we're started
# TODO: we could eventually send back **all** brokerd
# positions here?
await ems_ctx.started(relay.positions)
await ems_ctx.started(
{sym: list(pps.values())
for sym, pps in relay.positions.items()}
)
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
@ -1016,6 +1035,7 @@ async def _emsd_main(
try:
_router.clients.add(ems_client_order_stream)
# main entrypoint, run here until cancelled.
await process_client_order_cmds(
ems_client_order_stream,
@ -1035,7 +1055,7 @@ async def _emsd_main(
dialogues = _router.dialogues
for oid, client_stream in dialogues.items():
for oid, client_stream in dialogues.copy().items():
if client_stream == ems_client_order_stream:

View File

@ -45,6 +45,7 @@ class Order(BaseModel):
# internal ``emdsd`` unique "order id"
oid: str # uuid4
symbol: Union[str, Symbol]
account: str # should we set a default as '' ?
price: float
size: float
@ -86,6 +87,7 @@ class Status(BaseModel):
# 'broker_cancelled',
# 'broker_executed',
# 'broker_filled',
# 'broker_errored',
# 'alert_submitted',
# 'alert_triggered',
@ -118,6 +120,7 @@ class BrokerdCancel(BaseModel):
oid: str # piker emsd order id
time_ns: int
account: str
# "broker request id": broker specific/internal order id if this is
# None, creates a new order otherwise if the id is valid the backend
# api must modify the existing matching order. If the broker allows
@ -131,6 +134,7 @@ class BrokerdOrder(BaseModel):
action: str # {buy, sell}
oid: str
account: str
time_ns: int
# "broker request id": broker specific/internal order id if this is
@ -162,6 +166,7 @@ class BrokerdOrderAck(BaseModel):
# emsd id originally sent in matching request msg
oid: str
account: str = ''
class BrokerdStatus(BaseModel):
@ -170,6 +175,9 @@ class BrokerdStatus(BaseModel):
reqid: Union[int, str]
time_ns: int
# XXX: should be best effort set for every update
account: str = ''
# {
# 'submitted',
# 'cancelled',
@ -224,7 +232,11 @@ class BrokerdError(BaseModel):
This is still a TODO thing since we're not sure how to employ it yet.
'''
name: str = 'error'
reqid: Union[int, str]
oid: str
# if no brokerd order request was actually submitted (eg. we errored
# at the ``pikerd`` layer) then there will be ``reqid`` allocated.
reqid: Union[int, str] = ''
symbol: str
reason: str

View File

@ -35,7 +35,7 @@ from ..data._normalize import iterticks
from ..log import get_logger
from ._messages import (
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdFill, BrokerdPosition,
BrokerdFill, BrokerdPosition, BrokerdError
)
@ -385,6 +385,19 @@ async def handle_order_requests(
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
if account != 'paper':
log.error(
'This is a paper account, only a `paper` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?',
).dict())
continue
# validate
order = BrokerdOrder(**request_msg)

View File

@ -8,8 +8,9 @@ import trio
import tractor
from ..log import get_console_log, get_logger, colorize_json
from ..brokers import get_brokermod, config
from ..brokers import get_brokermod
from .._daemon import _tractor_kwargs
from .. import config
log = get_logger('cli')

View File

@ -22,10 +22,11 @@ from os.path import dirname
import shutil
from typing import Optional
from bidict import bidict
import toml
import click
from ..log import get_logger
from .log import get_logger
log = get_logger('broker-config')
@ -104,19 +105,29 @@ def write(
return toml.dump(config, cf)
def load_accounts() -> dict[str, Optional[str]]:
def load_accounts(
# our default paper engine entry
accounts: dict[str, Optional[str]] = {'paper': None}
providers: Optional[list[str]] = None
) -> bidict[str, Optional[str]]:
conf, path = load()
section = conf.get('accounts')
if section is None:
log.warning('No accounts config found?')
accounts = bidict()
for provider_name, section in conf.items():
accounts_section = section.get('accounts')
if (
providers is None or
providers and provider_name in providers
):
if accounts_section is None:
log.warning(f'No accounts named for {provider_name}?')
continue
else:
for brokername, account_labels in section.items():
for name, value in account_labels.items():
accounts[f'{brokername}.{name}'] = value
for label, value in accounts_section.items():
accounts[
f'{provider_name}.{label}'
] = value
# our default paper engine entry
accounts['paper'] = None
return accounts

View File

@ -106,6 +106,7 @@ class Symbol(BaseModel):
mult = 1 / self.tick_size
return round(value * mult) / mult
@validate_arguments
def mk_symbol(

View File

@ -23,7 +23,7 @@ from typing import Tuple, Dict, Any, Optional
from types import ModuleType
from functools import partial
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5 import QtCore, QtWidgets
from PyQt5.QtCore import Qt
from PyQt5.QtCore import QEvent
from PyQt5.QtWidgets import (
@ -277,7 +277,7 @@ class ChartnPane(QFrame):
'''
sidepane: FieldsForm
hbox: QtGui.QHBoxLayout
hbox: QtWidgets.QHBoxLayout
chart: Optional['ChartPlotWidget'] = None
def __init__(
@ -293,7 +293,7 @@ class ChartnPane(QFrame):
self.sidepane = sidepane
self.chart = None
hbox = self.hbox = QtGui.QHBoxLayout(self)
hbox = self.hbox = QtWidgets.QHBoxLayout(self)
hbox.setAlignment(Qt.AlignTop | Qt.AlignLeft)
hbox.setContentsMargins(0, 0, 0, 0)
hbox.setSpacing(3)

View File

@ -47,7 +47,7 @@ from PyQt5.QtWidgets import (
from ._event import open_handlers
from ._style import hcolor, _font, _font_small, DpiAwareFont
from ._label import FormatLabel
from .. import brokers
from .. import config
class FontAndChartAwareLineEdit(QLineEdit):
@ -382,21 +382,21 @@ def mk_form(
form._font_size = font_size or _font_small.px_size
# generate sub-components from schema dict
for key, config in fields_schema.items():
wtype = config['type']
label = str(config.get('label', key))
for key, conf in fields_schema.items():
wtype = conf['type']
label = str(conf.get('label', key))
# plain (line) edit field
if wtype == 'edit':
w = form.add_edit_field(
key,
label,
config['default_value']
conf['default_value']
)
# drop-down selection
elif wtype == 'select':
values = list(config['default_value'])
values = list(conf['default_value'])
w = form.add_select_field(
key,
label,
@ -417,8 +417,6 @@ async def open_form_input_handling(
) -> FieldsForm:
# assert form.model, f'{form} must define a `.model`'
async with open_handlers(
list(form.fields.values()),
@ -635,7 +633,7 @@ def mk_order_pane_layout(
# font_size: int = _font_small.px_size - 2
font_size: int = _font.px_size - 2
accounts = brokers.config.load_accounts()
accounts = config.load_accounts()
# TODO: maybe just allocate the whole fields form here
# and expect an async ctx entry?

View File

@ -198,7 +198,7 @@ async def handle_viewmode_kb_inputs(
Qt.Key_P,
}
):
pp_pane = order_mode.pp.pane
pp_pane = order_mode.current_pp.pane
if pp_pane.isHidden():
pp_pane.show()
else:
@ -213,7 +213,7 @@ async def handle_viewmode_kb_inputs(
if order_keys_pressed:
# show the pp size label
order_mode.pp.show()
order_mode.current_pp.show()
# TODO: show pp config mini-params in status bar widget
# mode.pp_config.show()
@ -259,20 +259,23 @@ async def handle_viewmode_kb_inputs(
) and
key in NUMBER_LINE
):
# hot key to set order slots size
# hot key to set order slots size.
# change edit field to current number line value,
# update the pp allocator bar, unhighlight the
# field when ctrl is released.
num = int(text)
pp_pane = order_mode.pane
pp_pane.on_ui_settings_change('slots', num)
edit = pp_pane.form.fields['slots']
edit.selectAll()
# un-highlight on ctrl release
on_next_release = edit.deselect
pp_pane.update_status_ui()
else: # none active
# hide pp label
order_mode.pp.hide_info()
order_mode.current_pp.hide_info()
# if none are pressed, remove "staged" level
# line under cursor position

View File

@ -224,6 +224,7 @@ class Label:
def show(self) -> None:
self.txt.show()
self.txt.update()
def hide(self) -> None:
self.txt.hide()

View File

@ -665,7 +665,7 @@ def order_line(
# display the order pos size, which is some multiple
# of the user defined base unit size
fmt_str=(
'{size:.{size_digits}f}u{fiat_text}'
'{account_text}{size:.{size_digits}f}u{fiat_text}'
),
color=line.color,
)
@ -679,13 +679,23 @@ def order_line(
if not fiat_size:
return ''
return f' -> ${humanize(fiat_size)}'
return f' ~ ${humanize(fiat_size)}'
def maybe_show_account_name(fields: dict) -> str:
account = fields.get('account')
if not account:
return ''
return f'{account}: '
label.fields = {
'size': size,
'size_digits': 0,
'fiat_size': None,
'fiat_text': maybe_show_fiat_text,
'account': None,
'account_text': maybe_show_account_name,
}
label.orient_v = orient_v

View File

@ -20,234 +20,93 @@ Position info and display
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from functools import partial
from math import floor
from math import floor, copysign
from typing import Optional
from bidict import bidict
from pyqtgraph import functions as fn
from pydantic import BaseModel, validator
from ._annotate import LevelMarker
from ._anchors import (
pp_tight_and_right, # wanna keep it straight in the long run
gpath_pin,
)
from ..calc import humanize
from ..clearing._messages import BrokerdPosition, Status
from ..data._source import Symbol
from ..calc import humanize, pnl
from ..clearing._allocate import Allocator, Position
from ..data._normalize import iterticks
from ..data.feed import Feed
from ._label import Label
from ._lines import LevelLine, order_line
from ._style import _font
from ._forms import FieldsForm, FillStatusBar, QLabel
from ..log import get_logger
from ..clearing._messages import Order
log = get_logger(__name__)
_pnl_tasks: dict[str, bool] = {}
class Position(BaseModel):
'''Basic pp (personal position) model with attached fills history.
async def display_pnl(
This type should be IPC wire ready?
feed: Feed,
order_mode: OrderMode, # noqa
) -> None:
'''Real-time display the current pp's PnL in the appropriate label.
``ValueError`` if this task is spawned where there is a net-zero pp.
'''
symbol: Symbol
global _pnl_tasks
# last size and avg entry price
size: float
avg_price: float # TODO: contextual pricing
pp = order_mode.current_pp
live = pp.live_pp
key = live.symbol.key
# ordered record of known constituent trade messages
fills: list[Status] = []
if live.size < 0:
types = ('ask', 'last', 'last', 'utrade')
elif live.size > 0:
types = ('bid', 'last', 'last', 'utrade')
_size_units = bidict({
'currency': '$ size',
'units': '# units',
# TODO: but we'll need a `<brokermod>.get_accounts()` or something
# 'percent_of_port': '% of port',
})
SizeUnit = Enum(
'SizeUnit',
_size_units,
else:
raise RuntimeError('No pp?!?!')
# real-time update pnl on the status pane
try:
async with feed.stream.subscribe() as bstream:
# last_tick = time.time()
async for quotes in bstream:
# now = time.time()
# period = now - last_tick
for sym, quote in quotes.items():
for tick in iterticks(quote, types):
# print(f'{1/period} Hz')
size = order_mode.current_pp.live_pp.size
if size == 0:
# terminate this update task since we're
# no longer in a pp
order_mode.pane.pnl_label.format(pnl=0)
return
else:
# compute and display pnl status
order_mode.pane.pnl_label.format(
pnl=copysign(1, size) * pnl(
# live.avg_price,
order_mode.current_pp.live_pp.avg_price,
tick['price'],
),
)
class Allocator(BaseModel):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
# underscore_attrs_are_private = False
symbol: Symbol
account: Optional[str] = 'paper'
_accounts: bidict[str, Optional[str]]
@validator('account', pre=True)
def set_account(cls, v, values):
if v:
return values['_accounts'][v]
size_unit: SizeUnit = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@validator('size_unit')
def lookup_key(cls, v):
# apply the corresponding enum key for the text "description" value
return v.name
# TODO: if we ever want ot support non-uniform entry-slot-proportion
# "sizes"
# disti_weight: str = 'uniform'
units_limit: float
currency_limit: float
slots: int
def step_sizes(
self,
) -> (float, float):
'''Return the units size for each unit type as a tuple.
'''
slots = self.slots
return (
self.units_limit / slots,
self.currency_limit / slots,
)
def limit(self) -> float:
if self.size_unit == 'currency':
return self.currency_limit
else:
return self.units_limit
def next_order_info(
self,
startup_pp: Position,
live_pp: Position,
price: float,
action: str,
) -> dict:
'''Generate order request info for the "next" submittable order
depending on position / order entry config.
'''
sym = self.symbol
ld = sym.lot_size_digits
size_unit = self.size_unit
live_size = live_pp.size
abs_live_size = abs(live_size)
abs_startup_size = abs(startup_pp.size)
u_per_slot, currency_per_slot = self.step_sizes()
if size_unit == 'units':
slot_size = u_per_slot
l_sub_pp = self.units_limit - abs_live_size
elif size_unit == 'currency':
live_cost_basis = abs_live_size * live_pp.avg_price
slot_size = currency_per_slot / price
l_sub_pp = (self.currency_limit - live_cost_basis) / price
# an entry (adding-to or starting a pp)
if (
action == 'buy' and live_size > 0 or
action == 'sell' and live_size < 0 or
live_size == 0
):
order_size = min(slot_size, l_sub_pp)
# an exit (removing-from or going to net-zero pp)
else:
# when exiting a pp we always try to slot the position
# in the instrument's units, since doing so in a derived
# size measure (eg. currency value, percent of port) would
# result in a mis-mapping of slots sizes in unit terms
# (i.e. it would take *more* slots to exit at a profit and
# *less* slots to exit at a loss).
pp_size = max(abs_startup_size, abs_live_size)
slotted_pp = pp_size / self.slots
if size_unit == 'currency':
# compute the "projected" limit's worth of units at the
# current pp (weighted) price:
slot_size = currency_per_slot / live_pp.avg_price
else:
slot_size = u_per_slot
# if our position is greater then our limit setting
# we'll want to use slot sizes which are larger then what
# the limit would normally determine
order_size = max(slotted_pp, slot_size)
if (
abs_live_size < slot_size or
# NOTE: front/back "loading" heurstic:
# if the remaining pp is in between 0-1.5x a slot's
# worth, dump the whole position in this last exit
# therefore conducting so called "back loading" but
# **without** going past a net-zero pp. if the pp is
# > 1.5x a slot size, then front load: exit a slot's and
# expect net-zero to be acquired on the final exit.
slot_size < pp_size < round((1.5*slot_size), ndigits=ld)
):
order_size = abs_live_size
slots_used = 1.0 # the default uniform policy
if order_size < slot_size:
# compute a fractional slots size to display
slots_used = self.slots_used(
Position(symbol=sym, size=order_size, avg_price=price)
)
return {
'size': abs(round(order_size, ndigits=ld)),
'size_digits': ld,
# TODO: incorporate multipliers for relevant derivatives
'fiat_size': round(order_size * price, ndigits=2),
'slots_used': slots_used,
}
def slots_used(
self,
pp: Position,
) -> float:
'''Calc and return the number of slots used by this ``Position``.
'''
abs_pp_size = abs(pp.size)
if self.size_unit == 'currency':
# live_currency_size = size or (abs_pp_size * pp.avg_price)
live_currency_size = abs_pp_size * pp.avg_price
prop = live_currency_size / self.currency_limit
else:
# return (size or abs_pp_size) / alloc.units_limit
prop = abs_pp_size / self.units_limit
# TODO: REALLY need a way to show partial slots..
# for now we round at the midway point between slots
return round(prop * self.slots)
# last_tick = time.time()
finally:
assert _pnl_tasks[key]
assert _pnl_tasks.pop(key)
@dataclass
@ -256,10 +115,6 @@ class SettingsPane:
order entry sizes and position limits per tradable instrument.
'''
# config for and underlying validation model
tracker: PositionTracker
alloc: Allocator
# input fields
form: FieldsForm
@ -270,9 +125,8 @@ class SettingsPane:
pnl_label: QLabel
limit_label: QLabel
def transform_to(self, size_unit: str) -> None:
if self.alloc.size_unit == size_unit:
return
# encompasing high level namespace
order_mode: Optional['OrderMode'] = None # typing: ignore # noqa
def on_selection_change(
self,
@ -284,8 +138,7 @@ class SettingsPane:
'''Called on any order pane drop down selection change.
'''
print(f'selection input: {text}')
setattr(self.alloc, key, text)
log.info(f'selection input: {text}')
self.on_ui_settings_change(key, text)
def on_ui_settings_change(
@ -298,11 +151,49 @@ class SettingsPane:
'''Called on any order pane edit field value change.
'''
print(f'settings change: {key}: {value}')
alloc = self.alloc
mode = self.order_mode
# an account switch request
if key == 'account':
# hide details on the old selection
old_tracker = mode.current_pp
old_tracker.hide_info()
# re-assign the order mode tracker
account_name = value
tracker = mode.trackers.get(account_name)
# if selection can't be found (likely never discovered with
# a ``brokerd`) then error and switch back to the last
# selection.
if tracker is None:
sym = old_tracker.chart.linked.symbol.key
log.error(
f'Account `{account_name}` can not be set for {sym}'
)
self.form.fields['account'].setCurrentText(
old_tracker.alloc.account_name())
return
self.order_mode.current_pp = tracker
assert tracker.alloc.account_name() == account_name
self.form.fields['account'].setCurrentText(account_name)
tracker.show()
tracker.hide_info()
self.display_pnl(tracker)
# load the new account's allocator
alloc = tracker.alloc
else:
tracker = mode.current_pp
alloc = tracker.alloc
size_unit = alloc.size_unit
# write any passed settings to allocator
# WRITE any settings to current pp's allocator
if key == 'limit':
if size_unit == 'currency':
alloc.currency_limit = float(value)
@ -317,20 +208,18 @@ class SettingsPane:
# the current settings in the new units
pass
elif key == 'account':
print(f'TODO: change account -> {value}')
else:
elif key != 'account':
raise ValueError(f'Unknown setting {key}')
# read out settings and update UI
# READ out settings and update UI
log.info(f'settings change: {key}: {value}')
suffix = {'currency': ' $', 'units': ' u'}[size_unit]
limit = alloc.limit()
# TODO: a reverse look up from the position to the equivalent
# account(s), if none then look to user config for default?
self.update_status_ui()
self.update_status_ui(pp=tracker)
step_size, currency_per_slot = alloc.step_sizes()
@ -356,68 +245,16 @@ class SettingsPane:
# UI in some way?
return True
def init_status_ui(
self,
):
alloc = self.alloc
asset_type = alloc.symbol.type_key
# form = self.form
# TODO: pull from piker.toml
# default config
slots = 4
currency_limit = 5e3
startup_pp = self.tracker.startup_pp
alloc.slots = slots
alloc.currency_limit = currency_limit
# default entry sizing
if asset_type in ('stock', 'crypto', 'forex'):
alloc.size_unit = '$ size'
elif asset_type in ('future', 'option', 'futures_option'):
# since it's harder to know how currency "applies" in this case
# given leverage properties
alloc.size_unit = '# units'
# set units limit to slots size thus making make the next
# entry step 1.0
alloc.units_limit = slots
# if the current position is already greater then the limit
# settings, increase the limit to the current position
if alloc.size_unit == 'currency':
startup_size = startup_pp.size * startup_pp.avg_price
if startup_size > alloc.currency_limit:
alloc.currency_limit = round(startup_size, ndigits=2)
limit_text = alloc.currency_limit
else:
startup_size = startup_pp.size
if startup_size > alloc.units_limit:
alloc.units_limit = startup_size
limit_text = alloc.units_limit
self.on_ui_settings_change('limit', limit_text)
self.update_status_ui(size=startup_size)
def update_status_ui(
self,
size: float = None,
pp: PositionTracker,
) -> None:
alloc = self.alloc
alloc = pp.alloc
slots = alloc.slots
used = alloc.slots_used(self.tracker.live_pp)
used = alloc.slots_used(pp.live_pp)
# calculate proportion of position size limit
# that exists and display in fill bar
@ -430,31 +267,51 @@ class SettingsPane:
min(used, slots)
)
def on_level_change_update_next_order_info(
def display_pnl(
self,
tracker: PositionTracker,
level: float,
line: LevelLine,
order: Order,
) -> bool:
'''Display the PnL for the current symbol and personal positioning (pp).
) -> None:
'''A callback applied for each level change to the line
which will recompute the order size based on allocator
settings. this is assigned inside
``OrderMode.line_from_order()``
If a position is open start a background task which will
real-time update the pnl label in the settings pane.
'''
order_info = self.alloc.next_order_info(
startup_pp=self.tracker.startup_pp,
live_pp=self.tracker.live_pp,
price=level,
action=order.action,
)
line.update_labels(order_info)
mode = self.order_mode
sym = mode.chart.linked.symbol
size = tracker.live_pp.size
feed = mode.quote_feed
global _pnl_tasks
# update bound-in staged order
order.price = level
order.size = order_info['size']
if (
size and
sym.key not in _pnl_tasks
):
_pnl_tasks[sym.key] = True
# immediately compute and display pnl status from last quote
self.pnl_label.format(
pnl=copysign(1, size) * pnl(
tracker.live_pp.avg_price,
# last historical close price
feed.shm.array[-1][['close']][0],
),
)
log.info(
f'Starting pnl display for {tracker.alloc.account_name()}')
self.order_mode.nursery.start_soon(
display_pnl,
feed,
mode,
)
return True
else:
# set 0% pnl
self.pnl_label.format(pnl=0)
return False
def position_line(
@ -522,8 +379,8 @@ def position_line(
class PositionTracker:
'''Track and display a real-time position for a single symbol
on a chart.
'''Track and display real-time positions for a single symbol
over multiple accounts on a single chart.
Graphically composed of a level line and marker as well as labels
for indcating current position information. Updates are made to the
@ -532,11 +389,12 @@ class PositionTracker:
'''
# inputs
chart: 'ChartPlotWidget' # noqa
alloc: Allocator
# allocated
alloc: Allocator
startup_pp: Position
live_pp: Position
# allocated
pp_label: Label
size_label: Label
line: Optional[LevelLine] = None
@ -547,17 +405,15 @@ class PositionTracker:
self,
chart: 'ChartPlotWidget', # noqa
alloc: Allocator,
startup_pp: Position,
) -> None:
self.chart = chart
self.alloc = alloc
self.live_pp = Position(
symbol=chart.linked.symbol,
size=0,
avg_price=0,
)
self.startup_pp = self.live_pp.copy()
self.startup_pp = startup_pp
self.live_pp = startup_pp.copy()
view = chart.getViewBox()
@ -622,9 +478,8 @@ class PositionTracker:
self.pp_label.update()
self.size_label.update()
def update_from_pp_msg(
def update_from_pp(
self,
msg: BrokerdPosition,
position: Optional[Position] = None,
) -> None:
@ -632,23 +487,13 @@ class PositionTracker:
EMS ``BrokerdPosition`` msg.
'''
# XXX: better place to do this?
symbol = self.chart.linked.symbol
lot_size_digits = symbol.lot_size_digits
avg_price, size = (
round(msg['avg_price'], ndigits=symbol.tick_size_digits),
round(msg['size'], ndigits=lot_size_digits),
)
# live pp updates
pp = position or self.live_pp
pp.avg_price = avg_price
pp.size = size
self.update_line(
avg_price,
size,
lot_size_digits,
pp.avg_price,
pp.size,
self.chart.linked.symbol.lot_size_digits,
)
# label updates
@ -656,11 +501,11 @@ class PositionTracker:
self.alloc.slots_used(pp), ndigits=1)
self.size_label.render()
if size == 0:
if pp.size == 0:
self.hide()
else:
self._level_marker.level = avg_price
self._level_marker.level = pp.avg_price
# these updates are critical to avoid lag on view/scene changes
self._level_marker.update() # trigger paint
@ -681,7 +526,6 @@ class PositionTracker:
def show(self) -> None:
if self.live_pp.size:
self.line.show()
self.line.show_labels()
@ -740,7 +584,6 @@ class PositionTracker:
return arrow
# TODO: per account lines on a single (or very related) symbol
def update_line(
self,
price: float,
@ -776,7 +619,10 @@ class PositionTracker:
line.update_labels({
'size': size,
'size_digits': size_digits,
'fiat_size': round(price * size, ndigits=2)
'fiat_size': round(price * size, ndigits=2),
# TODO: per account lines on a single (or very related) symbol
'account': self.alloc.account_name(),
})
line.show()

View File

@ -21,27 +21,30 @@ Chart trading, the only way to scalp.
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from functools import partial
from math import copysign
from pprint import pformat
import time
from typing import Optional, Dict, Callable, Any
import uuid
from bidict import bidict
from pydantic import BaseModel
import tractor
import trio
from .. import brokers
from ..calc import pnl
from .. import config
from ..clearing._client import open_ems, OrderBook
from ..clearing._allocate import (
mk_allocator,
Position,
)
from ..data._source import Symbol
from ..data._normalize import iterticks
from ..data.feed import Feed
from ..log import get_logger
from ._editors import LineEditor, ArrowEditor
from ._lines import order_line, LevelLine
from ._position import PositionTracker, SettingsPane, Allocator, _size_units
from ._position import (
PositionTracker,
SettingsPane,
)
from ._window import MultiStatus
from ..clearing._messages import Order
from ._forms import open_form_input_handling
@ -69,6 +72,37 @@ class OrderDialog(BaseModel):
underscore_attrs_are_private = False
def on_level_change_update_next_order_info(
level: float,
# these are all ``partial``-ed in at callback assignment time.
line: LevelLine,
order: Order,
tracker: PositionTracker,
) -> None:
'''A callback applied for each level change to the line
which will recompute the order size based on allocator
settings. this is assigned inside
``OrderMode.line_from_order()``
'''
# NOTE: the ``Order.account`` is set at order stage time
# inside ``OrderMode.line_from_order()``.
order_info = tracker.alloc.next_order_info(
startup_pp=tracker.startup_pp,
live_pp=tracker.live_pp,
price=level,
action=order.action,
)
line.update_labels(order_info)
# update bound-in staged order
order.price = level
order.size = order_info['size']
@dataclass
class OrderMode:
'''Major UX mode for placing orders on a chart view providing so
@ -90,16 +124,18 @@ class OrderMode:
'''
chart: 'ChartPlotWidget' # type: ignore # noqa
nursery: trio.Nursery
quote_feed: Feed
book: OrderBook
lines: LineEditor
arrows: ArrowEditor
multistatus: MultiStatus
pp: PositionTracker
allocator: 'Allocator' # noqa
pane: SettingsPane
trackers: dict[str, PositionTracker]
# switched state, the current position
current_pp: Optional[PositionTracker] = None
active: bool = False
name: str = 'order'
dialogs: dict[str, OrderDialog] = field(default_factory=dict)
@ -144,9 +180,10 @@ class OrderMode:
# immediately
if order.action != 'alert':
line._on_level_change = partial(
self.pane.on_level_change_update_next_order_info,
on_level_change_update_next_order_info,
line=line,
order=order,
tracker=self.current_pp,
)
else:
@ -185,6 +222,7 @@ class OrderMode:
order = self._staged_order = Order(
action=action,
price=price,
account=self.current_pp.alloc.account_name(),
size=0,
symbol=symbol,
brokers=symbol.brokers,
@ -490,7 +528,7 @@ async def open_order_mode(
book: OrderBook
trades_stream: tractor.MsgStream
positions: dict
position_msgs: dict
# spawn EMS actor-service
async with (
@ -498,9 +536,9 @@ async def open_order_mode(
open_ems(brokername, symbol) as (
book,
trades_stream,
positions
position_msgs
),
trio.open_nursery() as n,
trio.open_nursery() as tn,
):
log.info(f'Opening order mode for {brokername}.{symbol.key}')
@ -511,37 +549,135 @@ async def open_order_mode(
lines = LineEditor(chart=chart)
arrows = ArrowEditor(chart, {})
# allocation and account settings side pane
form = chart.sidepane
# symbol id
symbol = chart.linked.symbol
symkey = symbol.key
# map of per-provider account keys to position tracker instances
trackers: dict[str, PositionTracker] = {}
# load account names from ``brokers.toml``
accounts = bidict(brokers.config.load_accounts())
accounts = config.load_accounts(providers=symbol.brokers).copy()
if accounts:
# first account listed is the one we select at startup
# (aka order based selection).
pp_account = next(iter(accounts.keys()))
else:
pp_account = 'paper'
# NOTE: requires the backend exactly specifies
# the expected symbol key in its positions msg.
pp_msgs = position_msgs.get(symkey, ())
# update all pp trackers with existing data relayed
# from ``brokerd``.
for msg in pp_msgs:
log.info(f'Loading pp for {symkey}:\n{pformat(msg)}')
account_value = msg.get('account')
account_name = accounts.inverse.get(account_value)
if not account_name and account_value == 'paper':
account_name = 'paper'
# net-zero pp
startup_pp = Position(
symbol=symbol,
size=0,
avg_price=0,
)
startup_pp.update_from_msg(msg)
# allocator
alloc = Allocator(
alloc = mk_allocator(
symbol=symbol,
account=None, # select paper by default
_accounts=accounts,
size_unit=_size_units['currency'],
units_limit=400,
currency_limit=5e3,
slots=4,
accounts=accounts,
account=account_name,
# if this startup size is greater the allocator limit,
# the limit is increased internally in this factory.
startup_pp=startup_pp,
)
form = chart.sidepane
form.model = alloc
pp_tracker = PositionTracker(chart, alloc)
pp_tracker = PositionTracker(
chart,
alloc,
startup_pp
)
pp_tracker.hide()
trackers[account_name] = pp_tracker
assert pp_tracker.startup_pp.size == pp_tracker.live_pp.size
# TODO: do we even really need the "startup pp" or can we
# just take the max and pass that into the some state / the
# alloc?
pp_tracker.update_from_pp()
if pp_tracker.startup_pp.size != 0:
# if no position, don't show pp tracking graphics
pp_tracker.show()
pp_tracker.hide_info()
# fill out trackers for accounts with net-zero pps
zero_pp_accounts = set(accounts) - set(trackers)
for account_name in zero_pp_accounts:
startup_pp = Position(
symbol=symbol,
size=0,
avg_price=0,
)
# allocator
alloc = mk_allocator(
symbol=symbol,
accounts=accounts,
account=account_name,
startup_pp=startup_pp,
)
pp_tracker = PositionTracker(
chart,
alloc,
startup_pp
)
pp_tracker.hide()
trackers[account_name] = pp_tracker
# order pane widgets and allocation model
order_pane = SettingsPane(
tracker=pp_tracker,
form=form,
alloc=alloc,
# XXX: ugh, so hideous...
fill_bar=form.fill_bar,
pnl_label=form.left_label,
step_label=form.bottom_label,
limit_label=form.top_label,
)
# top level abstraction which wraps all this crazyness into
# a namespace..
mode = OrderMode(
chart,
tn,
feed,
book,
lines,
arrows,
multistatus,
pane=order_pane,
trackers=trackers,
)
# XXX: MUST be set
order_pane.order_mode = mode
# select a pp to track
tracker = trackers[pp_account]
mode.current_pp = tracker
tracker.show()
tracker.hide_info()
# XXX: would love to not have to do this separate from edit
# fields (which are done in an async loop - see below)
# connect selection signals (from drop down widgets)
@ -556,77 +692,17 @@ async def open_order_mode(
)
)
# top level abstraction which wraps all this crazyness into
# a namespace..
mode = OrderMode(
chart,
book,
lines,
arrows,
multistatus,
pp_tracker,
allocator=alloc,
pane=order_pane,
)
# make fill bar and positioning snapshot
order_pane.on_ui_settings_change('limit', tracker.alloc.limit())
order_pane.update_status_ui(pp=tracker)
# TODO: create a mode "manager" of sorts?
# -> probably just call it "UxModes" err sumthin?
# so that view handlers can access it
view.order_mode = mode
our_sym = mode.chart.linked._symbol.key
# update any exising position
pp_msg = None
for sym, msg in positions.items():
if sym.lower() in our_sym:
pp_msg = msg
break
# make fill bar and positioning snapshot
# XXX: this need to be called *before* the first
# pp tracker update(s) below to ensure the limit size unit has
# been correctly set prior to updating the line's pp size label
# (the one on the RHS).
# TODO: should probably split out the alloc config from the UI
# config startup steps..
order_pane.init_status_ui()
# we should probably make the allocator config
# and explitict helper func call that takes in the aloc and
# the postion / symbol info then take that alloc ref and
# update the pp_tracker and pp_pane?
if pp_msg:
pp_tracker.update_from_pp_msg(msg)
order_pane.update_status_ui()
live_pp = mode.pp.live_pp
size = live_pp.size
if size:
global _zero_pp
_zero_pp = False
# compute and display pnl status immediately
mode.pane.pnl_label.format(
pnl=copysign(1, size) * pnl(
live_pp.avg_price,
# last historical close price
feed.shm.array[-1][['close']][0],
),
)
# spawn updater task
n.start_soon(
display_pnl,
feed,
mode,
)
else:
# set 0% pnl
mode.pane.pnl_label.format(pnl=0)
order_pane.on_ui_settings_change('account', pp_account)
mode.pane.display_pnl(mode.current_pp)
# Begin order-response streaming
done()
@ -645,14 +721,13 @@ async def open_order_mode(
),
):
# signal to top level symbol loading task we're ready
# to handle input since the ems connection is ready
started.set()
n.start_soon(
tn.start_soon(
process_trades_and_update_ui,
n,
tn,
feed,
mode,
trades_stream,
@ -661,67 +736,6 @@ async def open_order_mode(
yield mode
_zero_pp: bool = True
async def display_pnl(
feed: Feed,
order_mode: OrderMode,
) -> None:
'''Real-time display the current pp's PnL in the appropriate label.
Error if this task is spawned where there is a net-zero pp.
'''
global _zero_pp
assert not _zero_pp
pp = order_mode.pp
live = pp.live_pp
if live.size < 0:
types = ('ask', 'last', 'last', 'utrade')
elif live.size > 0:
types = ('bid', 'last', 'last', 'utrade')
else:
raise RuntimeError('No pp?!?!')
# real-time update pnl on the status pane
async with feed.stream.subscribe() as bstream:
# last_tick = time.time()
async for quotes in bstream:
# now = time.time()
# period = now - last_tick
for sym, quote in quotes.items():
for tick in iterticks(quote, types):
# print(f'{1/period} Hz')
size = live.size
if size == 0:
# terminate this update task since we're
# no longer in a pp
_zero_pp = True
order_mode.pane.pnl_label.format(pnl=0)
return
else:
# compute and display pnl status
order_mode.pane.pnl_label.format(
pnl=copysign(1, size) * pnl(
live.avg_price,
tick['price'],
),
)
# last_tick = time.time()
async def process_trades_and_update_ui(
n: trio.Nursery,
@ -733,8 +747,7 @@ async def process_trades_and_update_ui(
) -> None:
get_index = mode.chart.get_index
tracker = mode.pp
global _zero_pp
global _pnl_tasks
# this is where we receive **back** messages
# about executions **from** the EMS actor
@ -747,24 +760,19 @@ async def process_trades_and_update_ui(
if name in (
'position',
):
# show line label once order is live
sym = mode.chart.linked.symbol
if msg['symbol'].lower() in sym.key:
tracker.update_from_pp_msg(msg)
tracker = mode.trackers[msg['account']]
tracker.live_pp.update_from_msg(msg)
tracker.update_from_pp()
# update order pane widgets
mode.pane.update_status_ui()
mode.pane.update_status_ui(tracker)
if mode.pp.live_pp.size and _zero_pp:
_zero_pp = False
n.start_soon(
display_pnl,
feed,
mode,
)
mode.pane.display_pnl(tracker)
# short circuit to next msg to avoid
# uncessary msg content lookups
# unnecessary msg content lookups
continue
resp = msg['resp']
@ -795,10 +803,13 @@ async def process_trades_and_update_ui(
elif resp in (
'broker_cancelled',
'broker_inactive',
'broker_errored',
'dark_cancelled'
):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
log.warning(f'Order {oid} failed with:\n{pformat(broker_msg)}')
elif resp in (
'dark_triggered'
@ -849,4 +860,6 @@ async def process_trades_and_update_ui(
arrow_index=get_index(details['broker_time']),
)
tracker.live_pp.fills.append(msg)
# TODO: how should we look this up?
# tracker = mode.trackers[msg['account']]
# tracker.live_pp.fills.append(msg)

View File

@ -3,8 +3,8 @@ import os
import pytest
import tractor
import trio
from piker import log
from piker.brokers import questrade, config
from piker import log, config
from piker.brokers import questrade
def pytest_addoption(parser):