Compare commits
10 Commits
80e8112daa
...
70921fcb8c
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 70921fcb8c | |
Tyler Goodlet | 5ea2273cfb | |
Tyler Goodlet | e4ddc794ad | |
Tyler Goodlet | 6ed455d23d | |
Tyler Goodlet | 1cb3fedb81 | |
Tyler Goodlet | a461139a85 | |
Tyler Goodlet | dfc407eb39 | |
Tyler Goodlet | 67a5ff54cb | |
Tyler Goodlet | 9354d0d8e2 | |
Tyler Goodlet | 95f4b2aa02 |
|
@ -18,30 +18,18 @@
|
|||
Cacheing apis and toolz.
|
||||
|
||||
"""
|
||||
# further examples of interest:
|
||||
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
|
||||
|
||||
from collections import OrderedDict
|
||||
from typing import (
|
||||
Any,
|
||||
Hashable,
|
||||
Optional,
|
||||
TypeVar,
|
||||
AsyncContextManager,
|
||||
)
|
||||
from contextlib import (
|
||||
asynccontextmanager,
|
||||
)
|
||||
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from tractor.trionics import maybe_open_context
|
||||
|
||||
from .brokers import get_brokermod
|
||||
from .log import get_logger
|
||||
|
||||
|
||||
T = TypeVar('T')
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
|
@ -74,112 +62,6 @@ def async_lifo_cache(maxsize=128):
|
|||
return decorator
|
||||
|
||||
|
||||
_cache: dict[str, 'Client'] = {} # noqa
|
||||
|
||||
|
||||
class cache:
|
||||
'''Globally (processs wide) cached, task access to a
|
||||
kept-alive-while-in-use async resource.
|
||||
|
||||
'''
|
||||
lock = trio.Lock()
|
||||
users: int = 0
|
||||
values: dict[Any, Any] = {}
|
||||
resources: dict[
|
||||
int,
|
||||
Optional[tuple[trio.Nursery, trio.Event]]
|
||||
] = {}
|
||||
no_more_users: Optional[trio.Event] = None
|
||||
|
||||
@classmethod
|
||||
async def run_ctx(
|
||||
cls,
|
||||
mng,
|
||||
key,
|
||||
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
async with mng as value:
|
||||
|
||||
_, no_more_users = cls.resources[id(mng)]
|
||||
cls.values[key] = value
|
||||
task_status.started(value)
|
||||
try:
|
||||
await no_more_users.wait()
|
||||
finally:
|
||||
value = cls.values.pop(key)
|
||||
# discard nursery ref so it won't be re-used (an error)
|
||||
cls.resources.pop(id(mng))
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def maybe_open_ctx(
|
||||
|
||||
key: Hashable,
|
||||
mngr: AsyncContextManager[T],
|
||||
|
||||
) -> (bool, T):
|
||||
'''Maybe open a context manager if there is not already a cached
|
||||
version for the provided ``key``. Return the cached instance on
|
||||
a cache hit.
|
||||
|
||||
'''
|
||||
|
||||
await cache.lock.acquire()
|
||||
|
||||
ctx_key = id(mngr)
|
||||
|
||||
value = None
|
||||
try:
|
||||
# lock feed acquisition around task racing / ``trio``'s
|
||||
# scheduler protocol
|
||||
value = cache.values[key]
|
||||
log.info(f'Reusing cached resource for {key}')
|
||||
cache.users += 1
|
||||
cache.lock.release()
|
||||
yield True, value
|
||||
|
||||
except KeyError:
|
||||
log.info(f'Allocating new resource for {key}')
|
||||
|
||||
# **critical section** that should prevent other tasks from
|
||||
# checking the cache until complete otherwise the scheduler
|
||||
# may switch and by accident we create more then one feed.
|
||||
|
||||
# TODO: avoid pulling from ``tractor`` internals and
|
||||
# instead offer a "root nursery" in piker actors?
|
||||
service_n = tractor.current_actor()._service_n
|
||||
|
||||
# TODO: does this need to be a tractor "root nursery"?
|
||||
ln = cache.resources.get(ctx_key)
|
||||
assert not ln
|
||||
|
||||
ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())
|
||||
|
||||
value = await ln.start(cache.run_ctx, mngr, key)
|
||||
cache.users += 1
|
||||
cache.lock.release()
|
||||
|
||||
yield False, value
|
||||
|
||||
finally:
|
||||
cache.users -= 1
|
||||
|
||||
if cache.lock.locked():
|
||||
cache.lock.release()
|
||||
|
||||
if value is not None:
|
||||
# if no more consumers, teardown the client
|
||||
if cache.users <= 0:
|
||||
log.warning(f'De-allocating resource for {key}')
|
||||
|
||||
# terminate mngr nursery
|
||||
entry = cache.resources.get(ctx_key)
|
||||
if entry:
|
||||
_, no_more_users = entry
|
||||
no_more_users.set()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def open_cached_client(
|
||||
brokername: str,
|
||||
|
@ -190,7 +72,7 @@ async def open_cached_client(
|
|||
|
||||
'''
|
||||
brokermod = get_brokermod(brokername)
|
||||
async with maybe_open_ctx(
|
||||
async with maybe_open_context(
|
||||
key=brokername,
|
||||
mngr=brokermod.get_client(),
|
||||
) as (cache_hit, client):
|
||||
|
|
|
@ -21,7 +21,7 @@ Profiling wrappers for internal libs.
|
|||
import time
|
||||
from functools import wraps
|
||||
|
||||
_pg_profile: bool = False
|
||||
_pg_profile: bool = True
|
||||
|
||||
|
||||
def pg_profile_enabled() -> bool:
|
||||
|
|
|
@ -19,7 +19,7 @@ Binance backend
|
|||
|
||||
"""
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import List, Dict, Any, Tuple, Union, Optional
|
||||
from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator
|
||||
import time
|
||||
|
||||
import trio
|
||||
|
@ -37,7 +37,7 @@ from .._cacheables import open_cached_client
|
|||
from ._util import resproc, SymbolNotFound
|
||||
from ..log import get_logger, get_console_log
|
||||
from ..data import ShmArray
|
||||
from ..data._web_bs import open_autorecon_ws
|
||||
from ..data._web_bs import open_autorecon_ws, NoBsWs
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
@ -295,7 +295,7 @@ class AggTrade(BaseModel):
|
|||
M: bool # Ignore
|
||||
|
||||
|
||||
async def stream_messages(ws):
|
||||
async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
|
||||
|
||||
timeouts = 0
|
||||
while True:
|
||||
|
@ -487,11 +487,20 @@ async def stream_quotes(
|
|||
# signal to caller feed is ready for consumption
|
||||
feed_is_live.set()
|
||||
|
||||
# import time
|
||||
# last = time.time()
|
||||
|
||||
# start streaming
|
||||
async for typ, msg in msg_gen:
|
||||
|
||||
# period = time.time() - last
|
||||
# hz = 1/period if period else float('inf')
|
||||
# if hz > 60:
|
||||
# log.info(f'Binance quotez : {hz}')
|
||||
|
||||
topic = msg['symbol'].lower()
|
||||
await send_chan.send({topic: msg})
|
||||
# last = time.time()
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
|
|
@ -20,6 +20,7 @@ In da suit parlances: "Execution management systems"
|
|||
"""
|
||||
from contextlib import asynccontextmanager
|
||||
from dataclasses import dataclass, field
|
||||
from math import isnan
|
||||
from pprint import pformat
|
||||
import time
|
||||
from typing import AsyncIterator, Callable
|
||||
|
@ -47,9 +48,11 @@ log = get_logger(__name__)
|
|||
|
||||
# TODO: numba all of this
|
||||
def mk_check(
|
||||
|
||||
trigger_price: float,
|
||||
known_last: float,
|
||||
action: str,
|
||||
|
||||
) -> Callable[[float, float], bool]:
|
||||
"""Create a predicate for given ``exec_price`` based on last known
|
||||
price, ``known_last``.
|
||||
|
@ -77,8 +80,7 @@ def mk_check(
|
|||
|
||||
return check_lt
|
||||
|
||||
else:
|
||||
return None
|
||||
raise ValueError('trigger: {trigger_price}, last: {known_last}')
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -177,7 +179,15 @@ async def clear_dark_triggers(
|
|||
tuple(execs.items())
|
||||
):
|
||||
|
||||
if not pred or (ttype not in tf) or (not pred(price)):
|
||||
if (
|
||||
not pred or
|
||||
ttype not in tf or
|
||||
not pred(price)
|
||||
):
|
||||
log.debug(
|
||||
f'skipping quote for {sym} '
|
||||
f'{pred}, {ttype} not in {tf}?, {pred(price)}'
|
||||
)
|
||||
# majority of iterations will be non-matches
|
||||
continue
|
||||
|
||||
|
@ -269,7 +279,7 @@ class TradesRelay:
|
|||
positions: dict[str, dict[str, BrokerdPosition]]
|
||||
|
||||
# allowed account names
|
||||
accounts: set[str]
|
||||
accounts: tuple[str]
|
||||
|
||||
# count of connected ems clients for this ``brokerd``
|
||||
consumers: int = 0
|
||||
|
@ -414,6 +424,9 @@ async def open_brokerd_trades_dialogue(
|
|||
)
|
||||
|
||||
try:
|
||||
positions: list[BrokerdPosition]
|
||||
accounts: tuple[str]
|
||||
|
||||
async with (
|
||||
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
|
||||
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
||||
|
@ -449,7 +462,7 @@ async def open_brokerd_trades_dialogue(
|
|||
relay = TradesRelay(
|
||||
brokerd_dialogue=brokerd_trades_stream,
|
||||
positions=pps,
|
||||
accounts=set(accounts),
|
||||
accounts=accounts,
|
||||
consumers=1,
|
||||
)
|
||||
|
||||
|
@ -1002,7 +1015,8 @@ async def _emsd_main(
|
|||
first_quote = feed.first_quotes[symbol]
|
||||
|
||||
book = _router.get_dark_book(broker)
|
||||
book.lasts[(broker, symbol)] = first_quote['last']
|
||||
last = book.lasts[(broker, symbol)] = first_quote['last']
|
||||
assert not isnan(last) # ib is a cucker but we've fixed it in the backend
|
||||
|
||||
# open a stream with the brokerd backend for order
|
||||
# flow dialogue
|
||||
|
|
|
@ -172,7 +172,6 @@ async def sample_and_broadcast(
|
|||
|
||||
# iterate stream delivered by broker
|
||||
async for quotes in quote_stream:
|
||||
|
||||
# TODO: ``numba`` this!
|
||||
for sym, quote in quotes.items():
|
||||
|
||||
|
@ -185,8 +184,12 @@ async def sample_and_broadcast(
|
|||
|
||||
# start writing the shm buffer with appropriate
|
||||
# trade data
|
||||
for tick in quote['ticks']:
|
||||
|
||||
# TODO: we should probably not write every single
|
||||
# value to an OHLC sample stream XD
|
||||
# for a tick stream sure.. but this is excessive..
|
||||
ticks = quote['ticks']
|
||||
for tick in ticks:
|
||||
ticktype = tick['type']
|
||||
|
||||
# write trade events to shm last OHLC sample
|
||||
|
@ -258,7 +261,8 @@ async def sample_and_broadcast(
|
|||
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
trio.ClosedResourceError,
|
||||
trio.EndOfChannel,
|
||||
):
|
||||
# XXX: do we need to deregister here
|
||||
# if it's done in the fee bus code?
|
||||
|
@ -268,6 +272,10 @@ async def sample_and_broadcast(
|
|||
f'{stream._ctx.chan.uid} dropped '
|
||||
'`brokerd`-quotes-feed connection'
|
||||
)
|
||||
if tick_throttle:
|
||||
assert stream.closed()
|
||||
# await stream.aclose()
|
||||
|
||||
subs.remove((stream, tick_throttle))
|
||||
|
||||
|
||||
|
@ -283,12 +291,8 @@ async def uniform_rate_send(
|
|||
|
||||
) -> None:
|
||||
|
||||
sleep_period = 1/rate - 0.000616
|
||||
sleep_period = 1/rate - 0.0001 # 100us
|
||||
last_send = time.time()
|
||||
aname = stream._ctx.chan.uid[0]
|
||||
fsp = False
|
||||
if 'fsp' in aname:
|
||||
fsp = True
|
||||
|
||||
while True:
|
||||
|
||||
|
@ -308,20 +312,33 @@ async def uniform_rate_send(
|
|||
sym, next_quote = quote_stream.receive_nowait()
|
||||
ticks = next_quote.get('ticks')
|
||||
|
||||
# XXX: idea for frame type data structure we could use on the
|
||||
# wire instead of a simple list?
|
||||
# frames = {
|
||||
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
|
||||
|
||||
# 'type_a': [tick0, tick1, tick2, .., tickn],
|
||||
# 'type_b': [tick0, tick1, tick2, .., tickn],
|
||||
# 'type_c': [tick0, tick1, tick2, .., tickn],
|
||||
# ...
|
||||
# 'type_n': [tick0, tick1, tick2, .., tickn],
|
||||
# }
|
||||
if ticks:
|
||||
first_quote['ticks'].extend(ticks)
|
||||
|
||||
except trio.WouldBlock:
|
||||
now = time.time()
|
||||
rate = 1 / (now - last_send)
|
||||
last_send = now
|
||||
|
||||
# log.info(f'{rate} Hz sending quotes') # \n{first_quote}')
|
||||
log.debug(
|
||||
f'`{sym}` throttled send hz: {round(rate, ndigits=1)}'
|
||||
)
|
||||
|
||||
# TODO: now if only we could sync this to the display
|
||||
# rate timing exactly lul
|
||||
try:
|
||||
await stream.send({sym: first_quote})
|
||||
last_send = now
|
||||
break
|
||||
except trio.ClosedResourceError:
|
||||
# if the feed consumer goes down then drop
|
||||
|
|
|
@ -395,6 +395,7 @@ def open_shm_array(
|
|||
|
||||
# "unlink" created shm on process teardown by
|
||||
# pushing teardown calls onto actor context stack
|
||||
|
||||
tractor._actor._lifetime_stack.callback(shmarr.close)
|
||||
tractor._actor._lifetime_stack.callback(shmarr.destroy)
|
||||
|
||||
|
|
|
@ -133,9 +133,11 @@ def mk_symbol(
|
|||
|
||||
|
||||
def from_df(
|
||||
|
||||
df: pd.DataFrame,
|
||||
source=None,
|
||||
default_tf=None
|
||||
|
||||
) -> np.recarray:
|
||||
"""Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
|
|||
"""
|
||||
from contextlib import asynccontextmanager, AsyncExitStack
|
||||
from types import ModuleType
|
||||
from typing import Any, Callable
|
||||
from typing import Any, Callable, AsyncGenerator
|
||||
import json
|
||||
|
||||
import trio
|
||||
|
@ -127,7 +127,7 @@ async def open_autorecon_ws(
|
|||
|
||||
# TODO: proper type annot smh
|
||||
fixture: Callable,
|
||||
):
|
||||
) -> AsyncGenerator[tuple[...], NoBsWs]:
|
||||
"""Apparently we can QoS for all sorts of reasons..so catch em.
|
||||
|
||||
"""
|
||||
|
|
|
@ -34,11 +34,10 @@ import trio
|
|||
from trio.abc import ReceiveChannel
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
# from tractor import _broadcast
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ..brokers import get_brokermod
|
||||
from .._cacheables import maybe_open_ctx
|
||||
from .._cacheables import maybe_open_context
|
||||
from ..log import get_logger, get_console_log
|
||||
from .._daemon import (
|
||||
maybe_spawn_brokerd,
|
||||
|
@ -247,7 +246,7 @@ async def allocate_persistent_feed(
|
|||
|
||||
|
||||
@tractor.context
|
||||
async def attach_feed_bus(
|
||||
async def open_feed_bus(
|
||||
|
||||
ctx: tractor.Context,
|
||||
brokername: str,
|
||||
|
@ -364,7 +363,7 @@ async def open_sample_step_stream(
|
|||
# XXX: this should be singleton on a host,
|
||||
# a lone broker-daemon per provider should be
|
||||
# created for all practical purposes
|
||||
async with maybe_open_ctx(
|
||||
async with maybe_open_context(
|
||||
key=delay_s,
|
||||
mngr=portal.open_stream_from(
|
||||
iter_ohlc_periods,
|
||||
|
@ -507,7 +506,7 @@ async def open_feed(
|
|||
|
||||
portal.open_context(
|
||||
|
||||
attach_feed_bus,
|
||||
open_feed_bus,
|
||||
brokername=brokername,
|
||||
symbol=sym,
|
||||
loglevel=loglevel,
|
||||
|
@ -586,7 +585,7 @@ async def maybe_open_feed(
|
|||
'''
|
||||
sym = symbols[0].lower()
|
||||
|
||||
async with maybe_open_ctx(
|
||||
async with maybe_open_context(
|
||||
key=(brokername, sym),
|
||||
mngr=open_feed(
|
||||
brokername,
|
||||
|
|
|
@ -35,7 +35,7 @@ import tractor
|
|||
import trio
|
||||
|
||||
from .. import brokers
|
||||
from .._cacheables import maybe_open_ctx
|
||||
from .._cacheables import maybe_open_context
|
||||
from ..trionics import async_enter_all
|
||||
from ..data.feed import open_feed, Feed
|
||||
from ._chart import (
|
||||
|
@ -555,7 +555,7 @@ async def maybe_open_fsp_cluster(
|
|||
) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
|
||||
|
||||
uid = tractor.current_actor().uid
|
||||
async with maybe_open_ctx(
|
||||
async with maybe_open_context(
|
||||
key=uid, # for now make a cluster per client?
|
||||
mngr=open_fsp_cluster(
|
||||
workers,
|
||||
|
|
|
@ -54,6 +54,7 @@ async def update_pnl_from_feed(
|
|||
|
||||
feed: Feed,
|
||||
order_mode: OrderMode, # noqa
|
||||
tracker: PositionTracker,
|
||||
|
||||
) -> None:
|
||||
'''Real-time display the current pp's PnL in the appropriate label.
|
||||
|
@ -76,7 +77,8 @@ async def update_pnl_from_feed(
|
|||
types = ('bid', 'last', 'last', 'utrade')
|
||||
|
||||
else:
|
||||
raise RuntimeError('No pp?!?!')
|
||||
log.info(f'No position (yet) for {tracker.alloc.account}@{key}')
|
||||
return
|
||||
|
||||
# real-time update pnl on the status pane
|
||||
try:
|
||||
|
@ -343,6 +345,7 @@ class SettingsPane:
|
|||
update_pnl_from_feed,
|
||||
feed,
|
||||
mode,
|
||||
tracker,
|
||||
)
|
||||
|
||||
# immediately display in status label
|
||||
|
|
|
@ -47,7 +47,7 @@ from ._position import (
|
|||
)
|
||||
from ._label import FormatLabel
|
||||
from ._window import MultiStatus
|
||||
from ..clearing._messages import Order
|
||||
from ..clearing._messages import Order, BrokerdPosition
|
||||
from ._forms import open_form_input_handling
|
||||
|
||||
|
||||
|
@ -529,7 +529,12 @@ async def open_order_mode(
|
|||
|
||||
book: OrderBook
|
||||
trades_stream: tractor.MsgStream
|
||||
position_msgs: dict
|
||||
|
||||
# The keys in this dict **must** be in set our set of "normalized"
|
||||
# symbol names (i.e. the same names you'd get back in search
|
||||
# results) in order for position msgs to correctly trigger the
|
||||
# display of a position indicator on screen.
|
||||
position_msgs: dict[str, list[BrokerdPosition]]
|
||||
|
||||
# spawn EMS actor-service
|
||||
async with (
|
||||
|
@ -563,7 +568,9 @@ async def open_order_mode(
|
|||
providers=symbol.brokers
|
||||
)
|
||||
|
||||
# use only loaded accounts according to brokerd
|
||||
# XXX: ``brokerd`` delivers a set of account names that it allows
|
||||
# use of but the user also can define the accounts they'd like
|
||||
# to use, in order, in their `brokers.toml` file.
|
||||
accounts = {}
|
||||
for name in brokerd_accounts:
|
||||
# ensure name is in ``brokers.toml``
|
||||
|
@ -620,8 +627,8 @@ async def open_order_mode(
|
|||
# alloc?
|
||||
pp_tracker.update_from_pp()
|
||||
|
||||
# on existing position, show pp tracking graphics
|
||||
if pp_tracker.startup_pp.size != 0:
|
||||
# if no position, don't show pp tracking graphics
|
||||
pp_tracker.show()
|
||||
pp_tracker.hide_info()
|
||||
|
||||
|
@ -805,10 +812,11 @@ async def process_trades_and_update_ui(
|
|||
|
||||
tracker = mode.trackers[msg['account']]
|
||||
tracker.live_pp.update_from_msg(msg)
|
||||
tracker.update_from_pp()
|
||||
|
||||
# update order pane widgets
|
||||
tracker.update_from_pp()
|
||||
mode.pane.update_status_ui(tracker)
|
||||
|
||||
if tracker.live_pp.size:
|
||||
# display pnl
|
||||
mode.pane.display_pnl(tracker)
|
||||
|
||||
|
|
Loading…
Reference in New Issue