Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 70921fcb8c Add some type annots around pp msg handling 2021-10-29 16:14:45 -04:00
Tyler Goodlet 5ea2273cfb Factor out context cacher to `tractor.trionics` 2021-10-29 16:14:45 -04:00
Tyler Goodlet e4ddc794ad Error out clearing task on first quote being nan 2021-10-29 16:14:45 -04:00
Tyler Goodlet 6ed455d23d Drop throttled rate margin to 100us 2021-10-29 16:14:45 -04:00
Tyler Goodlet 1cb3fedb81 Turn on profiling for the moment 2021-10-29 16:14:45 -04:00
Tyler Goodlet a461139a85 De-densify some funcs 2021-10-29 16:14:45 -04:00
Tyler Goodlet dfc407eb39 Add some typing around web bs 2021-10-29 16:14:45 -04:00
Tyler Goodlet 67a5ff54cb Rename feed bus entrypoint 2021-10-29 16:14:45 -04:00
Tyler Goodlet 9354d0d8e2 Update some typing and add latency checks for binance 2021-10-29 16:14:45 -04:00
Tyler Goodlet 95f4b2aa02 Expect accounts as tuple, don't start rt pnl on no live pp 2021-10-29 16:14:45 -04:00
12 changed files with 95 additions and 160 deletions

View File

@ -18,30 +18,18 @@
Cacheing apis and toolz.
"""
# further examples of interest:
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
from collections import OrderedDict
from typing import (
Any,
Hashable,
Optional,
TypeVar,
AsyncContextManager,
)
from contextlib import (
asynccontextmanager,
)
import trio
from trio_typing import TaskStatus
import tractor
from tractor.trionics import maybe_open_context
from .brokers import get_brokermod
from .log import get_logger
T = TypeVar('T')
log = get_logger(__name__)
@ -74,112 +62,6 @@ def async_lifo_cache(maxsize=128):
return decorator
_cache: dict[str, 'Client'] = {} # noqa
class cache:
'''Globally (processs wide) cached, task access to a
kept-alive-while-in-use async resource.
'''
lock = trio.Lock()
users: int = 0
values: dict[Any, Any] = {}
resources: dict[
int,
Optional[tuple[trio.Nursery, trio.Event]]
] = {}
no_more_users: Optional[trio.Event] = None
@classmethod
async def run_ctx(
cls,
mng,
key,
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
async with mng as value:
_, no_more_users = cls.resources[id(mng)]
cls.values[key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
value = cls.values.pop(key)
# discard nursery ref so it won't be re-used (an error)
cls.resources.pop(id(mng))
@asynccontextmanager
async def maybe_open_ctx(
key: Hashable,
mngr: AsyncContextManager[T],
) -> (bool, T):
'''Maybe open a context manager if there is not already a cached
version for the provided ``key``. Return the cached instance on
a cache hit.
'''
await cache.lock.acquire()
ctx_key = id(mngr)
value = None
try:
# lock feed acquisition around task racing / ``trio``'s
# scheduler protocol
value = cache.values[key]
log.info(f'Reusing cached resource for {key}')
cache.users += 1
cache.lock.release()
yield True, value
except KeyError:
log.info(f'Allocating new resource for {key}')
# **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed.
# TODO: avoid pulling from ``tractor`` internals and
# instead offer a "root nursery" in piker actors?
service_n = tractor.current_actor()._service_n
# TODO: does this need to be a tractor "root nursery"?
ln = cache.resources.get(ctx_key)
assert not ln
ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())
value = await ln.start(cache.run_ctx, mngr, key)
cache.users += 1
cache.lock.release()
yield False, value
finally:
cache.users -= 1
if cache.lock.locked():
cache.lock.release()
if value is not None:
# if no more consumers, teardown the client
if cache.users <= 0:
log.warning(f'De-allocating resource for {key}')
# terminate mngr nursery
entry = cache.resources.get(ctx_key)
if entry:
_, no_more_users = entry
no_more_users.set()
@asynccontextmanager
async def open_cached_client(
brokername: str,
@ -190,7 +72,7 @@ async def open_cached_client(
'''
brokermod = get_brokermod(brokername)
async with maybe_open_ctx(
async with maybe_open_context(
key=brokername,
mngr=brokermod.get_client(),
) as (cache_hit, client):

View File

@ -21,7 +21,7 @@ Profiling wrappers for internal libs.
import time
from functools import wraps
_pg_profile: bool = False
_pg_profile: bool = True
def pg_profile_enabled() -> bool:

View File

@ -19,7 +19,7 @@ Binance backend
"""
from contextlib import asynccontextmanager
from typing import List, Dict, Any, Tuple, Union, Optional
from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator
import time
import trio
@ -37,7 +37,7 @@ from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws
from ..data._web_bs import open_autorecon_ws, NoBsWs
log = get_logger(__name__)
@ -213,7 +213,7 @@ class Client:
)
# repack in dict form
return {item[0]['symbol']: item[0]
for item in matches}
for item in matches}
async def bars(
self,
@ -295,7 +295,7 @@ class AggTrade(BaseModel):
M: bool # Ignore
async def stream_messages(ws):
async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0
while True:
@ -487,11 +487,20 @@ async def stream_quotes(
# signal to caller feed is ready for consumption
feed_is_live.set()
# import time
# last = time.time()
# start streaming
async for typ, msg in msg_gen:
# period = time.time() - last
# hz = 1/period if period else float('inf')
# if hz > 60:
# log.info(f'Binance quotez : {hz}')
topic = msg['symbol'].lower()
await send_chan.send({topic: msg})
# last = time.time()
@tractor.context

View File

@ -20,6 +20,7 @@ In da suit parlances: "Execution management systems"
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan
from pprint import pformat
import time
from typing import AsyncIterator, Callable
@ -47,9 +48,11 @@ log = get_logger(__name__)
# TODO: numba all of this
def mk_check(
trigger_price: float,
known_last: float,
action: str,
) -> Callable[[float, float], bool]:
"""Create a predicate for given ``exec_price`` based on last known
price, ``known_last``.
@ -77,8 +80,7 @@ def mk_check(
return check_lt
else:
return None
raise ValueError('trigger: {trigger_price}, last: {known_last}')
@dataclass
@ -177,7 +179,15 @@ async def clear_dark_triggers(
tuple(execs.items())
):
if not pred or (ttype not in tf) or (not pred(price)):
if (
not pred or
ttype not in tf or
not pred(price)
):
log.debug(
f'skipping quote for {sym} '
f'{pred}, {ttype} not in {tf}?, {pred(price)}'
)
# majority of iterations will be non-matches
continue
@ -269,7 +279,7 @@ class TradesRelay:
positions: dict[str, dict[str, BrokerdPosition]]
# allowed account names
accounts: set[str]
accounts: tuple[str]
# count of connected ems clients for this ``brokerd``
consumers: int = 0
@ -414,6 +424,9 @@ async def open_brokerd_trades_dialogue(
)
try:
positions: list[BrokerdPosition]
accounts: tuple[str]
async with (
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
brokerd_ctx.open_stream() as brokerd_trades_stream,
@ -449,7 +462,7 @@ async def open_brokerd_trades_dialogue(
relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream,
positions=pps,
accounts=set(accounts),
accounts=accounts,
consumers=1,
)
@ -1002,7 +1015,8 @@ async def _emsd_main(
first_quote = feed.first_quotes[symbol]
book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote['last']
last = book.lasts[(broker, symbol)] = first_quote['last']
assert not isnan(last) # ib is a cucker but we've fixed it in the backend
# open a stream with the brokerd backend for order
# flow dialogue

View File

@ -172,7 +172,6 @@ async def sample_and_broadcast(
# iterate stream delivered by broker
async for quotes in quote_stream:
# TODO: ``numba`` this!
for sym, quote in quotes.items():
@ -185,8 +184,12 @@ async def sample_and_broadcast(
# start writing the shm buffer with appropriate
# trade data
for tick in quote['ticks']:
# TODO: we should probably not write every single
# value to an OHLC sample stream XD
# for a tick stream sure.. but this is excessive..
ticks = quote['ticks']
for tick in ticks:
ticktype = tick['type']
# write trade events to shm last OHLC sample
@ -258,7 +261,8 @@ async def sample_and_broadcast(
except (
trio.BrokenResourceError,
trio.ClosedResourceError
trio.ClosedResourceError,
trio.EndOfChannel,
):
# XXX: do we need to deregister here
# if it's done in the fee bus code?
@ -268,6 +272,10 @@ async def sample_and_broadcast(
f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
if tick_throttle:
assert stream.closed()
# await stream.aclose()
subs.remove((stream, tick_throttle))
@ -283,12 +291,8 @@ async def uniform_rate_send(
) -> None:
sleep_period = 1/rate - 0.000616
sleep_period = 1/rate - 0.0001 # 100us
last_send = time.time()
aname = stream._ctx.chan.uid[0]
fsp = False
if 'fsp' in aname:
fsp = True
while True:
@ -308,20 +312,33 @@ async def uniform_rate_send(
sym, next_quote = quote_stream.receive_nowait()
ticks = next_quote.get('ticks')
# XXX: idea for frame type data structure we could use on the
# wire instead of a simple list?
# frames = {
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
# 'type_a': [tick0, tick1, tick2, .., tickn],
# 'type_b': [tick0, tick1, tick2, .., tickn],
# 'type_c': [tick0, tick1, tick2, .., tickn],
# ...
# 'type_n': [tick0, tick1, tick2, .., tickn],
# }
if ticks:
first_quote['ticks'].extend(ticks)
except trio.WouldBlock:
now = time.time()
rate = 1 / (now - last_send)
last_send = now
# log.info(f'{rate} Hz sending quotes') # \n{first_quote}')
log.debug(
f'`{sym}` throttled send hz: {round(rate, ndigits=1)}'
)
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({sym: first_quote})
last_send = now
break
except trio.ClosedResourceError:
# if the feed consumer goes down then drop

View File

@ -395,6 +395,7 @@ def open_shm_array(
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
tractor._actor._lifetime_stack.callback(shmarr.close)
tractor._actor._lifetime_stack.callback(shmarr.destroy)

View File

@ -133,9 +133,11 @@ def mk_symbol(
def from_df(
df: pd.DataFrame,
source=None,
default_tf=None
) -> np.recarray:
"""Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.

View File

@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
"""
from contextlib import asynccontextmanager, AsyncExitStack
from types import ModuleType
from typing import Any, Callable
from typing import Any, Callable, AsyncGenerator
import json
import trio
@ -127,7 +127,7 @@ async def open_autorecon_ws(
# TODO: proper type annot smh
fixture: Callable,
):
) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em.
"""

View File

@ -34,11 +34,10 @@ import trio
from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import tractor
# from tractor import _broadcast
from pydantic import BaseModel
from ..brokers import get_brokermod
from .._cacheables import maybe_open_ctx
from .._cacheables import maybe_open_context
from ..log import get_logger, get_console_log
from .._daemon import (
maybe_spawn_brokerd,
@ -247,7 +246,7 @@ async def allocate_persistent_feed(
@tractor.context
async def attach_feed_bus(
async def open_feed_bus(
ctx: tractor.Context,
brokername: str,
@ -364,7 +363,7 @@ async def open_sample_step_stream(
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_ctx(
async with maybe_open_context(
key=delay_s,
mngr=portal.open_stream_from(
iter_ohlc_periods,
@ -507,7 +506,7 @@ async def open_feed(
portal.open_context(
attach_feed_bus,
open_feed_bus,
brokername=brokername,
symbol=sym,
loglevel=loglevel,
@ -586,7 +585,7 @@ async def maybe_open_feed(
'''
sym = symbols[0].lower()
async with maybe_open_ctx(
async with maybe_open_context(
key=(brokername, sym),
mngr=open_feed(
brokername,

View File

@ -35,7 +35,7 @@ import tractor
import trio
from .. import brokers
from .._cacheables import maybe_open_ctx
from .._cacheables import maybe_open_context
from ..trionics import async_enter_all
from ..data.feed import open_feed, Feed
from ._chart import (
@ -555,7 +555,7 @@ async def maybe_open_fsp_cluster(
) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
uid = tractor.current_actor().uid
async with maybe_open_ctx(
async with maybe_open_context(
key=uid, # for now make a cluster per client?
mngr=open_fsp_cluster(
workers,

View File

@ -54,6 +54,7 @@ async def update_pnl_from_feed(
feed: Feed,
order_mode: OrderMode, # noqa
tracker: PositionTracker,
) -> None:
'''Real-time display the current pp's PnL in the appropriate label.
@ -76,7 +77,8 @@ async def update_pnl_from_feed(
types = ('bid', 'last', 'last', 'utrade')
else:
raise RuntimeError('No pp?!?!')
log.info(f'No position (yet) for {tracker.alloc.account}@{key}')
return
# real-time update pnl on the status pane
try:
@ -343,6 +345,7 @@ class SettingsPane:
update_pnl_from_feed,
feed,
mode,
tracker,
)
# immediately display in status label

View File

@ -47,7 +47,7 @@ from ._position import (
)
from ._label import FormatLabel
from ._window import MultiStatus
from ..clearing._messages import Order
from ..clearing._messages import Order, BrokerdPosition
from ._forms import open_form_input_handling
@ -529,7 +529,12 @@ async def open_order_mode(
book: OrderBook
trades_stream: tractor.MsgStream
position_msgs: dict
# The keys in this dict **must** be in set our set of "normalized"
# symbol names (i.e. the same names you'd get back in search
# results) in order for position msgs to correctly trigger the
# display of a position indicator on screen.
position_msgs: dict[str, list[BrokerdPosition]]
# spawn EMS actor-service
async with (
@ -563,7 +568,9 @@ async def open_order_mode(
providers=symbol.brokers
)
# use only loaded accounts according to brokerd
# XXX: ``brokerd`` delivers a set of account names that it allows
# use of but the user also can define the accounts they'd like
# to use, in order, in their `brokers.toml` file.
accounts = {}
for name in brokerd_accounts:
# ensure name is in ``brokers.toml``
@ -620,8 +627,8 @@ async def open_order_mode(
# alloc?
pp_tracker.update_from_pp()
# on existing position, show pp tracking graphics
if pp_tracker.startup_pp.size != 0:
# if no position, don't show pp tracking graphics
pp_tracker.show()
pp_tracker.hide_info()
@ -805,12 +812,13 @@ async def process_trades_and_update_ui(
tracker = mode.trackers[msg['account']]
tracker.live_pp.update_from_msg(msg)
tracker.update_from_pp()
# update order pane widgets
tracker.update_from_pp()
mode.pane.update_status_ui(tracker)
# display pnl
mode.pane.display_pnl(tracker)
if tracker.live_pp.size:
# display pnl
mode.pane.display_pnl(tracker)
# short circuit to next msg to avoid
# unnecessary msg content lookups