Compare commits
10 Commits
b81c538e85
...
f75057bc64
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | f75057bc64 | |
Tyler Goodlet | d8fd1c0d64 | |
Tyler Goodlet | b306d1573b | |
Tyler Goodlet | 77baad1e92 | |
Tyler Goodlet | ce40e46c91 | |
Tyler Goodlet | f348cbcd52 | |
Tyler Goodlet | 1edccf37d9 | |
Tyler Goodlet | d6d7c24320 | |
Tyler Goodlet | 2465c8fc78 | |
Tyler Goodlet | 998775dfd9 |
124
piker/_daemon.py
124
piker/_daemon.py
|
@ -19,11 +19,12 @@ Structured, daemon tree service management.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from typing import Optional, Union, Callable, Any
|
from typing import Optional, Union, Callable, Any
|
||||||
from contextlib import asynccontextmanager, AsyncExitStack
|
from contextlib import asynccontextmanager
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
import trio
|
import trio
|
||||||
|
from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
from .log import get_logger, get_console_log
|
from .log import get_logger, get_console_log
|
||||||
|
@ -45,36 +46,79 @@ _root_modules = [
|
||||||
|
|
||||||
|
|
||||||
class Services(BaseModel):
|
class Services(BaseModel):
|
||||||
|
|
||||||
actor_n: tractor._trionics.ActorNursery
|
actor_n: tractor._trionics.ActorNursery
|
||||||
service_n: trio.Nursery
|
service_n: trio.Nursery
|
||||||
debug_mode: bool # tractor sub-actor debug mode flag
|
debug_mode: bool # tractor sub-actor debug mode flag
|
||||||
ctx_stack: AsyncExitStack
|
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
arbitrary_types_allowed = True
|
arbitrary_types_allowed = True
|
||||||
|
|
||||||
async def open_remote_ctx(
|
async def start_service_task(
|
||||||
self,
|
self,
|
||||||
|
name: str,
|
||||||
portal: tractor.Portal,
|
portal: tractor.Portal,
|
||||||
target: Callable,
|
target: Callable,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> tractor.Context:
|
) -> (trio.CancelScope, tractor.Context):
|
||||||
'''
|
'''
|
||||||
Open a context in a service sub-actor, add to a stack
|
Open a context in a service sub-actor, add to a stack
|
||||||
that gets unwound at ``pikerd`` tearodwn.
|
that gets unwound at ``pikerd`` teardown.
|
||||||
|
|
||||||
This allows for allocating long-running sub-services in our main
|
This allows for allocating long-running sub-services in our main
|
||||||
daemon and explicitly controlling their lifetimes.
|
daemon and explicitly controlling their lifetimes.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
ctx, first = await self.ctx_stack.enter_async_context(
|
async def open_context_in_task(
|
||||||
portal.open_context(
|
task_status: TaskStatus[
|
||||||
|
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
) -> Any:
|
||||||
|
|
||||||
|
with trio.CancelScope() as cs:
|
||||||
|
|
||||||
|
async with portal.open_context(
|
||||||
target,
|
target,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
|
) as (ctx, first):
|
||||||
|
|
||||||
|
# unblock once the remote context has started
|
||||||
|
task_status.started((cs, first))
|
||||||
|
|
||||||
|
# wait on any context's return value
|
||||||
|
ctx_res = await ctx.result()
|
||||||
|
log.info(
|
||||||
|
f'`pikerd` service {name} started with value {ctx_res}'
|
||||||
)
|
)
|
||||||
)
|
|
||||||
return ctx
|
# wait on any error from the sub-actor
|
||||||
|
# NOTE: this will block indefinitely until cancelled
|
||||||
|
# either by error from the target context function or
|
||||||
|
# by being cancelled here by the surroundingn cancel
|
||||||
|
# scope
|
||||||
|
return await (portal.result(), ctx_res)
|
||||||
|
|
||||||
|
cs, first = await self.service_n.start(open_context_in_task)
|
||||||
|
|
||||||
|
# store the cancel scope and portal for later cancellation or
|
||||||
|
# retstart if needed.
|
||||||
|
self.service_tasks[name] = (cs, portal)
|
||||||
|
|
||||||
|
return cs, first
|
||||||
|
|
||||||
|
async def cancel_service(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
|
||||||
|
) -> Any:
|
||||||
|
|
||||||
|
log.info(f'Cancelling `pikerd` service {name}')
|
||||||
|
cs, portal = self.service_tasks[name]
|
||||||
|
cs.cancel()
|
||||||
|
return await portal.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
_services: Optional[Services] = None
|
_services: Optional[Services] = None
|
||||||
|
@ -117,19 +161,19 @@ async def open_pikerd(
|
||||||
# spawn other specialized daemons I think?
|
# spawn other specialized daemons I think?
|
||||||
enable_modules=_root_modules,
|
enable_modules=_root_modules,
|
||||||
) as _,
|
) as _,
|
||||||
|
|
||||||
tractor.open_nursery() as actor_nursery,
|
tractor.open_nursery() as actor_nursery,
|
||||||
):
|
):
|
||||||
async with trio.open_nursery() as service_nursery:
|
async with trio.open_nursery() as service_nursery:
|
||||||
|
|
||||||
# setup service mngr singleton instance
|
# # setup service mngr singleton instance
|
||||||
async with AsyncExitStack() as stack:
|
# async with AsyncExitStack() as stack:
|
||||||
|
|
||||||
# assign globally for future daemon/task creation
|
# assign globally for future daemon/task creation
|
||||||
_services = Services(
|
_services = Services(
|
||||||
actor_n=actor_nursery,
|
actor_n=actor_nursery,
|
||||||
service_n=service_nursery,
|
service_n=service_nursery,
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
ctx_stack=stack,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
yield _services
|
yield _services
|
||||||
|
@ -174,16 +218,20 @@ async def maybe_open_pikerd(
|
||||||
|
|
||||||
# subtle, we must have the runtime up here or portal lookup will fail
|
# subtle, we must have the runtime up here or portal lookup will fail
|
||||||
async with maybe_open_runtime(loglevel, **kwargs):
|
async with maybe_open_runtime(loglevel, **kwargs):
|
||||||
|
|
||||||
async with tractor.find_actor(_root_dname) as portal:
|
async with tractor.find_actor(_root_dname) as portal:
|
||||||
# assert portal is not None
|
# assert portal is not None
|
||||||
if portal is not None:
|
if portal is not None:
|
||||||
yield portal
|
yield portal
|
||||||
return
|
return
|
||||||
|
|
||||||
# presume pikerd role
|
# presume pikerd role since no daemon could be found at
|
||||||
|
# configured address
|
||||||
async with open_pikerd(
|
async with open_pikerd(
|
||||||
|
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
debug_mode=kwargs.get('debug_mode', False),
|
debug_mode=kwargs.get('debug_mode', False),
|
||||||
|
|
||||||
) as _:
|
) as _:
|
||||||
# in the case where we're starting up the
|
# in the case where we're starting up the
|
||||||
# tractor-piker runtime stack in **this** process
|
# tractor-piker runtime stack in **this** process
|
||||||
|
@ -209,7 +257,7 @@ class Brokerd:
|
||||||
async def maybe_spawn_daemon(
|
async def maybe_spawn_daemon(
|
||||||
|
|
||||||
service_name: str,
|
service_name: str,
|
||||||
spawn_func: Callable,
|
service_task_target: Callable,
|
||||||
spawn_args: dict[str, Any],
|
spawn_args: dict[str, Any],
|
||||||
loglevel: Optional[str] = None,
|
loglevel: Optional[str] = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
@ -219,6 +267,13 @@ async def maybe_spawn_daemon(
|
||||||
If no ``service_name`` daemon-actor can be found,
|
If no ``service_name`` daemon-actor can be found,
|
||||||
spawn one in a local subactor and return a portal to it.
|
spawn one in a local subactor and return a portal to it.
|
||||||
|
|
||||||
|
If this function is called from a non-pikerd actor, the
|
||||||
|
spawned service will persist as long as pikerd does or
|
||||||
|
it is requested to be cancelled.
|
||||||
|
|
||||||
|
This can be seen as a service starting api for remote-actor
|
||||||
|
clients.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
if loglevel:
|
if loglevel:
|
||||||
get_console_log(loglevel)
|
get_console_log(loglevel)
|
||||||
|
@ -246,13 +301,24 @@ async def maybe_spawn_daemon(
|
||||||
) as pikerd_portal:
|
) as pikerd_portal:
|
||||||
|
|
||||||
if pikerd_portal is None:
|
if pikerd_portal is None:
|
||||||
# we are root so spawn brokerd directly in our tree
|
# we are the root and thus are `pikerd`
|
||||||
# the root nursery is accessed through process global state
|
# so spawn the target service directly by calling
|
||||||
await spawn_func(**spawn_args)
|
# the provided target routine.
|
||||||
|
# XXX: this assumes that the target is well formed and will
|
||||||
|
# do the right things to setup both a sub-actor **and** call
|
||||||
|
# the ``_Services`` api from above to start the top level
|
||||||
|
# service task for that actor.
|
||||||
|
await service_task_target(**spawn_args)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
# tell the remote `pikerd` to start the target,
|
||||||
|
# the target can't return a non-serializable value
|
||||||
|
# since it is expected that service startingn is
|
||||||
|
# non-blocking and the target task will persist running
|
||||||
|
# on `pikerd` after the client requesting it's start
|
||||||
|
# disconnects.
|
||||||
await pikerd_portal.run(
|
await pikerd_portal.run(
|
||||||
spawn_func,
|
service_task_target,
|
||||||
**spawn_args,
|
**spawn_args,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -267,7 +333,7 @@ async def spawn_brokerd(
|
||||||
loglevel: Optional[str] = None,
|
loglevel: Optional[str] = None,
|
||||||
**tractor_kwargs,
|
**tractor_kwargs,
|
||||||
|
|
||||||
) -> tractor._portal.Portal:
|
) -> bool:
|
||||||
|
|
||||||
log.info(f'Spawning {brokername} broker daemon')
|
log.info(f'Spawning {brokername} broker daemon')
|
||||||
|
|
||||||
|
@ -280,6 +346,8 @@ async def spawn_brokerd(
|
||||||
global _services
|
global _services
|
||||||
assert _services
|
assert _services
|
||||||
|
|
||||||
|
# ask `pikerd` to spawn a new sub-actor and manage it under its
|
||||||
|
# actor nursery
|
||||||
portal = await _services.actor_n.start_actor(
|
portal = await _services.actor_n.start_actor(
|
||||||
dname,
|
dname,
|
||||||
enable_modules=_data_mods + [brokermod.__name__],
|
enable_modules=_data_mods + [brokermod.__name__],
|
||||||
|
@ -291,13 +359,13 @@ async def spawn_brokerd(
|
||||||
# non-blocking setup of brokerd service nursery
|
# non-blocking setup of brokerd service nursery
|
||||||
from .data import _setup_persistent_brokerd
|
from .data import _setup_persistent_brokerd
|
||||||
|
|
||||||
await _services.open_remote_ctx(
|
await _services.start_service_task(
|
||||||
|
dname,
|
||||||
portal,
|
portal,
|
||||||
_setup_persistent_brokerd,
|
_setup_persistent_brokerd,
|
||||||
brokername=brokername,
|
brokername=brokername,
|
||||||
)
|
)
|
||||||
|
return True
|
||||||
return dname
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
|
@ -314,7 +382,7 @@ async def maybe_spawn_brokerd(
|
||||||
async with maybe_spawn_daemon(
|
async with maybe_spawn_daemon(
|
||||||
|
|
||||||
f'brokerd.{brokername}',
|
f'brokerd.{brokername}',
|
||||||
spawn_func=spawn_brokerd,
|
service_task_target=spawn_brokerd,
|
||||||
spawn_args={'brokername': brokername, 'loglevel': loglevel},
|
spawn_args={'brokername': brokername, 'loglevel': loglevel},
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
@ -328,7 +396,7 @@ async def spawn_emsd(
|
||||||
loglevel: Optional[str] = None,
|
loglevel: Optional[str] = None,
|
||||||
**extra_tractor_kwargs
|
**extra_tractor_kwargs
|
||||||
|
|
||||||
) -> tractor._portal.Portal:
|
) -> bool:
|
||||||
"""
|
"""
|
||||||
Start the clearing engine under ``pikerd``.
|
Start the clearing engine under ``pikerd``.
|
||||||
|
|
||||||
|
@ -352,12 +420,12 @@ async def spawn_emsd(
|
||||||
# non-blocking setup of clearing service
|
# non-blocking setup of clearing service
|
||||||
from .clearing._ems import _setup_persistent_emsd
|
from .clearing._ems import _setup_persistent_emsd
|
||||||
|
|
||||||
await _services.open_remote_ctx(
|
await _services.start_service_task(
|
||||||
|
'emsd',
|
||||||
portal,
|
portal,
|
||||||
_setup_persistent_emsd,
|
_setup_persistent_emsd,
|
||||||
)
|
)
|
||||||
|
return True
|
||||||
return 'emsd'
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
|
@ -372,7 +440,7 @@ async def maybe_open_emsd(
|
||||||
async with maybe_spawn_daemon(
|
async with maybe_spawn_daemon(
|
||||||
|
|
||||||
'emsd',
|
'emsd',
|
||||||
spawn_func=spawn_emsd,
|
service_task_target=spawn_emsd,
|
||||||
spawn_args={'loglevel': loglevel},
|
spawn_args={'loglevel': loglevel},
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
|
@ -53,6 +53,6 @@ def resproc(
|
||||||
log.exception(f"Failed to process {resp}:\n{resp.text}")
|
log.exception(f"Failed to process {resp}:\n{resp.text}")
|
||||||
raise BrokerError(resp.text)
|
raise BrokerError(resp.text)
|
||||||
else:
|
else:
|
||||||
log.trace(f"Received json contents:\n{colorize_json(json)}")
|
log.debug(f"Received json contents:\n{colorize_json(json)}")
|
||||||
|
|
||||||
return json if return_json else resp
|
return json if return_json else resp
|
||||||
|
|
|
@ -1442,14 +1442,14 @@ async def trades_dialogue(
|
||||||
if getattr(msg, 'reqid', 0) < -1:
|
if getattr(msg, 'reqid', 0) < -1:
|
||||||
|
|
||||||
# it's a trade event generated by TWS usage.
|
# it's a trade event generated by TWS usage.
|
||||||
log.warning(f"TWS triggered trade:\n{pformat(msg)}")
|
log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
|
||||||
|
|
||||||
msg.reqid = 'tws-' + str(-1 * msg.reqid)
|
msg.reqid = 'tws-' + str(-1 * msg.reqid)
|
||||||
|
|
||||||
# mark msg as from "external system"
|
# mark msg as from "external system"
|
||||||
# TODO: probably something better then this.. and start
|
# TODO: probably something better then this.. and start
|
||||||
# considering multiplayer/group trades tracking
|
# considering multiplayer/group trades tracking
|
||||||
msg.external = True
|
msg.broker_details['external_src'] = 'tws'
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# XXX: we always serialize to a dict for msgpack
|
# XXX: we always serialize to a dict for msgpack
|
||||||
|
|
|
@ -27,12 +27,14 @@ from typing import AsyncIterator, Callable, Any
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
import trio
|
import trio
|
||||||
|
from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
from .. import data
|
from .. import data
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
from ..data._normalize import iterticks
|
from ..data._normalize import iterticks
|
||||||
from ..data.feed import Feed
|
from ..data.feed import Feed
|
||||||
|
from .._daemon import maybe_spawn_brokerd
|
||||||
from . import _paper_engine as paper
|
from . import _paper_engine as paper
|
||||||
from ._messages import (
|
from ._messages import (
|
||||||
Status, Order,
|
Status, Order,
|
||||||
|
@ -82,15 +84,16 @@ def mk_check(
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class _DarkBook:
|
class _DarkBook:
|
||||||
"""Client-side execution book.
|
'''EMS-trigger execution book.
|
||||||
|
|
||||||
Contains conditions for executions (aka "orders") which are not
|
Contains conditions for executions (aka "orders" or "triggers")
|
||||||
exposed to brokers and thus the market; i.e. these are privacy
|
which are not exposed to brokers and thus the market; i.e. these are
|
||||||
focussed "client side" orders.
|
privacy focussed "client side" orders which are submitted in real-time
|
||||||
|
based on specified trigger conditions.
|
||||||
|
|
||||||
A singleton instance is created per EMS actor (for now).
|
A an instance per `brokerd` is created per EMS actor (for now).
|
||||||
|
|
||||||
"""
|
'''
|
||||||
broker: str
|
broker: str
|
||||||
|
|
||||||
# levels which have an executable action (eg. alert, order, signal)
|
# levels which have an executable action (eg. alert, order, signal)
|
||||||
|
@ -256,17 +259,34 @@ async def clear_dark_triggers(
|
||||||
# print(f'execs scan took: {time.time() - start}')
|
# print(f'execs scan took: {time.time() - start}')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TradesRelay:
|
||||||
|
brokerd_dialogue: tractor.MsgStream
|
||||||
|
positions: dict[str, float]
|
||||||
|
consumers: int = 0
|
||||||
|
|
||||||
|
|
||||||
class _Router(BaseModel):
|
class _Router(BaseModel):
|
||||||
'''Order router which manages per-broker dark books, alerts,
|
'''Order router which manages and tracks per-broker dark book,
|
||||||
and clearing related data feed management.
|
alerts, clearing and related data feed management.
|
||||||
|
|
||||||
|
A singleton per ``emsd`` actor.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# setup at actor spawn time
|
||||||
nursery: trio.Nursery
|
nursery: trio.Nursery
|
||||||
|
|
||||||
feeds: dict[tuple[str, str], Any] = {}
|
feeds: dict[tuple[str, str], Any] = {}
|
||||||
|
|
||||||
|
# broker to book map
|
||||||
books: dict[str, _DarkBook] = {}
|
books: dict[str, _DarkBook] = {}
|
||||||
|
|
||||||
|
# order id to client stream map
|
||||||
|
clients: set[tractor.MsgStream] = set()
|
||||||
dialogues: dict[str, list[tractor.MsgStream]] = {}
|
dialogues: dict[str, list[tractor.MsgStream]] = {}
|
||||||
relays: dict[str, tuple[dict, tractor.MsgStream]] = {}
|
|
||||||
|
# brokername to trades-dialogues streams with ``brokerd`` actors
|
||||||
|
relays: dict[str, TradesRelay] = {}
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
arbitrary_types_allowed = True
|
arbitrary_types_allowed = True
|
||||||
|
@ -280,10 +300,160 @@ class _Router(BaseModel):
|
||||||
|
|
||||||
return self.books.setdefault(brokername, _DarkBook(brokername))
|
return self.books.setdefault(brokername, _DarkBook(brokername))
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def maybe_open_brokerd_trades_dialogue(
|
||||||
|
|
||||||
|
self,
|
||||||
|
feed: Feed,
|
||||||
|
symbol: str,
|
||||||
|
dark_book: _DarkBook,
|
||||||
|
_exec_mode: str,
|
||||||
|
loglevel: str,
|
||||||
|
|
||||||
|
) -> tuple[dict, tractor.MsgStream]:
|
||||||
|
'''Open and yield ``brokerd`` trades dialogue context-stream if none
|
||||||
|
already exists.
|
||||||
|
|
||||||
|
'''
|
||||||
|
relay = self.relays.get(feed.mod.name)
|
||||||
|
|
||||||
|
if relay is None:
|
||||||
|
|
||||||
|
relay = await self.nursery.start(
|
||||||
|
open_brokerd_trades_dialogue,
|
||||||
|
self,
|
||||||
|
feed,
|
||||||
|
symbol,
|
||||||
|
_exec_mode,
|
||||||
|
loglevel,
|
||||||
|
)
|
||||||
|
|
||||||
|
relay.consumers += 1
|
||||||
|
|
||||||
|
# TODO: get updated positions here?
|
||||||
|
assert relay.brokerd_dialogue
|
||||||
|
try:
|
||||||
|
yield relay
|
||||||
|
|
||||||
|
finally:
|
||||||
|
|
||||||
|
# TODO: what exactly needs to be torn down here or
|
||||||
|
# are we just consumer tracking?
|
||||||
|
|
||||||
|
relay.consumers -= 1
|
||||||
|
|
||||||
|
|
||||||
_router: _Router = None
|
_router: _Router = None
|
||||||
|
|
||||||
|
|
||||||
|
async def open_brokerd_trades_dialogue(
|
||||||
|
|
||||||
|
router: _Router,
|
||||||
|
feed: Feed,
|
||||||
|
symbol: str,
|
||||||
|
_exec_mode: str,
|
||||||
|
loglevel: str,
|
||||||
|
|
||||||
|
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
) -> tuple[dict, tractor.MsgStream]:
|
||||||
|
'''Open and yield ``brokerd`` trades dialogue context-stream if none
|
||||||
|
already exists.
|
||||||
|
|
||||||
|
'''
|
||||||
|
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
|
||||||
|
|
||||||
|
broker = feed.mod.name
|
||||||
|
|
||||||
|
# TODO: make a `tractor` bug about this!
|
||||||
|
# portal = feed._brokerd_portal
|
||||||
|
|
||||||
|
# XXX: we must have our own portal + channel otherwise
|
||||||
|
# when the data feed closes it may result in a half-closed/fucked
|
||||||
|
# channel that the brokerd side thinks is still open somehow!?
|
||||||
|
async with maybe_spawn_brokerd(
|
||||||
|
|
||||||
|
broker,
|
||||||
|
loglevel=loglevel,
|
||||||
|
|
||||||
|
) as portal:
|
||||||
|
|
||||||
|
if trades_endpoint is None or _exec_mode == 'paper':
|
||||||
|
|
||||||
|
# for paper mode we need to mock this trades response feed
|
||||||
|
# so we load bidir stream to a new sub-actor running a
|
||||||
|
# paper-simulator clearing engine.
|
||||||
|
|
||||||
|
# load the paper trading engine
|
||||||
|
_exec_mode = 'paper'
|
||||||
|
log.warning(f'Entering paper trading mode for {broker}')
|
||||||
|
|
||||||
|
# load the paper trading engine as a subactor of this emsd
|
||||||
|
# actor to simulate the real IPC load it'll have when also
|
||||||
|
# pulling data from feeds
|
||||||
|
open_trades_endpoint = paper.open_paperboi(
|
||||||
|
broker=broker,
|
||||||
|
symbol=symbol,
|
||||||
|
loglevel=loglevel,
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# open live brokerd trades endpoint
|
||||||
|
open_trades_endpoint = portal.open_context(
|
||||||
|
trades_endpoint,
|
||||||
|
loglevel=loglevel,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with (
|
||||||
|
|
||||||
|
open_trades_endpoint as (brokerd_ctx, positions),
|
||||||
|
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
||||||
|
|
||||||
|
):
|
||||||
|
# XXX: really we only want one stream per `emsd` actor
|
||||||
|
# to relay global `brokerd` order events unless we're
|
||||||
|
# doing to expect each backend to relay only orders
|
||||||
|
# affiliated with a particular ``trades_dialogue()``
|
||||||
|
# session (seems annoying for implementers). So, here
|
||||||
|
# we cache the relay task and instead of running multiple
|
||||||
|
# tasks (which will result in multiples of the same msg being
|
||||||
|
# relayed for each EMS client) we just register each client
|
||||||
|
# stream to this single relay loop using _router.dialogues
|
||||||
|
|
||||||
|
# begin processing order events from the target brokerd backend
|
||||||
|
# by receiving order submission response messages,
|
||||||
|
# normalizing them to EMS messages and relaying back to
|
||||||
|
# the piker order client set.
|
||||||
|
|
||||||
|
relay = TradesRelay(
|
||||||
|
brokerd_dialogue=brokerd_trades_stream,
|
||||||
|
positions=positions,
|
||||||
|
consumers=1
|
||||||
|
)
|
||||||
|
|
||||||
|
_router.relays[broker] = relay
|
||||||
|
|
||||||
|
# the ems scan loop may be cancelled by the client but we
|
||||||
|
# want to keep the ``brokerd`` dialogue up regardless
|
||||||
|
|
||||||
|
task_status.started(relay)
|
||||||
|
|
||||||
|
await translate_and_relay_brokerd_events(
|
||||||
|
broker,
|
||||||
|
brokerd_trades_stream,
|
||||||
|
_router,
|
||||||
|
)
|
||||||
|
|
||||||
|
# this context should block here indefinitely until
|
||||||
|
# the ``brokerd`` task either dies or is cancelled
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# parent context must have been closed
|
||||||
|
# remove from cache so next client will respawn if needed
|
||||||
|
_router.relays.pop(broker)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def _setup_persistent_emsd(
|
async def _setup_persistent_emsd(
|
||||||
|
|
||||||
|
@ -298,20 +468,18 @@ async def _setup_persistent_emsd(
|
||||||
|
|
||||||
_router = _Router(nursery=service_nursery)
|
_router = _Router(nursery=service_nursery)
|
||||||
|
|
||||||
# TODO: send back the full set of persistent orders/execs persistent
|
# TODO: send back the full set of persistent
|
||||||
|
# orders/execs?
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
|
|
||||||
# we pin this task to keep the feeds manager active until the
|
# allow service tasks to run until cancelled
|
||||||
# parent actor decides to tear it down
|
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
async def translate_and_relay_brokerd_events(
|
async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
broker: str,
|
broker: str,
|
||||||
# ems_client_order_stream: tractor.MsgStream,
|
|
||||||
brokerd_trades_stream: tractor.MsgStream,
|
brokerd_trades_stream: tractor.MsgStream,
|
||||||
book: _DarkBook,
|
|
||||||
router: _Router,
|
router: _Router,
|
||||||
|
|
||||||
) -> AsyncIterator[dict]:
|
) -> AsyncIterator[dict]:
|
||||||
|
@ -334,6 +502,11 @@ async def translate_and_relay_brokerd_events(
|
||||||
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
|
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
book = router.get_dark_book(broker)
|
||||||
|
relay = router.relays[broker]
|
||||||
|
|
||||||
|
assert relay.brokerd_dialogue == brokerd_trades_stream
|
||||||
|
|
||||||
async for brokerd_msg in brokerd_trades_stream:
|
async for brokerd_msg in brokerd_trades_stream:
|
||||||
|
|
||||||
name = brokerd_msg['name']
|
name = brokerd_msg['name']
|
||||||
|
@ -342,13 +515,15 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
if name == 'position':
|
if name == 'position':
|
||||||
|
|
||||||
|
pos_msg = BrokerdPosition(**brokerd_msg).dict()
|
||||||
|
|
||||||
|
# keep up to date locally in ``emsd``
|
||||||
|
relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg)
|
||||||
|
|
||||||
# relay through position msgs immediately by
|
# relay through position msgs immediately by
|
||||||
# broadcasting updates on all client streams
|
# broadcasting updates on all client streams
|
||||||
for oid, ems_client_order_stream in router.dialogues.items():
|
for client_stream in router.clients:
|
||||||
|
await client_stream.send(pos_msg)
|
||||||
await ems_client_order_stream.send(
|
|
||||||
BrokerdPosition(**brokerd_msg).dict()
|
|
||||||
)
|
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -425,7 +600,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
resp = None
|
resp = None
|
||||||
broker_details = {}
|
broker_details = {}
|
||||||
client_flow_complete: bool = False
|
# client_flow_complete: bool = False
|
||||||
|
|
||||||
if name in (
|
if name in (
|
||||||
'error',
|
'error',
|
||||||
|
@ -460,7 +635,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
if msg.status == 'cancelled':
|
if msg.status == 'cancelled':
|
||||||
|
|
||||||
client_flow_complete = True
|
# client_flow_complete = True
|
||||||
log.info(f'Cancellation for {oid} is complete!')
|
log.info(f'Cancellation for {oid} is complete!')
|
||||||
|
|
||||||
if msg.status == 'filled':
|
if msg.status == 'filled':
|
||||||
|
@ -473,7 +648,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
# be sure to pop this stream from our dialogue set
|
# be sure to pop this stream from our dialogue set
|
||||||
# since the order dialogue should be done.
|
# since the order dialogue should be done.
|
||||||
client_flow_complete = True
|
# client_flow_complete = True
|
||||||
log.info(f'Execution for {oid} is complete!')
|
log.info(f'Execution for {oid} is complete!')
|
||||||
|
|
||||||
# just log it
|
# just log it
|
||||||
|
@ -503,6 +678,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
# Create and relay response status message
|
# Create and relay response status message
|
||||||
# to requesting EMS client
|
# to requesting EMS client
|
||||||
|
try:
|
||||||
ems_client_order_stream = router.dialogues[oid]
|
ems_client_order_stream = router.dialogues[oid]
|
||||||
await ems_client_order_stream.send(
|
await ems_client_order_stream.send(
|
||||||
Status(
|
Status(
|
||||||
|
@ -513,6 +689,9 @@ async def translate_and_relay_brokerd_events(
|
||||||
brokerd_msg=broker_details,
|
brokerd_msg=broker_details,
|
||||||
).dict()
|
).dict()
|
||||||
)
|
)
|
||||||
|
except KeyError:
|
||||||
|
log.error(
|
||||||
|
f'Received `brokerd` msg for unknown client with oid: {oid}')
|
||||||
|
|
||||||
# TODO: do we want this to keep things cleaned up?
|
# TODO: do we want this to keep things cleaned up?
|
||||||
# it might require a special status from brokerd to affirm the
|
# it might require a special status from brokerd to affirm the
|
||||||
|
@ -533,6 +712,8 @@ async def process_client_order_cmds(
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
client_dialogues = router.dialogues
|
||||||
|
|
||||||
# cmd: dict
|
# cmd: dict
|
||||||
async for cmd in client_order_stream:
|
async for cmd in client_order_stream:
|
||||||
|
|
||||||
|
@ -541,14 +722,16 @@ async def process_client_order_cmds(
|
||||||
action = cmd['action']
|
action = cmd['action']
|
||||||
oid = cmd['oid']
|
oid = cmd['oid']
|
||||||
|
|
||||||
|
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
|
||||||
|
# can be stored in sets like the old context was.
|
||||||
|
# wait, maybe this **is** already working thanks to our parent
|
||||||
|
# `trio` type?
|
||||||
|
|
||||||
# register this stream as an active dialogue for this order id
|
# register this stream as an active dialogue for this order id
|
||||||
# such that translated message from the brokerd backend can be
|
# such that translated message from the brokerd backend can be
|
||||||
# routed (relayed) to **just** that client stream (and in theory
|
# routed (relayed) to **just** that client stream (and in theory
|
||||||
# others who are registered for such order affiliated msgs).
|
# others who are registered for such order affiliated msgs).
|
||||||
|
client_dialogues[oid] = client_order_stream
|
||||||
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
|
|
||||||
# can be stored in sets like the old context was.
|
|
||||||
router.dialogues[oid] = client_order_stream
|
|
||||||
|
|
||||||
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
|
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
|
||||||
live_entry = dark_book._ems_entries.get(oid)
|
live_entry = dark_book._ems_entries.get(oid)
|
||||||
|
@ -655,7 +838,7 @@ async def process_client_order_cmds(
|
||||||
# flow so that if a cancel comes from the requesting
|
# flow so that if a cancel comes from the requesting
|
||||||
# client, before that ack, when the ack does arrive we
|
# client, before that ack, when the ack does arrive we
|
||||||
# immediately take the reqid from the broker and cancel
|
# immediately take the reqid from the broker and cancel
|
||||||
# that order with them immediately.
|
# that live order asap.
|
||||||
dark_book._ems_entries[oid] = msg
|
dark_book._ems_entries[oid] = msg
|
||||||
|
|
||||||
# "DARK" triggers
|
# "DARK" triggers
|
||||||
|
@ -725,102 +908,6 @@ async def process_client_order_cmds(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def maybe_open_brokerd_trades_dialogue(
|
|
||||||
|
|
||||||
router: _Router,
|
|
||||||
feed: Feed,
|
|
||||||
broker: str,
|
|
||||||
symbol: str,
|
|
||||||
dark_book: _DarkBook,
|
|
||||||
_exec_mode: str,
|
|
||||||
loglevel: str,
|
|
||||||
|
|
||||||
) -> tuple[dict, tractor.MsgStream]:
|
|
||||||
'''Open and yield ``brokerd`` trades dialogue context-stream if none
|
|
||||||
already exists.
|
|
||||||
|
|
||||||
'''
|
|
||||||
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
|
|
||||||
portal = feed._brokerd_portal
|
|
||||||
|
|
||||||
if broker in _router.relays:
|
|
||||||
|
|
||||||
positions, brokerd_trades_stream = _router.relays[broker]
|
|
||||||
|
|
||||||
# TODO: get updated positions here?
|
|
||||||
yield positions, brokerd_trades_stream
|
|
||||||
return
|
|
||||||
|
|
||||||
if trades_endpoint is None or _exec_mode == 'paper':
|
|
||||||
|
|
||||||
# for paper mode we need to mock this trades response feed
|
|
||||||
# so we load bidir stream to a new sub-actor running a
|
|
||||||
# paper-simulator clearing engine.
|
|
||||||
|
|
||||||
# load the paper trading engine
|
|
||||||
_exec_mode = 'paper'
|
|
||||||
log.warning(f'Entering paper trading mode for {broker}')
|
|
||||||
|
|
||||||
# load the paper trading engine as a subactor of this emsd
|
|
||||||
# actor to simulate the real IPC load it'll have when also
|
|
||||||
# pulling data from feeds
|
|
||||||
open_trades_endpoint = paper.open_paperboi(
|
|
||||||
broker=broker,
|
|
||||||
symbol=symbol,
|
|
||||||
loglevel=loglevel,
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# open live brokerd trades endpoint
|
|
||||||
open_trades_endpoint = portal.open_context(
|
|
||||||
trades_endpoint,
|
|
||||||
loglevel=loglevel,
|
|
||||||
)
|
|
||||||
|
|
||||||
async with (
|
|
||||||
|
|
||||||
open_trades_endpoint as (brokerd_ctx, positions),
|
|
||||||
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
|
||||||
trio.open_nursery() as n,
|
|
||||||
|
|
||||||
):
|
|
||||||
# XXX: really we only want one stream per `emsd` actor
|
|
||||||
# to relay global `brokerd` order events unless we're
|
|
||||||
# doing to expect each backend to relay only orders
|
|
||||||
# affiliated with a particular ``trades_dialogue()``
|
|
||||||
# session (seems annoying for implementers). So, here
|
|
||||||
# we cache the relay task and instead of running multiple
|
|
||||||
# tasks (which will result in multiples of the same msg being
|
|
||||||
# relayed for each EMS client) we just register each client
|
|
||||||
# stream to this single relay loop using _router.dialogues
|
|
||||||
|
|
||||||
# begin processing order events from the target brokerd backend
|
|
||||||
# by receiving order submission response messages,
|
|
||||||
# normalizing them to EMS messages and relaying back to
|
|
||||||
# the piker order client set.
|
|
||||||
|
|
||||||
n.start_soon(
|
|
||||||
|
|
||||||
translate_and_relay_brokerd_events,
|
|
||||||
|
|
||||||
broker,
|
|
||||||
# ems_client_order_stream,
|
|
||||||
brokerd_trades_stream,
|
|
||||||
dark_book,
|
|
||||||
_router,
|
|
||||||
)
|
|
||||||
|
|
||||||
_router.relays[broker] = (positions, brokerd_trades_stream)
|
|
||||||
|
|
||||||
try:
|
|
||||||
yield positions, brokerd_trades_stream
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# remove from cache so next client will respawn if needed
|
|
||||||
_router.relays.pop(broker)
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def _emsd_main(
|
async def _emsd_main(
|
||||||
|
|
||||||
|
@ -855,7 +942,7 @@ async def _emsd_main(
|
||||||
run (dark order) conditions on inputs and trigger brokerd "live"
|
run (dark order) conditions on inputs and trigger brokerd "live"
|
||||||
order submissions.
|
order submissions.
|
||||||
|
|
|
|
||||||
- ``translate_and_relay_brokerd_events()``:
|
- (maybe) ``translate_and_relay_brokerd_events()``:
|
||||||
accept normalized trades responses from brokerd, process and
|
accept normalized trades responses from brokerd, process and
|
||||||
relay to ems client(s); this is a effectively a "trade event
|
relay to ems client(s); this is a effectively a "trade event
|
||||||
reponse" proxy-broker.
|
reponse" proxy-broker.
|
||||||
|
@ -905,24 +992,24 @@ async def _emsd_main(
|
||||||
|
|
||||||
# only open if one isn't already up: we try to keep
|
# only open if one isn't already up: we try to keep
|
||||||
# as few duplicate streams as necessary
|
# as few duplicate streams as necessary
|
||||||
maybe_open_brokerd_trades_dialogue(
|
_router.maybe_open_brokerd_trades_dialogue(
|
||||||
_router,
|
|
||||||
feed,
|
feed,
|
||||||
broker,
|
|
||||||
symbol,
|
symbol,
|
||||||
dark_book,
|
dark_book,
|
||||||
_exec_mode,
|
_exec_mode,
|
||||||
loglevel,
|
loglevel,
|
||||||
|
|
||||||
) as (positions, brokerd_trades_stream),
|
) as relay,
|
||||||
|
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
):
|
):
|
||||||
|
|
||||||
|
brokerd_stream = relay.brokerd_dialogue # .clone()
|
||||||
|
|
||||||
# signal to client that we're started
|
# signal to client that we're started
|
||||||
# TODO: we could eventually send back **all** brokerd
|
# TODO: we could eventually send back **all** brokerd
|
||||||
# positions here?
|
# positions here?
|
||||||
await ems_ctx.started(positions)
|
await ems_ctx.started(relay.positions)
|
||||||
|
|
||||||
# establish 2-way stream with requesting order-client and
|
# establish 2-way stream with requesting order-client and
|
||||||
# begin handling inbound order requests and updates
|
# begin handling inbound order requests and updates
|
||||||
|
@ -932,7 +1019,8 @@ async def _emsd_main(
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
clear_dark_triggers,
|
clear_dark_triggers,
|
||||||
|
|
||||||
brokerd_trades_stream,
|
# relay.brokerd_dialogue,
|
||||||
|
brokerd_stream,
|
||||||
ems_client_order_stream,
|
ems_client_order_stream,
|
||||||
feed.stream,
|
feed.stream,
|
||||||
|
|
||||||
|
@ -942,12 +1030,41 @@ async def _emsd_main(
|
||||||
)
|
)
|
||||||
|
|
||||||
# start inbound (from attached client) order request processing
|
# start inbound (from attached client) order request processing
|
||||||
|
try:
|
||||||
|
_router.clients.add(ems_client_order_stream)
|
||||||
|
|
||||||
await process_client_order_cmds(
|
await process_client_order_cmds(
|
||||||
|
|
||||||
ems_client_order_stream,
|
ems_client_order_stream,
|
||||||
brokerd_trades_stream,
|
|
||||||
|
# relay.brokerd_dialogue,
|
||||||
|
brokerd_stream,
|
||||||
|
|
||||||
symbol,
|
symbol,
|
||||||
feed,
|
feed,
|
||||||
dark_book,
|
dark_book,
|
||||||
_router,
|
_router,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# remove client from "registry"
|
||||||
|
_router.clients.remove(ems_client_order_stream)
|
||||||
|
|
||||||
|
dialogues = _router.dialogues
|
||||||
|
|
||||||
|
for oid, client_stream in dialogues.items():
|
||||||
|
|
||||||
|
if client_stream == ems_client_order_stream:
|
||||||
|
|
||||||
|
log.warning(
|
||||||
|
f'client dialogue is being abandoned:\n'
|
||||||
|
f'{oid} ->\n{client_stream._ctx.chan.uid}'
|
||||||
|
)
|
||||||
|
dialogues.pop(oid)
|
||||||
|
|
||||||
|
# TODO: for order dialogues left "alive" in
|
||||||
|
# the ems this is where we should allow some
|
||||||
|
# system to take over management. Likely we
|
||||||
|
# want to allow the user to choose what kind
|
||||||
|
# of policy to use (eg. cancel all orders
|
||||||
|
# from client, run some algo, etc.).
|
||||||
|
|
|
@ -233,12 +233,13 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
for (stream, tick_throttle) in subs:
|
for (stream, tick_throttle) in subs:
|
||||||
|
|
||||||
|
try:
|
||||||
if tick_throttle:
|
if tick_throttle:
|
||||||
await stream.send(quote)
|
await stream.send(quote)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
try:
|
|
||||||
await stream.send({sym: quote})
|
await stream.send({sym: quote})
|
||||||
|
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
trio.ClosedResourceError
|
trio.ClosedResourceError
|
||||||
|
@ -247,13 +248,19 @@ async def sample_and_broadcast(
|
||||||
# if it's done in the fee bus code?
|
# if it's done in the fee bus code?
|
||||||
# so far seems like no since this should all
|
# so far seems like no since this should all
|
||||||
# be single-threaded.
|
# be single-threaded.
|
||||||
log.error(f'{stream._ctx.chan.uid} dropped connection')
|
log.warning(
|
||||||
|
f'{stream._ctx.chan.uid} dropped '
|
||||||
|
'`brokerd`-quotes-feed connection'
|
||||||
|
)
|
||||||
|
subs.remove((stream, tick_throttle))
|
||||||
|
|
||||||
|
|
||||||
async def uniform_rate_send(
|
async def uniform_rate_send(
|
||||||
|
|
||||||
rate: float,
|
rate: float,
|
||||||
quote_stream: trio.abc.ReceiveChannel,
|
quote_stream: trio.abc.ReceiveChannel,
|
||||||
stream: tractor.MsgStream,
|
stream: tractor.MsgStream,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
sleep_period = 1/rate - 0.000616
|
sleep_period = 1/rate - 0.000616
|
||||||
|
@ -289,8 +296,14 @@ async def uniform_rate_send(
|
||||||
|
|
||||||
# TODO: now if only we could sync this to the display
|
# TODO: now if only we could sync this to the display
|
||||||
# rate timing exactly lul
|
# rate timing exactly lul
|
||||||
|
try:
|
||||||
await stream.send({first_quote['symbol']: first_quote})
|
await stream.send({first_quote['symbol']: first_quote})
|
||||||
break
|
break
|
||||||
|
except trio.ClosedResourceError:
|
||||||
|
# if the feed consumer goes down then drop
|
||||||
|
# out of this rate limiter
|
||||||
|
log.warning(f'{stream} closed')
|
||||||
|
return
|
||||||
|
|
||||||
end = time.time()
|
end = time.time()
|
||||||
diff = end - start
|
diff = end - start
|
||||||
|
|
|
@ -305,6 +305,11 @@ async def attach_feed_bus(
|
||||||
):
|
):
|
||||||
|
|
||||||
if tick_throttle:
|
if tick_throttle:
|
||||||
|
|
||||||
|
# open a bg task which receives quotes over a mem chan
|
||||||
|
# and only pushes them to the target actor-consumer at
|
||||||
|
# a max ``tick_throttle`` instantaneous rate.
|
||||||
|
|
||||||
send, recv = trio.open_memory_channel(2**10)
|
send, recv = trio.open_memory_channel(2**10)
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
uniform_rate_send,
|
uniform_rate_send,
|
||||||
|
@ -321,7 +326,12 @@ async def attach_feed_bus(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
log.info(
|
||||||
|
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
|
||||||
|
if tick_throttle:
|
||||||
|
n.cancel_scope.cancel()
|
||||||
bus._subscribers[symbol].remove(sub)
|
bus._subscribers[symbol].remove(sub)
|
||||||
|
|
||||||
|
|
||||||
|
@ -473,11 +483,6 @@ async def open_feed(
|
||||||
ctx.open_stream() as stream,
|
ctx.open_stream() as stream,
|
||||||
):
|
):
|
||||||
|
|
||||||
# TODO: can we make this work better with the proposed
|
|
||||||
# context based bidirectional streaming style api proposed in:
|
|
||||||
# https://github.com/goodboy/tractor/issues/53
|
|
||||||
# init_msg = await stream.receive()
|
|
||||||
|
|
||||||
# we can only read from shm
|
# we can only read from shm
|
||||||
shm = attach_shm_array(
|
shm = attach_shm_array(
|
||||||
token=init_msg[sym]['shm_token'],
|
token=init_msg[sym]['shm_token'],
|
||||||
|
@ -520,4 +525,8 @@ async def open_feed(
|
||||||
|
|
||||||
feed._max_sample_rate = max(ohlc_sample_rates)
|
feed._max_sample_rate = max(ohlc_sample_rates)
|
||||||
|
|
||||||
|
try:
|
||||||
yield feed
|
yield feed
|
||||||
|
finally:
|
||||||
|
# drop the infinite stream connection
|
||||||
|
await ctx.cancel()
|
||||||
|
|
|
@ -19,7 +19,6 @@ High level Qt chart widgets.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import time
|
import time
|
||||||
from contextlib import AsyncExitStack
|
|
||||||
from typing import Tuple, Dict, Any, Optional
|
from typing import Tuple, Dict, Any, Optional
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
@ -844,7 +843,7 @@ class ChartPlotWidget(pg.PlotWidget):
|
||||||
# istart=max(lbar, l), iend=min(rbar, r), just_history=True)
|
# istart=max(lbar, l), iend=min(rbar, r), just_history=True)
|
||||||
|
|
||||||
# bars_len = rbar - lbar
|
# bars_len = rbar - lbar
|
||||||
# log.trace(
|
# log.debug(
|
||||||
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
|
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
|
||||||
# f"view_len: {view_len}, bars_len: {bars_len}\n"
|
# f"view_len: {view_len}, bars_len: {bars_len}\n"
|
||||||
# f"begin: {begin}, end: {end}, extra: {extra}"
|
# f"begin: {begin}, end: {end}, extra: {extra}"
|
||||||
|
@ -1474,7 +1473,6 @@ async def display_symbol_data(
|
||||||
|
|
||||||
) as feed,
|
) as feed,
|
||||||
|
|
||||||
trio.open_nursery() as n,
|
|
||||||
):
|
):
|
||||||
|
|
||||||
ohlcv: ShmArray = feed.shm
|
ohlcv: ShmArray = feed.shm
|
||||||
|
@ -1542,6 +1540,7 @@ async def display_symbol_data(
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
async with trio.open_nursery() as n:
|
||||||
# load initial fsp chain (otherwise known as "indicators")
|
# load initial fsp chain (otherwise known as "indicators")
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
spawn_fsps,
|
spawn_fsps,
|
||||||
|
@ -1578,41 +1577,29 @@ async def display_symbol_data(
|
||||||
await start_order_mode(chart, symbol, provider, order_mode_started)
|
await start_order_mode(chart, symbol, provider, order_mode_started)
|
||||||
|
|
||||||
|
|
||||||
async def load_providers(
|
async def load_provider_search(
|
||||||
|
|
||||||
brokernames: list[str],
|
broker: str,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# TODO: seems like our incentive for brokerd caching lelel
|
|
||||||
backends = {}
|
|
||||||
|
|
||||||
async with AsyncExitStack() as stack:
|
|
||||||
# TODO: spawn these async in nursery.
|
|
||||||
# load all requested brokerd's at startup and load their
|
|
||||||
# search engines.
|
|
||||||
for broker in brokernames:
|
|
||||||
|
|
||||||
log.info(f'loading brokerd for {broker}..')
|
log.info(f'loading brokerd for {broker}..')
|
||||||
# spin up broker daemons for each provider
|
|
||||||
portal = await stack.enter_async_context(
|
async with (
|
||||||
|
|
||||||
maybe_spawn_brokerd(
|
maybe_spawn_brokerd(
|
||||||
broker,
|
broker,
|
||||||
loglevel=loglevel
|
loglevel=loglevel
|
||||||
)
|
) as portal,
|
||||||
)
|
|
||||||
|
|
||||||
backends[broker] = portal
|
|
||||||
|
|
||||||
await stack.enter_async_context(
|
|
||||||
feed.install_brokerd_search(
|
feed.install_brokerd_search(
|
||||||
portal,
|
portal,
|
||||||
get_brokermod(broker),
|
get_brokermod(broker),
|
||||||
)
|
),
|
||||||
)
|
):
|
||||||
|
|
||||||
# keep search engines up until cancelled
|
# keep search engine stream up until cancelled
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
@ -1653,9 +1640,7 @@ async def _async_main(
|
||||||
sbar = godwidget.window.status_bar
|
sbar = godwidget.window.status_bar
|
||||||
starting_done = sbar.open_status('starting ze sexy chartz')
|
starting_done = sbar.open_status('starting ze sexy chartz')
|
||||||
|
|
||||||
async with (
|
async with trio.open_nursery() as root_n:
|
||||||
trio.open_nursery() as root_n,
|
|
||||||
):
|
|
||||||
|
|
||||||
# set root nursery and task stack for spawning other charts/feeds
|
# set root nursery and task stack for spawning other charts/feeds
|
||||||
# that run cached in the bg
|
# that run cached in the bg
|
||||||
|
@ -1694,7 +1679,8 @@ async def _async_main(
|
||||||
):
|
):
|
||||||
# load other providers into search **after**
|
# load other providers into search **after**
|
||||||
# the chart's select cache
|
# the chart's select cache
|
||||||
root_n.start_soon(load_providers, brokernames, loglevel)
|
for broker in brokernames:
|
||||||
|
root_n.start_soon(load_provider_search, broker, loglevel)
|
||||||
|
|
||||||
await order_mode_ready.wait()
|
await order_mode_ready.wait()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue