Compare commits
10 Commits
80e8112daa
...
70921fcb8c
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 70921fcb8c | |
Tyler Goodlet | 5ea2273cfb | |
Tyler Goodlet | e4ddc794ad | |
Tyler Goodlet | 6ed455d23d | |
Tyler Goodlet | 1cb3fedb81 | |
Tyler Goodlet | a461139a85 | |
Tyler Goodlet | dfc407eb39 | |
Tyler Goodlet | 67a5ff54cb | |
Tyler Goodlet | 9354d0d8e2 | |
Tyler Goodlet | 95f4b2aa02 |
|
@ -18,30 +18,18 @@
|
||||||
Cacheing apis and toolz.
|
Cacheing apis and toolz.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# further examples of interest:
|
|
||||||
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
|
|
||||||
|
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from typing import (
|
|
||||||
Any,
|
|
||||||
Hashable,
|
|
||||||
Optional,
|
|
||||||
TypeVar,
|
|
||||||
AsyncContextManager,
|
|
||||||
)
|
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager,
|
asynccontextmanager,
|
||||||
)
|
)
|
||||||
|
|
||||||
import trio
|
from tractor.trionics import maybe_open_context
|
||||||
from trio_typing import TaskStatus
|
|
||||||
import tractor
|
|
||||||
|
|
||||||
from .brokers import get_brokermod
|
from .brokers import get_brokermod
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
|
|
||||||
|
|
||||||
T = TypeVar('T')
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -74,112 +62,6 @@ def async_lifo_cache(maxsize=128):
|
||||||
return decorator
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
_cache: dict[str, 'Client'] = {} # noqa
|
|
||||||
|
|
||||||
|
|
||||||
class cache:
|
|
||||||
'''Globally (processs wide) cached, task access to a
|
|
||||||
kept-alive-while-in-use async resource.
|
|
||||||
|
|
||||||
'''
|
|
||||||
lock = trio.Lock()
|
|
||||||
users: int = 0
|
|
||||||
values: dict[Any, Any] = {}
|
|
||||||
resources: dict[
|
|
||||||
int,
|
|
||||||
Optional[tuple[trio.Nursery, trio.Event]]
|
|
||||||
] = {}
|
|
||||||
no_more_users: Optional[trio.Event] = None
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def run_ctx(
|
|
||||||
cls,
|
|
||||||
mng,
|
|
||||||
key,
|
|
||||||
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
async with mng as value:
|
|
||||||
|
|
||||||
_, no_more_users = cls.resources[id(mng)]
|
|
||||||
cls.values[key] = value
|
|
||||||
task_status.started(value)
|
|
||||||
try:
|
|
||||||
await no_more_users.wait()
|
|
||||||
finally:
|
|
||||||
value = cls.values.pop(key)
|
|
||||||
# discard nursery ref so it won't be re-used (an error)
|
|
||||||
cls.resources.pop(id(mng))
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def maybe_open_ctx(
|
|
||||||
|
|
||||||
key: Hashable,
|
|
||||||
mngr: AsyncContextManager[T],
|
|
||||||
|
|
||||||
) -> (bool, T):
|
|
||||||
'''Maybe open a context manager if there is not already a cached
|
|
||||||
version for the provided ``key``. Return the cached instance on
|
|
||||||
a cache hit.
|
|
||||||
|
|
||||||
'''
|
|
||||||
|
|
||||||
await cache.lock.acquire()
|
|
||||||
|
|
||||||
ctx_key = id(mngr)
|
|
||||||
|
|
||||||
value = None
|
|
||||||
try:
|
|
||||||
# lock feed acquisition around task racing / ``trio``'s
|
|
||||||
# scheduler protocol
|
|
||||||
value = cache.values[key]
|
|
||||||
log.info(f'Reusing cached resource for {key}')
|
|
||||||
cache.users += 1
|
|
||||||
cache.lock.release()
|
|
||||||
yield True, value
|
|
||||||
|
|
||||||
except KeyError:
|
|
||||||
log.info(f'Allocating new resource for {key}')
|
|
||||||
|
|
||||||
# **critical section** that should prevent other tasks from
|
|
||||||
# checking the cache until complete otherwise the scheduler
|
|
||||||
# may switch and by accident we create more then one feed.
|
|
||||||
|
|
||||||
# TODO: avoid pulling from ``tractor`` internals and
|
|
||||||
# instead offer a "root nursery" in piker actors?
|
|
||||||
service_n = tractor.current_actor()._service_n
|
|
||||||
|
|
||||||
# TODO: does this need to be a tractor "root nursery"?
|
|
||||||
ln = cache.resources.get(ctx_key)
|
|
||||||
assert not ln
|
|
||||||
|
|
||||||
ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())
|
|
||||||
|
|
||||||
value = await ln.start(cache.run_ctx, mngr, key)
|
|
||||||
cache.users += 1
|
|
||||||
cache.lock.release()
|
|
||||||
|
|
||||||
yield False, value
|
|
||||||
|
|
||||||
finally:
|
|
||||||
cache.users -= 1
|
|
||||||
|
|
||||||
if cache.lock.locked():
|
|
||||||
cache.lock.release()
|
|
||||||
|
|
||||||
if value is not None:
|
|
||||||
# if no more consumers, teardown the client
|
|
||||||
if cache.users <= 0:
|
|
||||||
log.warning(f'De-allocating resource for {key}')
|
|
||||||
|
|
||||||
# terminate mngr nursery
|
|
||||||
entry = cache.resources.get(ctx_key)
|
|
||||||
if entry:
|
|
||||||
_, no_more_users = entry
|
|
||||||
no_more_users.set()
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def open_cached_client(
|
async def open_cached_client(
|
||||||
brokername: str,
|
brokername: str,
|
||||||
|
@ -190,7 +72,7 @@ async def open_cached_client(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
brokermod = get_brokermod(brokername)
|
brokermod = get_brokermod(brokername)
|
||||||
async with maybe_open_ctx(
|
async with maybe_open_context(
|
||||||
key=brokername,
|
key=brokername,
|
||||||
mngr=brokermod.get_client(),
|
mngr=brokermod.get_client(),
|
||||||
) as (cache_hit, client):
|
) as (cache_hit, client):
|
||||||
|
|
|
@ -21,7 +21,7 @@ Profiling wrappers for internal libs.
|
||||||
import time
|
import time
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
|
||||||
_pg_profile: bool = False
|
_pg_profile: bool = True
|
||||||
|
|
||||||
|
|
||||||
def pg_profile_enabled() -> bool:
|
def pg_profile_enabled() -> bool:
|
||||||
|
|
|
@ -19,7 +19,7 @@ Binance backend
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import List, Dict, Any, Tuple, Union, Optional
|
from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
@ -37,7 +37,7 @@ from .._cacheables import open_cached_client
|
||||||
from ._util import resproc, SymbolNotFound
|
from ._util import resproc, SymbolNotFound
|
||||||
from ..log import get_logger, get_console_log
|
from ..log import get_logger, get_console_log
|
||||||
from ..data import ShmArray
|
from ..data import ShmArray
|
||||||
from ..data._web_bs import open_autorecon_ws
|
from ..data._web_bs import open_autorecon_ws, NoBsWs
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -213,7 +213,7 @@ class Client:
|
||||||
)
|
)
|
||||||
# repack in dict form
|
# repack in dict form
|
||||||
return {item[0]['symbol']: item[0]
|
return {item[0]['symbol']: item[0]
|
||||||
for item in matches}
|
for item in matches}
|
||||||
|
|
||||||
async def bars(
|
async def bars(
|
||||||
self,
|
self,
|
||||||
|
@ -295,7 +295,7 @@ class AggTrade(BaseModel):
|
||||||
M: bool # Ignore
|
M: bool # Ignore
|
||||||
|
|
||||||
|
|
||||||
async def stream_messages(ws):
|
async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
|
||||||
|
|
||||||
timeouts = 0
|
timeouts = 0
|
||||||
while True:
|
while True:
|
||||||
|
@ -487,11 +487,20 @@ async def stream_quotes(
|
||||||
# signal to caller feed is ready for consumption
|
# signal to caller feed is ready for consumption
|
||||||
feed_is_live.set()
|
feed_is_live.set()
|
||||||
|
|
||||||
|
# import time
|
||||||
|
# last = time.time()
|
||||||
|
|
||||||
# start streaming
|
# start streaming
|
||||||
async for typ, msg in msg_gen:
|
async for typ, msg in msg_gen:
|
||||||
|
|
||||||
|
# period = time.time() - last
|
||||||
|
# hz = 1/period if period else float('inf')
|
||||||
|
# if hz > 60:
|
||||||
|
# log.info(f'Binance quotez : {hz}')
|
||||||
|
|
||||||
topic = msg['symbol'].lower()
|
topic = msg['symbol'].lower()
|
||||||
await send_chan.send({topic: msg})
|
await send_chan.send({topic: msg})
|
||||||
|
# last = time.time()
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
|
@ -20,6 +20,7 @@ In da suit parlances: "Execution management systems"
|
||||||
"""
|
"""
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
from math import isnan
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import time
|
import time
|
||||||
from typing import AsyncIterator, Callable
|
from typing import AsyncIterator, Callable
|
||||||
|
@ -47,9 +48,11 @@ log = get_logger(__name__)
|
||||||
|
|
||||||
# TODO: numba all of this
|
# TODO: numba all of this
|
||||||
def mk_check(
|
def mk_check(
|
||||||
|
|
||||||
trigger_price: float,
|
trigger_price: float,
|
||||||
known_last: float,
|
known_last: float,
|
||||||
action: str,
|
action: str,
|
||||||
|
|
||||||
) -> Callable[[float, float], bool]:
|
) -> Callable[[float, float], bool]:
|
||||||
"""Create a predicate for given ``exec_price`` based on last known
|
"""Create a predicate for given ``exec_price`` based on last known
|
||||||
price, ``known_last``.
|
price, ``known_last``.
|
||||||
|
@ -77,8 +80,7 @@ def mk_check(
|
||||||
|
|
||||||
return check_lt
|
return check_lt
|
||||||
|
|
||||||
else:
|
raise ValueError('trigger: {trigger_price}, last: {known_last}')
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
@ -177,7 +179,15 @@ async def clear_dark_triggers(
|
||||||
tuple(execs.items())
|
tuple(execs.items())
|
||||||
):
|
):
|
||||||
|
|
||||||
if not pred or (ttype not in tf) or (not pred(price)):
|
if (
|
||||||
|
not pred or
|
||||||
|
ttype not in tf or
|
||||||
|
not pred(price)
|
||||||
|
):
|
||||||
|
log.debug(
|
||||||
|
f'skipping quote for {sym} '
|
||||||
|
f'{pred}, {ttype} not in {tf}?, {pred(price)}'
|
||||||
|
)
|
||||||
# majority of iterations will be non-matches
|
# majority of iterations will be non-matches
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -269,7 +279,7 @@ class TradesRelay:
|
||||||
positions: dict[str, dict[str, BrokerdPosition]]
|
positions: dict[str, dict[str, BrokerdPosition]]
|
||||||
|
|
||||||
# allowed account names
|
# allowed account names
|
||||||
accounts: set[str]
|
accounts: tuple[str]
|
||||||
|
|
||||||
# count of connected ems clients for this ``brokerd``
|
# count of connected ems clients for this ``brokerd``
|
||||||
consumers: int = 0
|
consumers: int = 0
|
||||||
|
@ -414,6 +424,9 @@ async def open_brokerd_trades_dialogue(
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
positions: list[BrokerdPosition]
|
||||||
|
accounts: tuple[str]
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
|
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
|
||||||
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
||||||
|
@ -449,7 +462,7 @@ async def open_brokerd_trades_dialogue(
|
||||||
relay = TradesRelay(
|
relay = TradesRelay(
|
||||||
brokerd_dialogue=brokerd_trades_stream,
|
brokerd_dialogue=brokerd_trades_stream,
|
||||||
positions=pps,
|
positions=pps,
|
||||||
accounts=set(accounts),
|
accounts=accounts,
|
||||||
consumers=1,
|
consumers=1,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1002,7 +1015,8 @@ async def _emsd_main(
|
||||||
first_quote = feed.first_quotes[symbol]
|
first_quote = feed.first_quotes[symbol]
|
||||||
|
|
||||||
book = _router.get_dark_book(broker)
|
book = _router.get_dark_book(broker)
|
||||||
book.lasts[(broker, symbol)] = first_quote['last']
|
last = book.lasts[(broker, symbol)] = first_quote['last']
|
||||||
|
assert not isnan(last) # ib is a cucker but we've fixed it in the backend
|
||||||
|
|
||||||
# open a stream with the brokerd backend for order
|
# open a stream with the brokerd backend for order
|
||||||
# flow dialogue
|
# flow dialogue
|
||||||
|
|
|
@ -172,7 +172,6 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
# iterate stream delivered by broker
|
# iterate stream delivered by broker
|
||||||
async for quotes in quote_stream:
|
async for quotes in quote_stream:
|
||||||
|
|
||||||
# TODO: ``numba`` this!
|
# TODO: ``numba`` this!
|
||||||
for sym, quote in quotes.items():
|
for sym, quote in quotes.items():
|
||||||
|
|
||||||
|
@ -185,8 +184,12 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
# start writing the shm buffer with appropriate
|
# start writing the shm buffer with appropriate
|
||||||
# trade data
|
# trade data
|
||||||
for tick in quote['ticks']:
|
|
||||||
|
|
||||||
|
# TODO: we should probably not write every single
|
||||||
|
# value to an OHLC sample stream XD
|
||||||
|
# for a tick stream sure.. but this is excessive..
|
||||||
|
ticks = quote['ticks']
|
||||||
|
for tick in ticks:
|
||||||
ticktype = tick['type']
|
ticktype = tick['type']
|
||||||
|
|
||||||
# write trade events to shm last OHLC sample
|
# write trade events to shm last OHLC sample
|
||||||
|
@ -258,7 +261,8 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
trio.ClosedResourceError
|
trio.ClosedResourceError,
|
||||||
|
trio.EndOfChannel,
|
||||||
):
|
):
|
||||||
# XXX: do we need to deregister here
|
# XXX: do we need to deregister here
|
||||||
# if it's done in the fee bus code?
|
# if it's done in the fee bus code?
|
||||||
|
@ -268,6 +272,10 @@ async def sample_and_broadcast(
|
||||||
f'{stream._ctx.chan.uid} dropped '
|
f'{stream._ctx.chan.uid} dropped '
|
||||||
'`brokerd`-quotes-feed connection'
|
'`brokerd`-quotes-feed connection'
|
||||||
)
|
)
|
||||||
|
if tick_throttle:
|
||||||
|
assert stream.closed()
|
||||||
|
# await stream.aclose()
|
||||||
|
|
||||||
subs.remove((stream, tick_throttle))
|
subs.remove((stream, tick_throttle))
|
||||||
|
|
||||||
|
|
||||||
|
@ -283,12 +291,8 @@ async def uniform_rate_send(
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
sleep_period = 1/rate - 0.000616
|
sleep_period = 1/rate - 0.0001 # 100us
|
||||||
last_send = time.time()
|
last_send = time.time()
|
||||||
aname = stream._ctx.chan.uid[0]
|
|
||||||
fsp = False
|
|
||||||
if 'fsp' in aname:
|
|
||||||
fsp = True
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
|
||||||
|
@ -308,20 +312,33 @@ async def uniform_rate_send(
|
||||||
sym, next_quote = quote_stream.receive_nowait()
|
sym, next_quote = quote_stream.receive_nowait()
|
||||||
ticks = next_quote.get('ticks')
|
ticks = next_quote.get('ticks')
|
||||||
|
|
||||||
|
# XXX: idea for frame type data structure we could use on the
|
||||||
|
# wire instead of a simple list?
|
||||||
|
# frames = {
|
||||||
|
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
|
||||||
|
|
||||||
|
# 'type_a': [tick0, tick1, tick2, .., tickn],
|
||||||
|
# 'type_b': [tick0, tick1, tick2, .., tickn],
|
||||||
|
# 'type_c': [tick0, tick1, tick2, .., tickn],
|
||||||
|
# ...
|
||||||
|
# 'type_n': [tick0, tick1, tick2, .., tickn],
|
||||||
|
# }
|
||||||
if ticks:
|
if ticks:
|
||||||
first_quote['ticks'].extend(ticks)
|
first_quote['ticks'].extend(ticks)
|
||||||
|
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
now = time.time()
|
now = time.time()
|
||||||
rate = 1 / (now - last_send)
|
rate = 1 / (now - last_send)
|
||||||
last_send = now
|
|
||||||
|
|
||||||
# log.info(f'{rate} Hz sending quotes') # \n{first_quote}')
|
log.debug(
|
||||||
|
f'`{sym}` throttled send hz: {round(rate, ndigits=1)}'
|
||||||
|
)
|
||||||
|
|
||||||
# TODO: now if only we could sync this to the display
|
# TODO: now if only we could sync this to the display
|
||||||
# rate timing exactly lul
|
# rate timing exactly lul
|
||||||
try:
|
try:
|
||||||
await stream.send({sym: first_quote})
|
await stream.send({sym: first_quote})
|
||||||
|
last_send = now
|
||||||
break
|
break
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
# if the feed consumer goes down then drop
|
# if the feed consumer goes down then drop
|
||||||
|
|
|
@ -395,6 +395,7 @@ def open_shm_array(
|
||||||
|
|
||||||
# "unlink" created shm on process teardown by
|
# "unlink" created shm on process teardown by
|
||||||
# pushing teardown calls onto actor context stack
|
# pushing teardown calls onto actor context stack
|
||||||
|
|
||||||
tractor._actor._lifetime_stack.callback(shmarr.close)
|
tractor._actor._lifetime_stack.callback(shmarr.close)
|
||||||
tractor._actor._lifetime_stack.callback(shmarr.destroy)
|
tractor._actor._lifetime_stack.callback(shmarr.destroy)
|
||||||
|
|
||||||
|
|
|
@ -133,9 +133,11 @@ def mk_symbol(
|
||||||
|
|
||||||
|
|
||||||
def from_df(
|
def from_df(
|
||||||
|
|
||||||
df: pd.DataFrame,
|
df: pd.DataFrame,
|
||||||
source=None,
|
source=None,
|
||||||
default_tf=None
|
default_tf=None
|
||||||
|
|
||||||
) -> np.recarray:
|
) -> np.recarray:
|
||||||
"""Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.
|
"""Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
|
||||||
"""
|
"""
|
||||||
from contextlib import asynccontextmanager, AsyncExitStack
|
from contextlib import asynccontextmanager, AsyncExitStack
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from typing import Any, Callable
|
from typing import Any, Callable, AsyncGenerator
|
||||||
import json
|
import json
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
@ -127,7 +127,7 @@ async def open_autorecon_ws(
|
||||||
|
|
||||||
# TODO: proper type annot smh
|
# TODO: proper type annot smh
|
||||||
fixture: Callable,
|
fixture: Callable,
|
||||||
):
|
) -> AsyncGenerator[tuple[...], NoBsWs]:
|
||||||
"""Apparently we can QoS for all sorts of reasons..so catch em.
|
"""Apparently we can QoS for all sorts of reasons..so catch em.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -34,11 +34,10 @@ import trio
|
||||||
from trio.abc import ReceiveChannel
|
from trio.abc import ReceiveChannel
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
# from tractor import _broadcast
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from ..brokers import get_brokermod
|
from ..brokers import get_brokermod
|
||||||
from .._cacheables import maybe_open_ctx
|
from .._cacheables import maybe_open_context
|
||||||
from ..log import get_logger, get_console_log
|
from ..log import get_logger, get_console_log
|
||||||
from .._daemon import (
|
from .._daemon import (
|
||||||
maybe_spawn_brokerd,
|
maybe_spawn_brokerd,
|
||||||
|
@ -247,7 +246,7 @@ async def allocate_persistent_feed(
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def attach_feed_bus(
|
async def open_feed_bus(
|
||||||
|
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
brokername: str,
|
brokername: str,
|
||||||
|
@ -364,7 +363,7 @@ async def open_sample_step_stream(
|
||||||
# XXX: this should be singleton on a host,
|
# XXX: this should be singleton on a host,
|
||||||
# a lone broker-daemon per provider should be
|
# a lone broker-daemon per provider should be
|
||||||
# created for all practical purposes
|
# created for all practical purposes
|
||||||
async with maybe_open_ctx(
|
async with maybe_open_context(
|
||||||
key=delay_s,
|
key=delay_s,
|
||||||
mngr=portal.open_stream_from(
|
mngr=portal.open_stream_from(
|
||||||
iter_ohlc_periods,
|
iter_ohlc_periods,
|
||||||
|
@ -507,7 +506,7 @@ async def open_feed(
|
||||||
|
|
||||||
portal.open_context(
|
portal.open_context(
|
||||||
|
|
||||||
attach_feed_bus,
|
open_feed_bus,
|
||||||
brokername=brokername,
|
brokername=brokername,
|
||||||
symbol=sym,
|
symbol=sym,
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
|
@ -586,7 +585,7 @@ async def maybe_open_feed(
|
||||||
'''
|
'''
|
||||||
sym = symbols[0].lower()
|
sym = symbols[0].lower()
|
||||||
|
|
||||||
async with maybe_open_ctx(
|
async with maybe_open_context(
|
||||||
key=(brokername, sym),
|
key=(brokername, sym),
|
||||||
mngr=open_feed(
|
mngr=open_feed(
|
||||||
brokername,
|
brokername,
|
||||||
|
|
|
@ -35,7 +35,7 @@ import tractor
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from .. import brokers
|
from .. import brokers
|
||||||
from .._cacheables import maybe_open_ctx
|
from .._cacheables import maybe_open_context
|
||||||
from ..trionics import async_enter_all
|
from ..trionics import async_enter_all
|
||||||
from ..data.feed import open_feed, Feed
|
from ..data.feed import open_feed, Feed
|
||||||
from ._chart import (
|
from ._chart import (
|
||||||
|
@ -555,7 +555,7 @@ async def maybe_open_fsp_cluster(
|
||||||
) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
|
) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
|
||||||
|
|
||||||
uid = tractor.current_actor().uid
|
uid = tractor.current_actor().uid
|
||||||
async with maybe_open_ctx(
|
async with maybe_open_context(
|
||||||
key=uid, # for now make a cluster per client?
|
key=uid, # for now make a cluster per client?
|
||||||
mngr=open_fsp_cluster(
|
mngr=open_fsp_cluster(
|
||||||
workers,
|
workers,
|
||||||
|
|
|
@ -54,6 +54,7 @@ async def update_pnl_from_feed(
|
||||||
|
|
||||||
feed: Feed,
|
feed: Feed,
|
||||||
order_mode: OrderMode, # noqa
|
order_mode: OrderMode, # noqa
|
||||||
|
tracker: PositionTracker,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''Real-time display the current pp's PnL in the appropriate label.
|
'''Real-time display the current pp's PnL in the appropriate label.
|
||||||
|
@ -76,7 +77,8 @@ async def update_pnl_from_feed(
|
||||||
types = ('bid', 'last', 'last', 'utrade')
|
types = ('bid', 'last', 'last', 'utrade')
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise RuntimeError('No pp?!?!')
|
log.info(f'No position (yet) for {tracker.alloc.account}@{key}')
|
||||||
|
return
|
||||||
|
|
||||||
# real-time update pnl on the status pane
|
# real-time update pnl on the status pane
|
||||||
try:
|
try:
|
||||||
|
@ -343,6 +345,7 @@ class SettingsPane:
|
||||||
update_pnl_from_feed,
|
update_pnl_from_feed,
|
||||||
feed,
|
feed,
|
||||||
mode,
|
mode,
|
||||||
|
tracker,
|
||||||
)
|
)
|
||||||
|
|
||||||
# immediately display in status label
|
# immediately display in status label
|
||||||
|
|
|
@ -47,7 +47,7 @@ from ._position import (
|
||||||
)
|
)
|
||||||
from ._label import FormatLabel
|
from ._label import FormatLabel
|
||||||
from ._window import MultiStatus
|
from ._window import MultiStatus
|
||||||
from ..clearing._messages import Order
|
from ..clearing._messages import Order, BrokerdPosition
|
||||||
from ._forms import open_form_input_handling
|
from ._forms import open_form_input_handling
|
||||||
|
|
||||||
|
|
||||||
|
@ -529,7 +529,12 @@ async def open_order_mode(
|
||||||
|
|
||||||
book: OrderBook
|
book: OrderBook
|
||||||
trades_stream: tractor.MsgStream
|
trades_stream: tractor.MsgStream
|
||||||
position_msgs: dict
|
|
||||||
|
# The keys in this dict **must** be in set our set of "normalized"
|
||||||
|
# symbol names (i.e. the same names you'd get back in search
|
||||||
|
# results) in order for position msgs to correctly trigger the
|
||||||
|
# display of a position indicator on screen.
|
||||||
|
position_msgs: dict[str, list[BrokerdPosition]]
|
||||||
|
|
||||||
# spawn EMS actor-service
|
# spawn EMS actor-service
|
||||||
async with (
|
async with (
|
||||||
|
@ -563,7 +568,9 @@ async def open_order_mode(
|
||||||
providers=symbol.brokers
|
providers=symbol.brokers
|
||||||
)
|
)
|
||||||
|
|
||||||
# use only loaded accounts according to brokerd
|
# XXX: ``brokerd`` delivers a set of account names that it allows
|
||||||
|
# use of but the user also can define the accounts they'd like
|
||||||
|
# to use, in order, in their `brokers.toml` file.
|
||||||
accounts = {}
|
accounts = {}
|
||||||
for name in brokerd_accounts:
|
for name in brokerd_accounts:
|
||||||
# ensure name is in ``brokers.toml``
|
# ensure name is in ``brokers.toml``
|
||||||
|
@ -620,8 +627,8 @@ async def open_order_mode(
|
||||||
# alloc?
|
# alloc?
|
||||||
pp_tracker.update_from_pp()
|
pp_tracker.update_from_pp()
|
||||||
|
|
||||||
|
# on existing position, show pp tracking graphics
|
||||||
if pp_tracker.startup_pp.size != 0:
|
if pp_tracker.startup_pp.size != 0:
|
||||||
# if no position, don't show pp tracking graphics
|
|
||||||
pp_tracker.show()
|
pp_tracker.show()
|
||||||
pp_tracker.hide_info()
|
pp_tracker.hide_info()
|
||||||
|
|
||||||
|
@ -805,12 +812,13 @@ async def process_trades_and_update_ui(
|
||||||
|
|
||||||
tracker = mode.trackers[msg['account']]
|
tracker = mode.trackers[msg['account']]
|
||||||
tracker.live_pp.update_from_msg(msg)
|
tracker.live_pp.update_from_msg(msg)
|
||||||
tracker.update_from_pp()
|
|
||||||
|
|
||||||
# update order pane widgets
|
# update order pane widgets
|
||||||
|
tracker.update_from_pp()
|
||||||
mode.pane.update_status_ui(tracker)
|
mode.pane.update_status_ui(tracker)
|
||||||
# display pnl
|
|
||||||
mode.pane.display_pnl(tracker)
|
if tracker.live_pp.size:
|
||||||
|
# display pnl
|
||||||
|
mode.pane.display_pnl(tracker)
|
||||||
|
|
||||||
# short circuit to next msg to avoid
|
# short circuit to next msg to avoid
|
||||||
# unnecessary msg content lookups
|
# unnecessary msg content lookups
|
||||||
|
|
Loading…
Reference in New Issue