Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet f75057bc64 Fix TWS triggered trades msg packing 2021-07-05 09:53:46 -04:00
Tyler Goodlet d8fd1c0d64 Load provider search engines in tasks instead of exit stack 2021-07-05 09:53:19 -04:00
Tyler Goodlet b306d1573b Feed detach must explicitly unsub throttled streams
If a client attaches to a quotes data feed and requests a throttle rate,
be sure to unsub that side-band memchan + task when it detaches and
especially so on any transport connection error.

Also, use an explicit `tractor.Context.cancel()` on the client feed
block exit since we removed the implicit cancel option from the
`tractor` api.
2021-07-05 09:41:35 -04:00
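
A minimal sketch of the teardown guarantee described in this commit, assuming `trio` and using illustrative names (`open_throttled_sub`, `_relay_at_rate` and the `subscribers` list are hypothetical, not the actual `piker` feed API): the throttled side-band mem chan and its relay task are always unsubbed and cancelled when the client detaches or its transport errors.

from contextlib import asynccontextmanager

import trio


async def _relay_at_rate(recv, send_to_client, rate: float) -> None:
    # drain the side-band mem chan and forward quotes to the client
    # at most `rate` times per second
    period = 1 / rate
    async for quote in recv:
        try:
            await send_to_client(quote)
        except (trio.BrokenResourceError, trio.ClosedResourceError):
            # client transport dropped; bail out and let the caller unsub
            return
        await trio.sleep(period)


@asynccontextmanager
async def open_throttled_sub(subscribers: list, send_to_client, rate: float):
    # register a throttled side-band mem chan for this client
    send, recv = trio.open_memory_channel(2**10)
    sub = (send, rate)
    subscribers.append(sub)
    try:
        async with trio.open_nursery() as n:
            n.start_soon(_relay_at_rate, recv, send_to_client, rate)
            yield send
            n.cancel_scope.cancel()
    finally:
        # explicit unsub: a detached (or errored) client must not leak
        # the mem chan + relay task
        subscribers.remove(sub)
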
Tyler Goodlet 77baad1e92 Make json resp log debug level 2021-07-05 09:36:54 -04:00
Tyler Goodlet ce40e46c91 TOSQUASH ems comments 2021-07-01 08:43:01 -04:00
Tyler Goodlet f348cbcd52 Better formalize `pikerd` service semantics
An async exit stack around the new `@tractor.context` is problematic
since a pushed context can't bubble errors unless the exit stack has
been closed. But in that case, why do you need the exit stack if you're
going to push it and await it right away? It seems more correct to use
a nursery and spawn a task in `pikerd` that waits on both the
target context's completion first (thus being able to bubble up any errors
from the remote and the top level service task) and the sub-actor portal.
(Sub)service daemons are spawned with `.start_actor()` and thus will
block forever until cancelled, so add a way to cancel them explicitly,
which we'll need eventually for restarts and dynamic feed management.

The big lesson here is that async exit stacks are not conducive to
spawning and monitoring service tasks, especially when
a `@tractor.context` is used: if the `.open_context()` call isn't
exited (only possible by the stack being closed), then there is no
way for `trio` to cancel the task that pushed that context (since it
can't run a checkpoint while yielded inside the stack) without also
cancelling all other contexts pushed on that stack. Presuming one
`pikerd` task is used to do the original pushing (which it was), then
any error would have to kill all service daemon tasks, which obviously
won't work.

I see this mostly as the painz of tinkering out an SC service manager
with `tractor` / `trio` for the first time, so try to go easy on the
process ;P
2021-06-26 16:52:15 -04:00
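
For reference, a stripped-down sketch of the nursery-per-service-context pattern described above (it mirrors the `start_service_task` changes in the diff below; the wrapper shown here is illustrative, not the exact `pikerd` code, and assumes the `tractor`/`trio` APIs used there):

import trio
import tractor


async def start_service_task(
    service_n: trio.Nursery,
    portal: tractor.Portal,
    target,  # a `@tractor.context` endpoint in the service sub-actor
):
    async def _open_context_in_task(
        task_status=trio.TASK_STATUS_IGNORED,
    ):
        # one task + cancel scope per pushed context: an error from (or
        # an explicit cancel of) this one service can propagate without
        # tearing down every other context, unlike a shared
        # `AsyncExitStack` where all pushed contexts hang off one task.
        with trio.CancelScope() as cs:
            async with portal.open_context(target) as (ctx, first):
                # unblock the caller once the remote context has started
                task_status.started((cs, first))
                # then block waiting on the remote side's result/error
                await ctx.result()

    # `.start()` only returns once the remote context is up
    cs, first = await service_n.start(_open_context_in_task)
    return cs, first
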
Tyler Goodlet 1edccf37d9 Support multiple client dialogues active on one brokerd trades dialogue 2021-06-26 16:00:04 -04:00
Tyler Goodlet d6d7c24320 WIP single brokerd dialogue 2021-06-25 00:57:58 -04:00
Tyler Goodlet 2465c8fc78 Pop subscriber streams on connection errors 2021-06-25 00:53:32 -04:00
Tyler Goodlet 998775dfd9 Don't use a context stack for contexts 2021-06-25 00:44:02 -04:00
7 changed files with 471 additions and 278 deletions

View File

@@ -19,11 +19,12 @@ Structured, daemon tree service management.
"""
from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager, AsyncExitStack
from contextlib import asynccontextmanager
from collections import defaultdict
from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
from .log import get_logger, get_console_log
@@ -45,36 +46,79 @@ _root_modules = [
class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
ctx_stack: AsyncExitStack
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
class Config:
arbitrary_types_allowed = True
async def open_remote_ctx(
async def start_service_task(
self,
name: str,
portal: tractor.Portal,
target: Callable,
**kwargs,
) -> tractor.Context:
) -> (trio.CancelScope, tractor.Context):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` tearodwn.
that gets unwound at ``pikerd`` teardown.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
ctx, first = await self.ctx_stack.enter_async_context(
portal.open_context(
target,
**kwargs,
)
)
return ctx
async def open_context_in_task(
task_status: TaskStatus[
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> Any:
with trio.CancelScope() as cs:
async with portal.open_context(
target,
**kwargs,
) as (ctx, first):
# unblock once the remote context has started
task_status.started((cs, first))
# wait on any context's return value
ctx_res = await ctx.result()
log.info(
f'`pikerd` service {name} started with value {ctx_res}'
)
# wait on any error from the sub-actor
# NOTE: this will block indefinitely until cancelled
# either by error from the target context function or
# by being cancelled here by the surrounding cancel
# scope
return await (portal.result(), ctx_res)
cs, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# restart if needed.
self.service_tasks[name] = (cs, portal)
return cs, first
async def cancel_service(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, portal = self.service_tasks[name]
cs.cancel()
return await portal.cancel_actor()
_services: Optional[Services] = None
@@ -117,22 +161,22 @@ async def open_pikerd(
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
tractor.open_nursery() as actor_nursery,
):
async with trio.open_nursery() as service_nursery:
# setup service mngr singleton instance
async with AsyncExitStack() as stack:
# # setup service mngr singleton instance
# async with AsyncExitStack() as stack:
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
ctx_stack=stack,
)
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
)
yield _services
yield _services
@asynccontextmanager
@@ -174,16 +218,20 @@ async def maybe_open_pikerd(
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return
# presume pikerd role
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
@@ -209,7 +257,7 @@ class Brokerd:
async def maybe_spawn_daemon(
service_name: str,
spawn_func: Callable,
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
**kwargs,
@@ -219,6 +267,13 @@ async def maybe_spawn_daemon(
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
If this function is called from a non-pikerd actor, the
spawned service will persist as long as pikerd does or
it is requested to be cancelled.
This can be seen as a service starting api for remote-actor
clients.
"""
if loglevel:
get_console_log(loglevel)
@@ -246,13 +301,24 @@
) as pikerd_portal:
if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_func(**spawn_args)
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
await service_task_target(**spawn_args)
else:
# tell the remote `pikerd` to start the target,
# the target can't return a non-serializable value
# since it is expected that service starting is
# non-blocking and the target task will persist running
# on `pikerd` after the client requesting its start
# disconnects.
await pikerd_portal.run(
spawn_func,
service_task_target,
**spawn_args,
)
@@ -267,7 +333,7 @@ async def spawn_brokerd(
loglevel: Optional[str] = None,
**tractor_kwargs,
) -> tractor._portal.Portal:
) -> bool:
log.info(f'Spawning {brokername} broker daemon')
@@ -280,6 +346,8 @@ async def spawn_brokerd(
global _services
assert _services
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
portal = await _services.actor_n.start_actor(
dname,
enable_modules=_data_mods + [brokermod.__name__],
@@ -291,13 +359,13 @@
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
await _services.open_remote_ctx(
await _services.start_service_task(
dname,
portal,
_setup_persistent_brokerd,
brokername=brokername,
)
return dname
return True
@asynccontextmanager
@@ -314,7 +382,7 @@ async def maybe_spawn_brokerd(
async with maybe_spawn_daemon(
f'brokerd.{brokername}',
spawn_func=spawn_brokerd,
service_task_target=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
@@ -328,7 +396,7 @@ async def spawn_emsd(
loglevel: Optional[str] = None,
**extra_tractor_kwargs
) -> tractor._portal.Portal:
) -> bool:
"""
Start the clearing engine under ``pikerd``.
@@ -352,12 +420,12 @@ async def spawn_emsd(
# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd
await _services.open_remote_ctx(
await _services.start_service_task(
'emsd',
portal,
_setup_persistent_emsd,
)
return 'emsd'
return True
@asynccontextmanager
@@ -372,7 +440,7 @@ async def maybe_open_emsd(
async with maybe_spawn_daemon(
'emsd',
spawn_func=spawn_emsd,
service_task_target=spawn_emsd,
spawn_args={'loglevel': loglevel},
loglevel=loglevel,
**kwargs,

View File

@@ -53,6 +53,6 @@ def resproc(
log.exception(f"Failed to process {resp}:\n{resp.text}")
raise BrokerError(resp.text)
else:
log.trace(f"Received json contents:\n{colorize_json(json)}")
log.debug(f"Received json contents:\n{colorize_json(json)}")
return json if return_json else resp

View File

@@ -1442,14 +1442,14 @@ async def trades_dialogue(
if getattr(msg, 'reqid', 0) < -1:
# it's a trade event generated by TWS usage.
log.warning(f"TWS triggered trade:\n{pformat(msg)}")
log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
msg.reqid = 'tws-' + str(-1 * msg.reqid)
# mark msg as from "external system"
# TODO: probably something better than this.. and start
# considering multiplayer/group trades tracking
msg.external = True
msg.broker_details['external_src'] = 'tws'
continue
# XXX: we always serialize to a dict for msgpack

View File

@@ -27,12 +27,14 @@ from typing import AsyncIterator, Callable, Any
from bidict import bidict
from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
from .. import data
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
Status, Order,
@@ -82,15 +84,16 @@ def mk_check(
@dataclass
class _DarkBook:
"""Client-side execution book.
'''EMS-trigger execution book.
Contains conditions for executions (aka "orders") which are not
exposed to brokers and thus the market; i.e. these are privacy
focussed "client side" orders.
Contains conditions for executions (aka "orders" or "triggers")
which are not exposed to brokers and thus the market; i.e. these are
privacy focussed "client side" orders which are submitted in real-time
based on specified trigger conditions.
A singleton instance is created per EMS actor (for now).
An instance per `brokerd` is created per EMS actor (for now).
"""
'''
broker: str
# levels which have an executable action (eg. alert, order, signal)
@@ -256,17 +259,34 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}')
@dataclass
class TradesRelay:
brokerd_dialogue: tractor.MsgStream
positions: dict[str, float]
consumers: int = 0
class _Router(BaseModel):
'''Order router which manages per-broker dark books, alerts,
and clearing related data feed management.
'''Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management.
A singleton per ``emsd`` actor.
'''
# setup at actor spawn time
nursery: trio.Nursery
feeds: dict[tuple[str, str], Any] = {}
# broker to book map
books: dict[str, _DarkBook] = {}
# order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {}
relays: dict[str, tuple[dict, tractor.MsgStream]] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
class Config:
arbitrary_types_allowed = True
@@ -280,10 +300,160 @@ class _Router(BaseModel):
return self.books.setdefault(brokername, _DarkBook(brokername))
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
self,
feed: Feed,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
relay = self.relays.get(feed.mod.name)
if relay is None:
relay = await self.nursery.start(
open_brokerd_trades_dialogue,
self,
feed,
symbol,
_exec_mode,
loglevel,
)
relay.consumers += 1
# TODO: get updated positions here?
assert relay.brokerd_dialogue
try:
yield relay
finally:
# TODO: what exactly needs to be torn down here or
# are we just consumer tracking?
relay.consumers -= 1
_router: _Router = None
async def open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
symbol: str,
_exec_mode: str,
loglevel: str,
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
broker = feed.mod.name
# TODO: make a `tractor` bug about this!
# portal = feed._brokerd_portal
# XXX: we must have our own portal + channel otherwise
# when the data feed closes it may result in a half-closed/fucked
# channel that the brokerd side thinks is still open somehow!?
async with maybe_spawn_brokerd(
broker,
loglevel=loglevel,
) as portal:
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
try:
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# going to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream,
positions=positions,
consumers=1
)
_router.relays[broker] = relay
# the ems scan loop may be cancelled by the client but we
# want to keep the ``brokerd`` dialogue up regardless
task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
finally:
# parent context must have been closed
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context
async def _setup_persistent_emsd(
@@ -298,20 +468,18 @@ async def _setup_persistent_emsd(
_router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent orders/execs persistent
# TODO: send back the full set of persistent
# orders/execs?
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
# allow service tasks to run until cancelled
await trio.sleep_forever()
async def translate_and_relay_brokerd_events(
broker: str,
# ems_client_order_stream: tractor.MsgStream,
brokerd_trades_stream: tractor.MsgStream,
book: _DarkBook,
router: _Router,
) -> AsyncIterator[dict]:
@@ -334,6 +502,11 @@ async def translate_and_relay_brokerd_events(
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
'''
book = router.get_dark_book(broker)
relay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream
async for brokerd_msg in brokerd_trades_stream:
name = brokerd_msg['name']
@@ -342,13 +515,15 @@ async def translate_and_relay_brokerd_events(
if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict()
# keep up to date locally in ``emsd``
relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg)
# relay through position msgs immediately by
# broadcasting updates on all client streams
for oid, ems_client_order_stream in router.dialogues.items():
await ems_client_order_stream.send(
BrokerdPosition(**brokerd_msg).dict()
)
for client_stream in router.clients:
await client_stream.send(pos_msg)
continue
@@ -425,7 +600,7 @@ async def translate_and_relay_brokerd_events(
resp = None
broker_details = {}
client_flow_complete: bool = False
# client_flow_complete: bool = False
if name in (
'error',
@@ -460,7 +635,7 @@ async def translate_and_relay_brokerd_events(
if msg.status == 'cancelled':
client_flow_complete = True
# client_flow_complete = True
log.info(f'Cancellation for {oid} is complete!')
if msg.status == 'filled':
@@ -473,7 +648,7 @@ async def translate_and_relay_brokerd_events(
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
client_flow_complete = True
# client_flow_complete = True
log.info(f'Execution for {oid} is complete!')
# just log it
@@ -503,22 +678,26 @@ async def translate_and_relay_brokerd_events(
# Create and relay response status message
# to requesting EMS client
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
).dict()
)
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
).dict()
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?
# if client_flow_complete:
# router.dialogues.pop(oid)
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?
# if client_flow_complete:
# router.dialogues.pop(oid)
async def process_client_order_cmds(
@@ -533,6 +712,8 @@ async def process_client_order_cmds(
) -> None:
client_dialogues = router.dialogues
# cmd: dict
async for cmd in client_order_stream:
@@ -541,14 +722,16 @@ async def process_client_order_cmds(
action = cmd['action']
oid = cmd['oid']
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
# wait, maybe this **is** already working thanks to our parent
# `trio` type?
# register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs).
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
router.dialogues[oid] = client_order_stream
client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid)
@@ -655,7 +838,7 @@ async def process_client_order_cmds(
# flow so that if a cancel comes from the requesting
# client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel
# that order with them immediately.
# that live order asap.
dark_book._ems_entries[oid] = msg
# "DARK" triggers
@@ -725,102 +908,6 @@ async def process_client_order_cmds(
)
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
broker: str,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
portal = feed._brokerd_portal
if broker in _router.relays:
positions, brokerd_trades_stream = _router.relays[broker]
# TODO: get updated positions here?
yield positions, brokerd_trades_stream
return
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
trio.open_nursery() as n,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
n.start_soon(
translate_and_relay_brokerd_events,
broker,
# ems_client_order_stream,
brokerd_trades_stream,
dark_book,
_router,
)
_router.relays[broker] = (positions, brokerd_trades_stream)
try:
yield positions, brokerd_trades_stream
finally:
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context
async def _emsd_main(
@@ -855,7 +942,7 @@ async def _emsd_main(
run (dark order) conditions on inputs and trigger brokerd "live"
order submissions.
|
- ``translate_and_relay_brokerd_events()``:
- (maybe) ``translate_and_relay_brokerd_events()``:
accept normalized trades responses from brokerd, process and
relay to ems client(s); this is effectively a "trade event
response" proxy-broker.
@@ -905,24 +992,24 @@ async def _emsd_main(
# only open if one isn't already up: we try to keep
# as few duplicate streams as necessary
maybe_open_brokerd_trades_dialogue(
_router,
_router.maybe_open_brokerd_trades_dialogue(
feed,
broker,
symbol,
dark_book,
_exec_mode,
loglevel,
) as (positions, brokerd_trades_stream),
) as relay,
trio.open_nursery() as n,
):
brokerd_stream = relay.brokerd_dialogue # .clone()
# signal to client that we're started
# TODO: we could eventually send back **all** brokerd
# positions here?
await ems_ctx.started(positions)
await ems_ctx.started(relay.positions)
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
@@ -932,7 +1019,8 @@ async def _emsd_main(
n.start_soon(
clear_dark_triggers,
brokerd_trades_stream,
# relay.brokerd_dialogue,
brokerd_stream,
ems_client_order_stream,
feed.stream,
@@ -942,12 +1030,41 @@
)
# start inbound (from attached client) order request processing
await process_client_order_cmds(
try:
_router.clients.add(ems_client_order_stream)
ems_client_order_stream,
brokerd_trades_stream,
symbol,
feed,
dark_book,
_router,
)
await process_client_order_cmds(
ems_client_order_stream,
# relay.brokerd_dialogue,
brokerd_stream,
symbol,
feed,
dark_book,
_router,
)
finally:
# remove client from "registry"
_router.clients.remove(ems_client_order_stream)
dialogues = _router.dialogues
for oid, client_stream in dialogues.items():
if client_stream == ems_client_order_stream:
log.warning(
f'client dialogue is being abandoned:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)
dialogues.pop(oid)
# TODO: for order dialogues left "alive" in
# the ems this is where we should allow some
# system to take over management. Likely we
# want to allow the user to choose what kind
# of policy to use (eg. cancel all orders
# from client, run some algo, etc.).

View File

@@ -233,27 +233,34 @@ async def sample_and_broadcast(
for (stream, tick_throttle) in subs:
if tick_throttle:
await stream.send(quote)
try:
if tick_throttle:
await stream.send(quote)
else:
try:
else:
await stream.send({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
# XXX: do we need to deregister here
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.error(f'{stream._ctx.chan.uid} dropped connection')
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
# XXX: do we need to deregister here
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.warning(
f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
subs.remove((stream, tick_throttle))
async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream,
) -> None:
sleep_period = 1/rate - 0.000616
@@ -289,8 +296,14 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display
# rate timing exactly lul
await stream.send({first_quote['symbol']: first_quote})
break
try:
await stream.send({first_quote['symbol']: first_quote})
break
except trio.ClosedResourceError:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
return
end = time.time()
diff = end - start

View File

@@ -305,6 +305,11 @@ async def attach_feed_bus(
):
if tick_throttle:
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10)
n.start_soon(
uniform_rate_send,
@@ -321,7 +326,12 @@ async def attach_feed_bus(
try:
await trio.sleep_forever()
finally:
log.info(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle:
n.cancel_scope.cancel()
bus._subscribers[symbol].remove(sub)
@@ -473,11 +483,6 @@ async def open_feed(
ctx.open_stream() as stream,
):
# TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
# init_msg = await stream.receive()
# we can only read from shm
shm = attach_shm_array(
token=init_msg[sym]['shm_token'],
@@ -520,4 +525,8 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates)
yield feed
try:
yield feed
finally:
# drop the infinite stream connection
await ctx.cancel()

View File

@@ -19,7 +19,6 @@ High level Qt chart widgets.
"""
import time
from contextlib import AsyncExitStack
from typing import Tuple, Dict, Any, Optional
from types import ModuleType
from functools import partial
@@ -844,7 +843,7 @@ class ChartPlotWidget(pg.PlotWidget):
# istart=max(lbar, l), iend=min(rbar, r), just_history=True)
# bars_len = rbar - lbar
# log.trace(
# log.debug(
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
# f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}"
@@ -1474,7 +1473,6 @@ async def display_symbol_data(
) as feed,
trio.open_nursery() as n,
):
ohlcv: ShmArray = feed.shm
@@ -1542,77 +1540,66 @@ async def display_symbol_data(
},
})
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
spawn_fsps,
linkedsplits,
fsp_conf,
sym,
ohlcv,
brokermod,
loading_sym_key,
loglevel,
)
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
spawn_fsps,
linkedsplits,
fsp_conf,
sym,
ohlcv,
brokermod,
loading_sym_key,
loglevel,
)
# start graphics update loop(s)after receiving first live quote
n.start_soon(
chart_from_quotes,
chart,
feed.stream,
ohlcv,
wap_in_history,
)
# start graphics update loop(s)after receiving first live quote
n.start_soon(
chart_from_quotes,
chart,
feed.stream,
ohlcv,
wap_in_history,
)
# TODO: instead we should start based on instrument trading hours?
# wait for a first quote before we start any update tasks
# quote = await feed.receive()
# log.info(f'Received first quote {quote}')
# TODO: instead we should start based on instrument trading hours?
# wait for a first quote before we start any update tasks
# quote = await feed.receive()
# log.info(f'Received first quote {quote}')
n.start_soon(
check_for_new_bars,
feed,
ohlcv,
linkedsplits
)
n.start_soon(
check_for_new_bars,
feed,
ohlcv,
linkedsplits
)
await start_order_mode(chart, symbol, provider, order_mode_started)
await start_order_mode(chart, symbol, provider, order_mode_started)
async def load_providers(
async def load_provider_search(
brokernames: list[str],
broker: str,
loglevel: str,
) -> None:
# TODO: seems like our incentive for brokerd caching lelel
backends = {}
log.info(f'loading brokerd for {broker}..')
async with AsyncExitStack() as stack:
# TODO: spawn these async in nursery.
# load all requested brokerd's at startup and load their
# search engines.
for broker in brokernames:
async with (
log.info(f'loading brokerd for {broker}..')
# spin up broker daemons for each provider
portal = await stack.enter_async_context(
maybe_spawn_brokerd(
broker,
loglevel=loglevel
)
)
maybe_spawn_brokerd(
broker,
loglevel=loglevel
) as portal,
backends[broker] = portal
feed.install_brokerd_search(
portal,
get_brokermod(broker),
),
):
await stack.enter_async_context(
feed.install_brokerd_search(
portal,
get_brokermod(broker),
)
)
# keep search engines up until cancelled
# keep search engine stream up until cancelled
await trio.sleep_forever()
@@ -1653,9 +1640,7 @@ async def _async_main(
sbar = godwidget.window.status_bar
starting_done = sbar.open_status('starting ze sexy chartz')
async with (
trio.open_nursery() as root_n,
):
async with trio.open_nursery() as root_n:
# set root nursery and task stack for spawning other charts/feeds
# that run cached in the bg
@@ -1694,7 +1679,8 @@ async def _async_main(
):
# load other providers into search **after**
# the chart's select cache
root_n.start_soon(load_providers, brokernames, loglevel)
for broker in brokernames:
root_n.start_soon(load_provider_search, broker, loglevel)
await order_mode_ready.wait()