Compare commits

..

No commits in common. "f75057bc64f06b617687872122f6c5b9b2ab770a" and "b81c538e85c407746b7c60e8b2ec47b30738cb43" have entirely different histories.

7 changed files with 278 additions and 471 deletions

View File

@ -19,12 +19,11 @@ Structured, daemon tree service management.
"""
from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, AsyncExitStack
from collections import defaultdict
from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
from .log import get_logger, get_console_log
@ -46,79 +45,36 @@ _root_modules = [
class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
ctx_stack: AsyncExitStack
class Config:
arbitrary_types_allowed = True
async def start_service_task(
async def open_remote_ctx(
self,
name: str,
portal: tractor.Portal,
target: Callable,
**kwargs,
) -> (trio.CancelScope, tractor.Context):
) -> tractor.Context:
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
that gets unwound at ``pikerd`` tearodwn.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
async def open_context_in_task(
task_status: TaskStatus[
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> Any:
with trio.CancelScope() as cs:
async with portal.open_context(
ctx, first = await self.ctx_stack.enter_async_context(
portal.open_context(
target,
**kwargs,
) as (ctx, first):
# unblock once the remote context has started
task_status.started((cs, first))
# wait on any context's return value
ctx_res = await ctx.result()
log.info(
f'`pikerd` service {name} started with value {ctx_res}'
)
# wait on any error from the sub-actor
# NOTE: this will block indefinitely until cancelled
# either by error from the target context function or
# by being cancelled here by the surroundingn cancel
# scope
return await (portal.result(), ctx_res)
cs, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal)
return cs, first
async def cancel_service(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, portal = self.service_tasks[name]
cs.cancel()
return await portal.cancel_actor()
)
return ctx
_services: Optional[Services] = None
@ -161,19 +117,19 @@ async def open_pikerd(
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
tractor.open_nursery() as actor_nursery,
):
async with trio.open_nursery() as service_nursery:
# # setup service mngr singleton instance
# async with AsyncExitStack() as stack:
# setup service mngr singleton instance
async with AsyncExitStack() as stack:
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
ctx_stack=stack,
)
yield _services
@ -218,20 +174,16 @@ async def maybe_open_pikerd(
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return
# presume pikerd role since no daemon could be found at
# configured address
# presume pikerd role
async with open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
@ -257,7 +209,7 @@ class Brokerd:
async def maybe_spawn_daemon(
service_name: str,
service_task_target: Callable,
spawn_func: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
**kwargs,
@ -267,13 +219,6 @@ async def maybe_spawn_daemon(
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
If this function is called from a non-pikerd actor, the
spawned service will persist as long as pikerd does or
it is requested to be cancelled.
This can be seen as a service starting api for remote-actor
clients.
"""
if loglevel:
get_console_log(loglevel)
@ -301,24 +246,13 @@ async def maybe_spawn_daemon(
) as pikerd_portal:
if pikerd_portal is None:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
await service_task_target(**spawn_args)
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_func(**spawn_args)
else:
# tell the remote `pikerd` to start the target,
# the target can't return a non-serializable value
# since it is expected that service startingn is
# non-blocking and the target task will persist running
# on `pikerd` after the client requesting it's start
# disconnects.
await pikerd_portal.run(
service_task_target,
spawn_func,
**spawn_args,
)
@ -333,7 +267,7 @@ async def spawn_brokerd(
loglevel: Optional[str] = None,
**tractor_kwargs,
) -> bool:
) -> tractor._portal.Portal:
log.info(f'Spawning {brokername} broker daemon')
@ -346,8 +280,6 @@ async def spawn_brokerd(
global _services
assert _services
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
portal = await _services.actor_n.start_actor(
dname,
enable_modules=_data_mods + [brokermod.__name__],
@ -359,13 +291,13 @@ async def spawn_brokerd(
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
await _services.start_service_task(
dname,
await _services.open_remote_ctx(
portal,
_setup_persistent_brokerd,
brokername=brokername,
)
return True
return dname
@asynccontextmanager
@ -382,7 +314,7 @@ async def maybe_spawn_brokerd(
async with maybe_spawn_daemon(
f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_func=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
@ -396,7 +328,7 @@ async def spawn_emsd(
loglevel: Optional[str] = None,
**extra_tractor_kwargs
) -> bool:
) -> tractor._portal.Portal:
"""
Start the clearing engine under ``pikerd``.
@ -420,12 +352,12 @@ async def spawn_emsd(
# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd
await _services.start_service_task(
'emsd',
await _services.open_remote_ctx(
portal,
_setup_persistent_emsd,
)
return True
return 'emsd'
@asynccontextmanager
@ -440,7 +372,7 @@ async def maybe_open_emsd(
async with maybe_spawn_daemon(
'emsd',
service_task_target=spawn_emsd,
spawn_func=spawn_emsd,
spawn_args={'loglevel': loglevel},
loglevel=loglevel,
**kwargs,

View File

@ -53,6 +53,6 @@ def resproc(
log.exception(f"Failed to process {resp}:\n{resp.text}")
raise BrokerError(resp.text)
else:
log.debug(f"Received json contents:\n{colorize_json(json)}")
log.trace(f"Received json contents:\n{colorize_json(json)}")
return json if return_json else resp

View File

@ -1442,14 +1442,14 @@ async def trades_dialogue(
if getattr(msg, 'reqid', 0) < -1:
# it's a trade event generated by TWS usage.
log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
log.warning(f"TWS triggered trade:\n{pformat(msg)}")
msg.reqid = 'tws-' + str(-1 * msg.reqid)
# mark msg as from "external system"
# TODO: probably something better then this.. and start
# considering multiplayer/group trades tracking
msg.broker_details['external_src'] = 'tws'
msg.external = True
continue
# XXX: we always serialize to a dict for msgpack

View File

@ -27,14 +27,12 @@ from typing import AsyncIterator, Callable, Any
from bidict import bidict
from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
from .. import data
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
Status, Order,
@ -84,16 +82,15 @@ def mk_check(
@dataclass
class _DarkBook:
'''EMS-trigger execution book.
"""Client-side execution book.
Contains conditions for executions (aka "orders" or "triggers")
which are not exposed to brokers and thus the market; i.e. these are
privacy focussed "client side" orders which are submitted in real-time
based on specified trigger conditions.
Contains conditions for executions (aka "orders") which are not
exposed to brokers and thus the market; i.e. these are privacy
focussed "client side" orders.
A an instance per `brokerd` is created per EMS actor (for now).
A singleton instance is created per EMS actor (for now).
'''
"""
broker: str
# levels which have an executable action (eg. alert, order, signal)
@ -259,34 +256,17 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}')
@dataclass
class TradesRelay:
brokerd_dialogue: tractor.MsgStream
positions: dict[str, float]
consumers: int = 0
class _Router(BaseModel):
'''Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management.
A singleton per ``emsd`` actor.
'''Order router which manages per-broker dark books, alerts,
and clearing related data feed management.
'''
# setup at actor spawn time
nursery: trio.Nursery
feeds: dict[tuple[str, str], Any] = {}
# broker to book map
books: dict[str, _DarkBook] = {}
# order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
relays: dict[str, tuple[dict, tractor.MsgStream]] = {}
class Config:
arbitrary_types_allowed = True
@ -300,160 +280,10 @@ class _Router(BaseModel):
return self.books.setdefault(brokername, _DarkBook(brokername))
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
self,
feed: Feed,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
relay = self.relays.get(feed.mod.name)
if relay is None:
relay = await self.nursery.start(
open_brokerd_trades_dialogue,
self,
feed,
symbol,
_exec_mode,
loglevel,
)
relay.consumers += 1
# TODO: get updated positions here?
assert relay.brokerd_dialogue
try:
yield relay
finally:
# TODO: what exactly needs to be torn down here or
# are we just consumer tracking?
relay.consumers -= 1
_router: _Router = None
async def open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
symbol: str,
_exec_mode: str,
loglevel: str,
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
broker = feed.mod.name
# TODO: make a `tractor` bug about this!
# portal = feed._brokerd_portal
# XXX: we must have our own portal + channel otherwise
# when the data feed closes it may result in a half-closed/fucked
# channel that the brokerd side thinks is still open somehow!?
async with maybe_spawn_brokerd(
broker,
loglevel=loglevel,
) as portal:
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
try:
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream,
positions=positions,
consumers=1
)
_router.relays[broker] = relay
# the ems scan loop may be cancelled by the client but we
# want to keep the ``brokerd`` dialogue up regardless
task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
finally:
# parent context must have been closed
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context
async def _setup_persistent_emsd(
@ -468,18 +298,20 @@ async def _setup_persistent_emsd(
_router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent
# orders/execs?
# TODO: send back the full set of persistent orders/execs persistent
await ctx.started()
# allow service tasks to run until cancelled
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
async def translate_and_relay_brokerd_events(
broker: str,
# ems_client_order_stream: tractor.MsgStream,
brokerd_trades_stream: tractor.MsgStream,
book: _DarkBook,
router: _Router,
) -> AsyncIterator[dict]:
@ -502,11 +334,6 @@ async def translate_and_relay_brokerd_events(
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
'''
book = router.get_dark_book(broker)
relay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream
async for brokerd_msg in brokerd_trades_stream:
name = brokerd_msg['name']
@ -515,15 +342,13 @@ async def translate_and_relay_brokerd_events(
if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict()
# keep up to date locally in ``emsd``
relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg)
# relay through position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients:
await client_stream.send(pos_msg)
for oid, ems_client_order_stream in router.dialogues.items():
await ems_client_order_stream.send(
BrokerdPosition(**brokerd_msg).dict()
)
continue
@ -600,7 +425,7 @@ async def translate_and_relay_brokerd_events(
resp = None
broker_details = {}
# client_flow_complete: bool = False
client_flow_complete: bool = False
if name in (
'error',
@ -635,7 +460,7 @@ async def translate_and_relay_brokerd_events(
if msg.status == 'cancelled':
# client_flow_complete = True
client_flow_complete = True
log.info(f'Cancellation for {oid} is complete!')
if msg.status == 'filled':
@ -648,7 +473,7 @@ async def translate_and_relay_brokerd_events(
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
# client_flow_complete = True
client_flow_complete = True
log.info(f'Execution for {oid} is complete!')
# just log it
@ -678,7 +503,6 @@ async def translate_and_relay_brokerd_events(
# Create and relay response status message
# to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
@ -689,9 +513,6 @@ async def translate_and_relay_brokerd_events(
brokerd_msg=broker_details,
).dict()
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
@ -712,8 +533,6 @@ async def process_client_order_cmds(
) -> None:
client_dialogues = router.dialogues
# cmd: dict
async for cmd in client_order_stream:
@ -722,16 +541,14 @@ async def process_client_order_cmds(
action = cmd['action']
oid = cmd['oid']
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
# wait, maybe this **is** already working thanks to our parent
# `trio` type?
# register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
router.dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid)
@ -838,7 +655,7 @@ async def process_client_order_cmds(
# flow so that if a cancel comes from the requesting
# client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel
# that live order asap.
# that order with them immediately.
dark_book._ems_entries[oid] = msg
# "DARK" triggers
@ -908,6 +725,102 @@ async def process_client_order_cmds(
)
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
broker: str,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
portal = feed._brokerd_portal
if broker in _router.relays:
positions, brokerd_trades_stream = _router.relays[broker]
# TODO: get updated positions here?
yield positions, brokerd_trades_stream
return
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
trio.open_nursery() as n,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
n.start_soon(
translate_and_relay_brokerd_events,
broker,
# ems_client_order_stream,
brokerd_trades_stream,
dark_book,
_router,
)
_router.relays[broker] = (positions, brokerd_trades_stream)
try:
yield positions, brokerd_trades_stream
finally:
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context
async def _emsd_main(
@ -942,7 +855,7 @@ async def _emsd_main(
run (dark order) conditions on inputs and trigger brokerd "live"
order submissions.
|
- (maybe) ``translate_and_relay_brokerd_events()``:
- ``translate_and_relay_brokerd_events()``:
accept normalized trades responses from brokerd, process and
relay to ems client(s); this is a effectively a "trade event
reponse" proxy-broker.
@ -992,24 +905,24 @@ async def _emsd_main(
# only open if one isn't already up: we try to keep
# as few duplicate streams as necessary
_router.maybe_open_brokerd_trades_dialogue(
maybe_open_brokerd_trades_dialogue(
_router,
feed,
broker,
symbol,
dark_book,
_exec_mode,
loglevel,
) as relay,
) as (positions, brokerd_trades_stream),
trio.open_nursery() as n,
):
brokerd_stream = relay.brokerd_dialogue # .clone()
# signal to client that we're started
# TODO: we could eventually send back **all** brokerd
# positions here?
await ems_ctx.started(relay.positions)
await ems_ctx.started(positions)
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
@ -1019,8 +932,7 @@ async def _emsd_main(
n.start_soon(
clear_dark_triggers,
# relay.brokerd_dialogue,
brokerd_stream,
brokerd_trades_stream,
ems_client_order_stream,
feed.stream,
@ -1030,41 +942,12 @@ async def _emsd_main(
)
# start inbound (from attached client) order request processing
try:
_router.clients.add(ems_client_order_stream)
await process_client_order_cmds(
ems_client_order_stream,
# relay.brokerd_dialogue,
brokerd_stream,
brokerd_trades_stream,
symbol,
feed,
dark_book,
_router,
)
finally:
# remove client from "registry"
_router.clients.remove(ems_client_order_stream)
dialogues = _router.dialogues
for oid, client_stream in dialogues.items():
if client_stream == ems_client_order_stream:
log.warning(
f'client dialogue is being abandoned:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)
dialogues.pop(oid)
# TODO: for order dialogues left "alive" in
# the ems this is where we should allow some
# system to take over management. Likely we
# want to allow the user to choose what kind
# of policy to use (eg. cancel all orders
# from client, run some algo, etc.).

View File

@ -233,13 +233,12 @@ async def sample_and_broadcast(
for (stream, tick_throttle) in subs:
try:
if tick_throttle:
await stream.send(quote)
else:
try:
await stream.send({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
@ -248,19 +247,13 @@ async def sample_and_broadcast(
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.warning(
f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
subs.remove((stream, tick_throttle))
log.error(f'{stream._ctx.chan.uid} dropped connection')
async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream,
) -> None:
sleep_period = 1/rate - 0.000616
@ -296,14 +289,8 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({first_quote['symbol']: first_quote})
break
except trio.ClosedResourceError:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
return
end = time.time()
diff = end - start

View File

@ -305,11 +305,6 @@ async def attach_feed_bus(
):
if tick_throttle:
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10)
n.start_soon(
uniform_rate_send,
@ -326,12 +321,7 @@ async def attach_feed_bus(
try:
await trio.sleep_forever()
finally:
log.info(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle:
n.cancel_scope.cancel()
bus._subscribers[symbol].remove(sub)
@ -483,6 +473,11 @@ async def open_feed(
ctx.open_stream() as stream,
):
# TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
# init_msg = await stream.receive()
# we can only read from shm
shm = attach_shm_array(
token=init_msg[sym]['shm_token'],
@ -525,8 +520,4 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed
finally:
# drop the infinite stream connection
await ctx.cancel()

View File

@ -19,6 +19,7 @@ High level Qt chart widgets.
"""
import time
from contextlib import AsyncExitStack
from typing import Tuple, Dict, Any, Optional
from types import ModuleType
from functools import partial
@ -843,7 +844,7 @@ class ChartPlotWidget(pg.PlotWidget):
# istart=max(lbar, l), iend=min(rbar, r), just_history=True)
# bars_len = rbar - lbar
# log.debug(
# log.trace(
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
# f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}"
@ -1473,6 +1474,7 @@ async def display_symbol_data(
) as feed,
trio.open_nursery() as n,
):
ohlcv: ShmArray = feed.shm
@ -1540,7 +1542,6 @@ async def display_symbol_data(
},
})
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
spawn_fsps,
@ -1577,29 +1578,41 @@ async def display_symbol_data(
await start_order_mode(chart, symbol, provider, order_mode_started)
async def load_provider_search(
async def load_providers(
broker: str,
brokernames: list[str],
loglevel: str,
) -> None:
# TODO: seems like our incentive for brokerd caching lelel
backends = {}
async with AsyncExitStack() as stack:
# TODO: spawn these async in nursery.
# load all requested brokerd's at startup and load their
# search engines.
for broker in brokernames:
log.info(f'loading brokerd for {broker}..')
async with (
# spin up broker daemons for each provider
portal = await stack.enter_async_context(
maybe_spawn_brokerd(
broker,
loglevel=loglevel
) as portal,
)
)
backends[broker] = portal
await stack.enter_async_context(
feed.install_brokerd_search(
portal,
get_brokermod(broker),
),
):
)
)
# keep search engine stream up until cancelled
# keep search engines up until cancelled
await trio.sleep_forever()
@ -1640,7 +1653,9 @@ async def _async_main(
sbar = godwidget.window.status_bar
starting_done = sbar.open_status('starting ze sexy chartz')
async with trio.open_nursery() as root_n:
async with (
trio.open_nursery() as root_n,
):
# set root nursery and task stack for spawning other charts/feeds
# that run cached in the bg
@ -1679,8 +1694,7 @@ async def _async_main(
):
# load other providers into search **after**
# the chart's select cache
for broker in brokernames:
root_n.start_soon(load_provider_search, broker, loglevel)
root_n.start_soon(load_providers, brokernames, loglevel)
await order_mode_ready.wait()