Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 70921fcb8c Add some type annots around pp msg handling 2021-10-29 16:14:45 -04:00
Tyler Goodlet 5ea2273cfb Factor out context cacher to `tractor.trionics` 2021-10-29 16:14:45 -04:00
Tyler Goodlet e4ddc794ad Error out clearing task on first quote being nan 2021-10-29 16:14:45 -04:00
Tyler Goodlet 6ed455d23d Drop throttled rate margin to 100us 2021-10-29 16:14:45 -04:00
Tyler Goodlet 1cb3fedb81 Turn on profiling for the moment 2021-10-29 16:14:45 -04:00
Tyler Goodlet a461139a85 De-densify some funcs 2021-10-29 16:14:45 -04:00
Tyler Goodlet dfc407eb39 Add some typing around web bs 2021-10-29 16:14:45 -04:00
Tyler Goodlet 67a5ff54cb Rename feed bus entrypoint 2021-10-29 16:14:45 -04:00
Tyler Goodlet 9354d0d8e2 Update some typing and add latency checks for binance 2021-10-29 16:14:45 -04:00
Tyler Goodlet 95f4b2aa02 Expect accounts as tuple, don't start rt pnl on no live pp 2021-10-29 16:14:45 -04:00
12 changed files with 95 additions and 160 deletions

View File

@ -18,30 +18,18 @@
Cacheing apis and toolz. Cacheing apis and toolz.
""" """
# further examples of interest:
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
from collections import OrderedDict from collections import OrderedDict
from typing import (
Any,
Hashable,
Optional,
TypeVar,
AsyncContextManager,
)
from contextlib import ( from contextlib import (
asynccontextmanager, asynccontextmanager,
) )
import trio from tractor.trionics import maybe_open_context
from trio_typing import TaskStatus
import tractor
from .brokers import get_brokermod from .brokers import get_brokermod
from .log import get_logger from .log import get_logger
T = TypeVar('T')
log = get_logger(__name__) log = get_logger(__name__)
@ -74,112 +62,6 @@ def async_lifo_cache(maxsize=128):
return decorator return decorator
_cache: dict[str, 'Client'] = {} # noqa
class cache:
'''Globally (processs wide) cached, task access to a
kept-alive-while-in-use async resource.
'''
lock = trio.Lock()
users: int = 0
values: dict[Any, Any] = {}
resources: dict[
int,
Optional[tuple[trio.Nursery, trio.Event]]
] = {}
no_more_users: Optional[trio.Event] = None
@classmethod
async def run_ctx(
cls,
mng,
key,
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
async with mng as value:
_, no_more_users = cls.resources[id(mng)]
cls.values[key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
value = cls.values.pop(key)
# discard nursery ref so it won't be re-used (an error)
cls.resources.pop(id(mng))
@asynccontextmanager
async def maybe_open_ctx(
key: Hashable,
mngr: AsyncContextManager[T],
) -> (bool, T):
'''Maybe open a context manager if there is not already a cached
version for the provided ``key``. Return the cached instance on
a cache hit.
'''
await cache.lock.acquire()
ctx_key = id(mngr)
value = None
try:
# lock feed acquisition around task racing / ``trio``'s
# scheduler protocol
value = cache.values[key]
log.info(f'Reusing cached resource for {key}')
cache.users += 1
cache.lock.release()
yield True, value
except KeyError:
log.info(f'Allocating new resource for {key}')
# **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed.
# TODO: avoid pulling from ``tractor`` internals and
# instead offer a "root nursery" in piker actors?
service_n = tractor.current_actor()._service_n
# TODO: does this need to be a tractor "root nursery"?
ln = cache.resources.get(ctx_key)
assert not ln
ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())
value = await ln.start(cache.run_ctx, mngr, key)
cache.users += 1
cache.lock.release()
yield False, value
finally:
cache.users -= 1
if cache.lock.locked():
cache.lock.release()
if value is not None:
# if no more consumers, teardown the client
if cache.users <= 0:
log.warning(f'De-allocating resource for {key}')
# terminate mngr nursery
entry = cache.resources.get(ctx_key)
if entry:
_, no_more_users = entry
no_more_users.set()
@asynccontextmanager @asynccontextmanager
async def open_cached_client( async def open_cached_client(
brokername: str, brokername: str,
@ -190,7 +72,7 @@ async def open_cached_client(
''' '''
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
async with maybe_open_ctx( async with maybe_open_context(
key=brokername, key=brokername,
mngr=brokermod.get_client(), mngr=brokermod.get_client(),
) as (cache_hit, client): ) as (cache_hit, client):

View File

@ -21,7 +21,7 @@ Profiling wrappers for internal libs.
import time import time
from functools import wraps from functools import wraps
_pg_profile: bool = False _pg_profile: bool = True
def pg_profile_enabled() -> bool: def pg_profile_enabled() -> bool:

View File

@ -19,7 +19,7 @@ Binance backend
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import List, Dict, Any, Tuple, Union, Optional from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator
import time import time
import trio import trio
@ -37,7 +37,7 @@ from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws from ..data._web_bs import open_autorecon_ws, NoBsWs
log = get_logger(__name__) log = get_logger(__name__)
@ -213,7 +213,7 @@ class Client:
) )
# repack in dict form # repack in dict form
return {item[0]['symbol']: item[0] return {item[0]['symbol']: item[0]
for item in matches} for item in matches}
async def bars( async def bars(
self, self,
@ -295,7 +295,7 @@ class AggTrade(BaseModel):
M: bool # Ignore M: bool # Ignore
async def stream_messages(ws): async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0 timeouts = 0
while True: while True:
@ -487,11 +487,20 @@ async def stream_quotes(
# signal to caller feed is ready for consumption # signal to caller feed is ready for consumption
feed_is_live.set() feed_is_live.set()
# import time
# last = time.time()
# start streaming # start streaming
async for typ, msg in msg_gen: async for typ, msg in msg_gen:
# period = time.time() - last
# hz = 1/period if period else float('inf')
# if hz > 60:
# log.info(f'Binance quotez : {hz}')
topic = msg['symbol'].lower() topic = msg['symbol'].lower()
await send_chan.send({topic: msg}) await send_chan.send({topic: msg})
# last = time.time()
@tractor.context @tractor.context

View File

@ -20,6 +20,7 @@ In da suit parlances: "Execution management systems"
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from math import isnan
from pprint import pformat from pprint import pformat
import time import time
from typing import AsyncIterator, Callable from typing import AsyncIterator, Callable
@ -47,9 +48,11 @@ log = get_logger(__name__)
# TODO: numba all of this # TODO: numba all of this
def mk_check( def mk_check(
trigger_price: float, trigger_price: float,
known_last: float, known_last: float,
action: str, action: str,
) -> Callable[[float, float], bool]: ) -> Callable[[float, float], bool]:
"""Create a predicate for given ``exec_price`` based on last known """Create a predicate for given ``exec_price`` based on last known
price, ``known_last``. price, ``known_last``.
@ -77,8 +80,7 @@ def mk_check(
return check_lt return check_lt
else: raise ValueError('trigger: {trigger_price}, last: {known_last}')
return None
@dataclass @dataclass
@ -177,7 +179,15 @@ async def clear_dark_triggers(
tuple(execs.items()) tuple(execs.items())
): ):
if not pred or (ttype not in tf) or (not pred(price)): if (
not pred or
ttype not in tf or
not pred(price)
):
log.debug(
f'skipping quote for {sym} '
f'{pred}, {ttype} not in {tf}?, {pred(price)}'
)
# majority of iterations will be non-matches # majority of iterations will be non-matches
continue continue
@ -269,7 +279,7 @@ class TradesRelay:
positions: dict[str, dict[str, BrokerdPosition]] positions: dict[str, dict[str, BrokerdPosition]]
# allowed account names # allowed account names
accounts: set[str] accounts: tuple[str]
# count of connected ems clients for this ``brokerd`` # count of connected ems clients for this ``brokerd``
consumers: int = 0 consumers: int = 0
@ -414,6 +424,9 @@ async def open_brokerd_trades_dialogue(
) )
try: try:
positions: list[BrokerdPosition]
accounts: tuple[str]
async with ( async with (
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
brokerd_ctx.open_stream() as brokerd_trades_stream, brokerd_ctx.open_stream() as brokerd_trades_stream,
@ -449,7 +462,7 @@ async def open_brokerd_trades_dialogue(
relay = TradesRelay( relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream, brokerd_dialogue=brokerd_trades_stream,
positions=pps, positions=pps,
accounts=set(accounts), accounts=accounts,
consumers=1, consumers=1,
) )
@ -1002,7 +1015,8 @@ async def _emsd_main(
first_quote = feed.first_quotes[symbol] first_quote = feed.first_quotes[symbol]
book = _router.get_dark_book(broker) book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote['last'] last = book.lasts[(broker, symbol)] = first_quote['last']
assert not isnan(last) # ib is a cucker but we've fixed it in the backend
# open a stream with the brokerd backend for order # open a stream with the brokerd backend for order
# flow dialogue # flow dialogue

View File

@ -172,7 +172,6 @@ async def sample_and_broadcast(
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
# TODO: ``numba`` this! # TODO: ``numba`` this!
for sym, quote in quotes.items(): for sym, quote in quotes.items():
@ -185,8 +184,12 @@ async def sample_and_broadcast(
# start writing the shm buffer with appropriate # start writing the shm buffer with appropriate
# trade data # trade data
for tick in quote['ticks']:
# TODO: we should probably not write every single
# value to an OHLC sample stream XD
# for a tick stream sure.. but this is excessive..
ticks = quote['ticks']
for tick in ticks:
ticktype = tick['type'] ticktype = tick['type']
# write trade events to shm last OHLC sample # write trade events to shm last OHLC sample
@ -258,7 +261,8 @@ async def sample_and_broadcast(
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError trio.ClosedResourceError,
trio.EndOfChannel,
): ):
# XXX: do we need to deregister here # XXX: do we need to deregister here
# if it's done in the fee bus code? # if it's done in the fee bus code?
@ -268,6 +272,10 @@ async def sample_and_broadcast(
f'{stream._ctx.chan.uid} dropped ' f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection' '`brokerd`-quotes-feed connection'
) )
if tick_throttle:
assert stream.closed()
# await stream.aclose()
subs.remove((stream, tick_throttle)) subs.remove((stream, tick_throttle))
@ -283,12 +291,8 @@ async def uniform_rate_send(
) -> None: ) -> None:
sleep_period = 1/rate - 0.000616 sleep_period = 1/rate - 0.0001 # 100us
last_send = time.time() last_send = time.time()
aname = stream._ctx.chan.uid[0]
fsp = False
if 'fsp' in aname:
fsp = True
while True: while True:
@ -308,20 +312,33 @@ async def uniform_rate_send(
sym, next_quote = quote_stream.receive_nowait() sym, next_quote = quote_stream.receive_nowait()
ticks = next_quote.get('ticks') ticks = next_quote.get('ticks')
# XXX: idea for frame type data structure we could use on the
# wire instead of a simple list?
# frames = {
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
# 'type_a': [tick0, tick1, tick2, .., tickn],
# 'type_b': [tick0, tick1, tick2, .., tickn],
# 'type_c': [tick0, tick1, tick2, .., tickn],
# ...
# 'type_n': [tick0, tick1, tick2, .., tickn],
# }
if ticks: if ticks:
first_quote['ticks'].extend(ticks) first_quote['ticks'].extend(ticks)
except trio.WouldBlock: except trio.WouldBlock:
now = time.time() now = time.time()
rate = 1 / (now - last_send) rate = 1 / (now - last_send)
last_send = now
# log.info(f'{rate} Hz sending quotes') # \n{first_quote}') log.debug(
f'`{sym}` throttled send hz: {round(rate, ndigits=1)}'
)
# TODO: now if only we could sync this to the display # TODO: now if only we could sync this to the display
# rate timing exactly lul # rate timing exactly lul
try: try:
await stream.send({sym: first_quote}) await stream.send({sym: first_quote})
last_send = now
break break
except trio.ClosedResourceError: except trio.ClosedResourceError:
# if the feed consumer goes down then drop # if the feed consumer goes down then drop

View File

@ -395,6 +395,7 @@ def open_shm_array(
# "unlink" created shm on process teardown by # "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack # pushing teardown calls onto actor context stack
tractor._actor._lifetime_stack.callback(shmarr.close) tractor._actor._lifetime_stack.callback(shmarr.close)
tractor._actor._lifetime_stack.callback(shmarr.destroy) tractor._actor._lifetime_stack.callback(shmarr.destroy)

View File

@ -133,9 +133,11 @@ def mk_symbol(
def from_df( def from_df(
df: pd.DataFrame, df: pd.DataFrame,
source=None, source=None,
default_tf=None default_tf=None
) -> np.recarray: ) -> np.recarray:
"""Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``. """Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.

View File

@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
""" """
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
from types import ModuleType from types import ModuleType
from typing import Any, Callable from typing import Any, Callable, AsyncGenerator
import json import json
import trio import trio
@ -127,7 +127,7 @@ async def open_autorecon_ws(
# TODO: proper type annot smh # TODO: proper type annot smh
fixture: Callable, fixture: Callable,
): ) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em. """Apparently we can QoS for all sorts of reasons..so catch em.
""" """

View File

@ -34,11 +34,10 @@ import trio
from trio.abc import ReceiveChannel from trio.abc import ReceiveChannel
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
# from tractor import _broadcast
from pydantic import BaseModel from pydantic import BaseModel
from ..brokers import get_brokermod from ..brokers import get_brokermod
from .._cacheables import maybe_open_ctx from .._cacheables import maybe_open_context
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from .._daemon import ( from .._daemon import (
maybe_spawn_brokerd, maybe_spawn_brokerd,
@ -247,7 +246,7 @@ async def allocate_persistent_feed(
@tractor.context @tractor.context
async def attach_feed_bus( async def open_feed_bus(
ctx: tractor.Context, ctx: tractor.Context,
brokername: str, brokername: str,
@ -364,7 +363,7 @@ async def open_sample_step_stream(
# XXX: this should be singleton on a host, # XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
# created for all practical purposes # created for all practical purposes
async with maybe_open_ctx( async with maybe_open_context(
key=delay_s, key=delay_s,
mngr=portal.open_stream_from( mngr=portal.open_stream_from(
iter_ohlc_periods, iter_ohlc_periods,
@ -507,7 +506,7 @@ async def open_feed(
portal.open_context( portal.open_context(
attach_feed_bus, open_feed_bus,
brokername=brokername, brokername=brokername,
symbol=sym, symbol=sym,
loglevel=loglevel, loglevel=loglevel,
@ -586,7 +585,7 @@ async def maybe_open_feed(
''' '''
sym = symbols[0].lower() sym = symbols[0].lower()
async with maybe_open_ctx( async with maybe_open_context(
key=(brokername, sym), key=(brokername, sym),
mngr=open_feed( mngr=open_feed(
brokername, brokername,

View File

@ -35,7 +35,7 @@ import tractor
import trio import trio
from .. import brokers from .. import brokers
from .._cacheables import maybe_open_ctx from .._cacheables import maybe_open_context
from ..trionics import async_enter_all from ..trionics import async_enter_all
from ..data.feed import open_feed, Feed from ..data.feed import open_feed, Feed
from ._chart import ( from ._chart import (
@ -555,7 +555,7 @@ async def maybe_open_fsp_cluster(
) -> AsyncGenerator[int, dict[str, tractor.Portal]]: ) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
uid = tractor.current_actor().uid uid = tractor.current_actor().uid
async with maybe_open_ctx( async with maybe_open_context(
key=uid, # for now make a cluster per client? key=uid, # for now make a cluster per client?
mngr=open_fsp_cluster( mngr=open_fsp_cluster(
workers, workers,

View File

@ -54,6 +54,7 @@ async def update_pnl_from_feed(
feed: Feed, feed: Feed,
order_mode: OrderMode, # noqa order_mode: OrderMode, # noqa
tracker: PositionTracker,
) -> None: ) -> None:
'''Real-time display the current pp's PnL in the appropriate label. '''Real-time display the current pp's PnL in the appropriate label.
@ -76,7 +77,8 @@ async def update_pnl_from_feed(
types = ('bid', 'last', 'last', 'utrade') types = ('bid', 'last', 'last', 'utrade')
else: else:
raise RuntimeError('No pp?!?!') log.info(f'No position (yet) for {tracker.alloc.account}@{key}')
return
# real-time update pnl on the status pane # real-time update pnl on the status pane
try: try:
@ -343,6 +345,7 @@ class SettingsPane:
update_pnl_from_feed, update_pnl_from_feed,
feed, feed,
mode, mode,
tracker,
) )
# immediately display in status label # immediately display in status label

View File

@ -47,7 +47,7 @@ from ._position import (
) )
from ._label import FormatLabel from ._label import FormatLabel
from ._window import MultiStatus from ._window import MultiStatus
from ..clearing._messages import Order from ..clearing._messages import Order, BrokerdPosition
from ._forms import open_form_input_handling from ._forms import open_form_input_handling
@ -529,7 +529,12 @@ async def open_order_mode(
book: OrderBook book: OrderBook
trades_stream: tractor.MsgStream trades_stream: tractor.MsgStream
position_msgs: dict
# The keys in this dict **must** be in set our set of "normalized"
# symbol names (i.e. the same names you'd get back in search
# results) in order for position msgs to correctly trigger the
# display of a position indicator on screen.
position_msgs: dict[str, list[BrokerdPosition]]
# spawn EMS actor-service # spawn EMS actor-service
async with ( async with (
@ -563,7 +568,9 @@ async def open_order_mode(
providers=symbol.brokers providers=symbol.brokers
) )
# use only loaded accounts according to brokerd # XXX: ``brokerd`` delivers a set of account names that it allows
# use of but the user also can define the accounts they'd like
# to use, in order, in their `brokers.toml` file.
accounts = {} accounts = {}
for name in brokerd_accounts: for name in brokerd_accounts:
# ensure name is in ``brokers.toml`` # ensure name is in ``brokers.toml``
@ -620,8 +627,8 @@ async def open_order_mode(
# alloc? # alloc?
pp_tracker.update_from_pp() pp_tracker.update_from_pp()
# on existing position, show pp tracking graphics
if pp_tracker.startup_pp.size != 0: if pp_tracker.startup_pp.size != 0:
# if no position, don't show pp tracking graphics
pp_tracker.show() pp_tracker.show()
pp_tracker.hide_info() pp_tracker.hide_info()
@ -805,12 +812,13 @@ async def process_trades_and_update_ui(
tracker = mode.trackers[msg['account']] tracker = mode.trackers[msg['account']]
tracker.live_pp.update_from_msg(msg) tracker.live_pp.update_from_msg(msg)
tracker.update_from_pp()
# update order pane widgets # update order pane widgets
tracker.update_from_pp()
mode.pane.update_status_ui(tracker) mode.pane.update_status_ui(tracker)
# display pnl
mode.pane.display_pnl(tracker) if tracker.live_pp.size:
# display pnl
mode.pane.display_pnl(tracker)
# short circuit to next msg to avoid # short circuit to next msg to avoid
# unnecessary msg content lookups # unnecessary msg content lookups