Merge pull request #295 from pikers/fqsns

Fqsns for cross-broker ticker naming
no_git_prot_w_pip
goodboy 2022-04-11 09:20:36 -04:00 committed by GitHub
commit 253cbf901c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 506 additions and 304 deletions

View File

@ -18,8 +18,11 @@
Binance backend Binance backend
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator from typing import (
Any, Union, Optional,
AsyncGenerator, Callable,
)
import time import time
import trio import trio
@ -88,7 +91,7 @@ class Pair(BaseModel):
baseCommissionPrecision: int baseCommissionPrecision: int
quoteCommissionPrecision: int quoteCommissionPrecision: int
orderTypes: List[str] orderTypes: list[str]
icebergAllowed: bool icebergAllowed: bool
ocoAllowed: bool ocoAllowed: bool
@ -96,8 +99,8 @@ class Pair(BaseModel):
isSpotTradingAllowed: bool isSpotTradingAllowed: bool
isMarginTradingAllowed: bool isMarginTradingAllowed: bool
filters: List[Dict[str, Union[str, int, float]]] filters: list[dict[str, Union[str, int, float]]]
permissions: List[str] permissions: list[str]
@dataclass @dataclass
@ -145,7 +148,7 @@ class Client:
self, self,
method: str, method: str,
params: dict, params: dict,
) -> Dict[str, Any]: ) -> dict[str, Any]:
resp = await self._sesh.get( resp = await self._sesh.get(
path=f'/api/v3/{method}', path=f'/api/v3/{method}',
params=params, params=params,
@ -200,7 +203,7 @@ class Client:
self, self,
pattern: str, pattern: str,
limit: int = None, limit: int = None,
) -> Dict[str, Any]: ) -> dict[str, Any]:
if self._pairs is not None: if self._pairs is not None:
data = self._pairs data = self._pairs
else: else:
@ -273,7 +276,7 @@ class Client:
return array return array
@asynccontextmanager @acm
async def get_client() -> Client: async def get_client() -> Client:
client = Client() client = Client()
await client.cache_symbols() await client.cache_symbols()
@ -353,7 +356,7 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
} }
def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
"""Create a request subscription packet dict. """Create a request subscription packet dict.
https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams
@ -368,6 +371,17 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]:
} }
@acm
async def open_history_client(
symbol: str,
) -> tuple[Callable, int]:
# TODO implement history getter for the new storage layer.
async with open_cached_client('binance') as client:
yield client
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
shm: ShmArray, # type: ignore # noqa shm: ShmArray, # type: ignore # noqa
@ -385,12 +399,12 @@ async def backfill_bars(
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: List[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None, loglevel: str = None,
# startup sync # startup sync
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
@ -427,10 +441,11 @@ async def stream_quotes(
symbol: { symbol: {
'symbol_info': sym_infos[sym], 'symbol_info': sym_infos[sym],
'shm_write_opts': {'sum_tick_vml': False}, 'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
}, },
} }
@asynccontextmanager @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
# setup subs # setup subs
@ -480,8 +495,7 @@ async def stream_quotes(
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__() typ, quote = await msg_gen.__anext__()
first_quote = {quote['symbol'].lower(): quote} task_status.started((init_msgs, quote))
task_status.started((init_msgs, first_quote))
# signal to caller feed is ready for consumption # signal to caller feed is ready for consumption
feed_is_live.set() feed_is_live.set()

View File

@ -1472,6 +1472,7 @@ async def stream_quotes(
return init_msgs return init_msgs
init_msgs = mk_init_msgs() init_msgs = mk_init_msgs()
con = first_ticker.contract con = first_ticker.contract
# should be real volume for this contract by default # should be real volume for this contract by default
@ -1496,8 +1497,11 @@ async def stream_quotes(
topic = '.'.join((con['symbol'], suffix)).lower() topic = '.'.join((con['symbol'], suffix)).lower()
quote['symbol'] = topic quote['symbol'] = topic
# for compat with upcoming fqsn based derivs search
init_msgs[sym]['fqsn'] = topic
# pass first quote asap # pass first quote asap
first_quote = {topic: quote} first_quote = quote
# it might be outside regular trading hours so see if we can at # it might be outside regular trading hours so see if we can at
# least grab history. # least grab history.

View File

@ -18,9 +18,9 @@
Kraken backend. Kraken backend.
''' '''
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from dataclasses import asdict, field from dataclasses import asdict, field
from typing import Dict, List, Tuple, Any, Optional, AsyncIterator from typing import Any, Optional, AsyncIterator, Callable
import time import time
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -80,7 +80,7 @@ ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = True _show_wap_in_history = True
_symbol_info_translation: Dict[str, str] = { _symbol_info_translation: dict[str, str] = {
'tick_decimals': 'pair_decimals', 'tick_decimals': 'pair_decimals',
} }
@ -102,16 +102,16 @@ class Pair(BaseModel):
lot_multiplier: float lot_multiplier: float
# array of leverage amounts available when buying # array of leverage amounts available when buying
leverage_buy: List[int] leverage_buy: list[int]
# array of leverage amounts available when selling # array of leverage amounts available when selling
leverage_sell: List[int] leverage_sell: list[int]
# fee schedule array in [volume, percent fee] tuples # fee schedule array in [volume, percent fee] tuples
fees: List[Tuple[int, float]] fees: list[tuple[int, float]]
# maker fee schedule array in [volume, percent fee] tuples (if on # maker fee schedule array in [volume, percent fee] tuples (if on
# maker/taker) # maker/taker)
fees_maker: List[Tuple[int, float]] fees_maker: list[tuple[int, float]]
fee_volume_currency: str # volume discount currency fee_volume_currency: str # volume discount currency
margin_call: str # margin call level margin_call: str # margin call level
@ -153,7 +153,7 @@ class OHLC:
volume: float # Accumulated volume **within interval** volume: float # Accumulated volume **within interval**
count: int # Number of trades within interval count: int # Number of trades within interval
# (sampled) generated tick data # (sampled) generated tick data
ticks: List[Any] = field(default_factory=list) ticks: list[Any] = field(default_factory=list)
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
@ -177,7 +177,7 @@ def get_config() -> dict[str, Any]:
def get_kraken_signature( def get_kraken_signature(
urlpath: str, urlpath: str,
data: Dict[str, Any], data: dict[str, Any],
secret: str secret: str
) -> str: ) -> str:
postdata = urllib.parse.urlencode(data) postdata = urllib.parse.urlencode(data)
@ -220,7 +220,7 @@ class Client:
self._secret = secret self._secret = secret
@property @property
def pairs(self) -> Dict[str, Any]: def pairs(self) -> dict[str, Any]:
if self._pairs is None: if self._pairs is None:
raise RuntimeError( raise RuntimeError(
"Make sure to run `cache_symbols()` on startup!" "Make sure to run `cache_symbols()` on startup!"
@ -233,7 +233,7 @@ class Client:
self, self,
method: str, method: str,
data: dict, data: dict,
) -> Dict[str, Any]: ) -> dict[str, Any]:
resp = await self._sesh.post( resp = await self._sesh.post(
path=f'/public/{method}', path=f'/public/{method}',
json=data, json=data,
@ -246,7 +246,7 @@ class Client:
method: str, method: str,
data: dict, data: dict,
uri_path: str uri_path: str
) -> Dict[str, Any]: ) -> dict[str, Any]:
headers = { headers = {
'Content-Type': 'Content-Type':
'application/x-www-form-urlencoded', 'application/x-www-form-urlencoded',
@ -266,16 +266,16 @@ class Client:
async def endpoint( async def endpoint(
self, self,
method: str, method: str,
data: Dict[str, Any] data: dict[str, Any]
) -> Dict[str, Any]: ) -> dict[str, Any]:
uri_path = f'/0/private/{method}' uri_path = f'/0/private/{method}'
data['nonce'] = str(int(1000*time.time())) data['nonce'] = str(int(1000*time.time()))
return await self._private(method, data, uri_path) return await self._private(method, data, uri_path)
async def get_trades( async def get_trades(
self, self,
data: Dict[str, Any] = {} data: dict[str, Any] = {}
) -> Dict[str, Any]: ) -> dict[str, Any]:
data['ofs'] = 0 data['ofs'] = 0
# Grab all trade history # Grab all trade history
# https://docs.kraken.com/rest/#operation/getTradeHistory # https://docs.kraken.com/rest/#operation/getTradeHistory
@ -378,7 +378,7 @@ class Client:
self, self,
pattern: str, pattern: str,
limit: int = None, limit: int = None,
) -> Dict[str, Any]: ) -> dict[str, Any]:
if self._pairs is not None: if self._pairs is not None:
data = self._pairs data = self._pairs
else: else:
@ -452,7 +452,7 @@ class Client:
raise SymbolNotFound(json['error'][0] + f': {symbol}') raise SymbolNotFound(json['error'][0] + f': {symbol}')
@asynccontextmanager @acm
async def get_client() -> Client: async def get_client() -> Client:
section = get_config() section = get_config()
@ -521,7 +521,7 @@ def normalize_symbol(
return ticker.lower() return ticker.lower()
def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]: def make_auth_sub(data: dict[str, Any]) -> dict[str, str]:
''' '''
Create a request subscription packet dict. Create a request subscription packet dict.
@ -696,12 +696,12 @@ async def handle_order_requests(
async def trades_dialogue( async def trades_dialogue(
ctx: tractor.Context, ctx: tractor.Context,
loglevel: str = None, loglevel: str = None,
) -> AsyncIterator[Dict[str, Any]]: ) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
@asynccontextmanager @acm
async def subscribe(ws: wsproto.WSConnection, token: str): async def subscribe(ws: wsproto.WSConnection, token: str):
# XXX: setup subs # XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe # https://docs.kraken.com/websockets/#message-subscribe
@ -980,7 +980,7 @@ def normalize(
return topic, quote return topic, quote
def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]:
''' '''
Create a request subscription packet dict. Create a request subscription packet dict.
@ -996,6 +996,17 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
} }
@acm
async def open_history_client(
symbol: str,
) -> tuple[Callable, int]:
# TODO implement history getter for the new storage layer.
async with open_cached_client('kraken') as client:
yield client
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
@ -1017,7 +1028,7 @@ async def backfill_bars(
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: List[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None, loglevel: str = None,
@ -1025,7 +1036,7 @@ async def stream_quotes(
sub_type: str = 'ohlc', sub_type: str = 'ohlc',
# startup sync # startup sync
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
@ -1064,10 +1075,11 @@ async def stream_quotes(
symbol: { symbol: {
'symbol_info': sym_infos[sym], 'symbol_info': sym_infos[sym],
'shm_write_opts': {'sum_tick_vml': False}, 'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
}, },
} }
@asynccontextmanager @acm
async def subscribe(ws: wsproto.WSConnection): async def subscribe(ws: wsproto.WSConnection):
# XXX: setup subs # XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe # https://docs.kraken.com/websockets/#message-subscribe
@ -1121,8 +1133,7 @@ async def stream_quotes(
topic, quote = normalize(ohlc_last) topic, quote = normalize(ohlc_last)
first_quote = {topic: quote} task_status.started((init_msgs, quote))
task_status.started((init_msgs, first_quote))
# lol, only "closes" when they're margin squeezing clients ;P # lol, only "closes" when they're margin squeezing clients ;P
feed_is_live.set() feed_is_live.set()

View File

@ -18,7 +18,7 @@
Orders and execution client API. Orders and execution client API.
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from typing import Dict from typing import Dict
from pprint import pformat from pprint import pformat
from dataclasses import dataclass, field from dataclasses import dataclass, field
@ -27,7 +27,6 @@ import trio
import tractor import tractor
from tractor.trionics import broadcast_receiver from tractor.trionics import broadcast_receiver
from ..data._source import Symbol
from ..log import get_logger from ..log import get_logger
from ._ems import _emsd_main from ._ems import _emsd_main
from .._daemon import maybe_open_emsd from .._daemon import maybe_open_emsd
@ -156,16 +155,19 @@ async def relay_order_cmds_from_sync_code(
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)
@asynccontextmanager @acm
async def open_ems( async def open_ems(
broker: str, fqsn: str,
symbol: Symbol,
) -> (OrderBook, tractor.MsgStream, dict): ) -> (
"""Spawn an EMS daemon and begin sending orders and receiving OrderBook,
tractor.MsgStream,
dict,
):
'''
Spawn an EMS daemon and begin sending orders and receiving
alerts. alerts.
This EMS tries to reduce most broker's terrible order entry apis to This EMS tries to reduce most broker's terrible order entry apis to
a very simple protocol built on a few easy to grok and/or a very simple protocol built on a few easy to grok and/or
"rantsy" premises: "rantsy" premises:
@ -194,21 +196,22 @@ async def open_ems(
- 'dark_executed', 'broker_executed' - 'dark_executed', 'broker_executed'
- 'broker_filled' - 'broker_filled'
""" '''
# wait for service to connect back to us signalling # wait for service to connect back to us signalling
# ready for order commands # ready for order commands
book = get_orders() book = get_orders()
from ..data._source import unpack_fqsn
broker, symbol, suffix = unpack_fqsn(fqsn)
async with maybe_open_emsd(broker) as portal: async with maybe_open_emsd(broker) as portal:
async with ( async with (
# connect to emsd # connect to emsd
portal.open_context( portal.open_context(
_emsd_main, _emsd_main,
broker=broker, fqsn=fqsn,
symbol=symbol.key,
) as (ctx, (positions, accounts)), ) as (ctx, (positions, accounts)),
@ -218,7 +221,7 @@ async def open_ems(
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon( n.start_soon(
relay_order_cmds_from_sync_code, relay_order_cmds_from_sync_code,
symbol.key, fqsn,
trades_stream trades_stream
) )

View File

@ -20,7 +20,6 @@ In da suit parlances: "Execution management systems"
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from math import isnan
from pprint import pformat from pprint import pformat
import time import time
from typing import AsyncIterator, Callable from typing import AsyncIterator, Callable
@ -113,8 +112,8 @@ class _DarkBook:
# tracks most recent values per symbol each from data feed # tracks most recent values per symbol each from data feed
lasts: dict[ lasts: dict[
tuple[str, str], str,
float float,
] = field(default_factory=dict) ] = field(default_factory=dict)
# mapping of piker ems order ids to current brokerd order flow message # mapping of piker ems order ids to current brokerd order flow message
@ -135,7 +134,7 @@ async def clear_dark_triggers(
ems_client_order_stream: tractor.MsgStream, ems_client_order_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str, broker: str,
symbol: str, fqsn: str,
book: _DarkBook, book: _DarkBook,
@ -155,7 +154,6 @@ async def clear_dark_triggers(
# start = time.time() # start = time.time()
for sym, quote in quotes.items(): for sym, quote in quotes.items():
execs = book.orders.get(sym, {}) execs = book.orders.get(sym, {})
for tick in iterticks( for tick in iterticks(
quote, quote,
# dark order price filter(s) # dark order price filter(s)
@ -171,7 +169,7 @@ async def clear_dark_triggers(
ttype = tick['type'] ttype = tick['type']
# update to keep new cmds informed # update to keep new cmds informed
book.lasts[(broker, symbol)] = price book.lasts[sym] = price
for oid, ( for oid, (
pred, pred,
@ -196,6 +194,7 @@ async def clear_dark_triggers(
action: str = cmd['action'] action: str = cmd['action']
symbol: str = cmd['symbol'] symbol: str = cmd['symbol']
bfqsn: str = symbol.replace(f'.{broker}', '')
if action == 'alert': if action == 'alert':
# nothing to do but relay a status # nothing to do but relay a status
@ -225,7 +224,7 @@ async def clear_dark_triggers(
# order-request and instead create a new one. # order-request and instead create a new one.
reqid=None, reqid=None,
symbol=sym, symbol=bfqsn,
price=submit_price, price=submit_price,
size=cmd['size'], size=cmd['size'],
) )
@ -247,12 +246,9 @@ async def clear_dark_triggers(
oid=oid, # ems order id oid=oid, # ems order id
resp=resp, resp=resp,
time_ns=time.time_ns(), time_ns=time.time_ns(),
symbol=fqsn,
symbol=symbol,
trigger_price=price, trigger_price=price,
broker_details={'name': broker}, broker_details={'name': broker},
cmd=cmd, # original request message cmd=cmd, # original request message
).dict() ).dict()
@ -270,7 +266,7 @@ async def clear_dark_triggers(
else: # condition scan loop complete else: # condition scan loop complete
log.debug(f'execs are {execs}') log.debug(f'execs are {execs}')
if execs: if execs:
book.orders[symbol] = execs book.orders[fqsn] = execs
# print(f'execs scan took: {time.time() - start}') # print(f'execs scan took: {time.time() - start}')
@ -382,7 +378,8 @@ async def open_brokerd_trades_dialogue(
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
) -> tuple[dict, tractor.MsgStream]: ) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none '''
Open and yield ``brokerd`` trades dialogue context-stream if none
already exists. already exists.
''' '''
@ -419,8 +416,7 @@ async def open_brokerd_trades_dialogue(
# actor to simulate the real IPC load it'll have when also # actor to simulate the real IPC load it'll have when also
# pulling data from feeds # pulling data from feeds
open_trades_endpoint = paper.open_paperboi( open_trades_endpoint = paper.open_paperboi(
broker=broker, fqsn='.'.join([symbol, broker]),
symbol=symbol,
loglevel=loglevel, loglevel=loglevel,
) )
@ -458,12 +454,13 @@ async def open_brokerd_trades_dialogue(
# locally cache and track positions per account. # locally cache and track positions per account.
pps = {} pps = {}
for msg in positions: for msg in positions:
log.info(f'loading pp: {msg}')
account = msg['account'] account = msg['account']
assert account in accounts assert account in accounts
pps.setdefault( pps.setdefault(
msg['symbol'], f'{msg["symbol"]}.{broker}',
{} {}
)[account] = msg )[account] = msg
@ -493,8 +490,9 @@ async def open_brokerd_trades_dialogue(
finally: finally:
# parent context must have been closed # parent context must have been closed
# remove from cache so next client will respawn if needed # remove from cache so next client will respawn if needed
## TODO: Maybe add a warning relay = _router.relays.pop(broker, None)
_router.relays.pop(broker, None) if not relay:
log.warning(f'Relay for {broker} was already removed!?')
@tractor.context @tractor.context
@ -563,7 +561,13 @@ async def translate_and_relay_brokerd_events(
# XXX: this will be useful for automatic strats yah? # XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem # keep pps per account up to date locally in ``emsd`` mem
relay.positions.setdefault(pos_msg['symbol'], {}).setdefault( sym, broker = pos_msg['symbol'], pos_msg['broker']
relay.positions.setdefault(
# NOTE: translate to a FQSN!
f'{sym}.{broker}',
{}
).setdefault(
pos_msg['account'], {} pos_msg['account'], {}
).update(pos_msg) ).update(pos_msg)
@ -840,11 +844,15 @@ async def process_client_order_cmds(
msg = Order(**cmd) msg = Order(**cmd)
sym = msg.symbol fqsn = msg.symbol
trigger_price = msg.price trigger_price = msg.price
size = msg.size size = msg.size
exec_mode = msg.exec_mode exec_mode = msg.exec_mode
broker = msg.brokers[0] broker = msg.brokers[0]
# remove the broker part before creating a message
# to send to the specific broker since they probably
# aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '')
if exec_mode == 'live' and action in ('buy', 'sell',): if exec_mode == 'live' and action in ('buy', 'sell',):
@ -902,7 +910,7 @@ async def process_client_order_cmds(
# price received from the feed, instead of being # price received from the feed, instead of being
# like every other shitty tina platform that makes # like every other shitty tina platform that makes
# the user choose the predicate operator. # the user choose the predicate operator.
last = dark_book.lasts[(broker, sym)] last = dark_book.lasts[fqsn]
pred = mk_check(trigger_price, last, action) pred = mk_check(trigger_price, last, action)
spread_slap: float = 5 spread_slap: float = 5
@ -933,7 +941,7 @@ async def process_client_order_cmds(
# dark book entry if the order id already exists # dark book entry if the order id already exists
dark_book.orders.setdefault( dark_book.orders.setdefault(
sym, {} fqsn, {}
)[oid] = ( )[oid] = (
pred, pred,
tickfilter, tickfilter,
@ -960,8 +968,8 @@ async def process_client_order_cmds(
async def _emsd_main( async def _emsd_main(
ctx: tractor.Context, ctx: tractor.Context,
broker: str, fqsn: str,
symbol: str,
_exec_mode: str = 'dark', # ('paper', 'dark', 'live') _exec_mode: str = 'dark', # ('paper', 'dark', 'live')
loglevel: str = 'info', loglevel: str = 'info',
@ -1003,6 +1011,8 @@ async def _emsd_main(
global _router global _router
assert _router assert _router
from ..data._source import unpack_fqsn
broker, symbol, suffix = unpack_fqsn(fqsn)
dark_book = _router.get_dark_book(broker) dark_book = _router.get_dark_book(broker)
# TODO: would be nice if in tractor we can require either a ctx arg, # TODO: would be nice if in tractor we can require either a ctx arg,
@ -1015,22 +1025,16 @@ async def _emsd_main(
# spawn one task per broker feed # spawn one task per broker feed
async with ( async with (
maybe_open_feed( maybe_open_feed(
broker, [fqsn],
[symbol],
loglevel=loglevel, loglevel=loglevel,
) as (feed, stream), ) as (feed, quote_stream),
): ):
# XXX: this should be initial price quote from target provider # XXX: this should be initial price quote from target provider
first_quote = feed.first_quotes[symbol] first_quote = feed.first_quotes[fqsn]
book = _router.get_dark_book(broker) book = _router.get_dark_book(broker)
last = book.lasts[(broker, symbol)] = first_quote['last'] book.lasts[fqsn] = first_quote['last']
# XXX: ib is a cucker but we've fixed avoiding receiving any
# `Nan`s in the backend during market hours (right?). this was
# here previously as a sanity check during market hours.
# assert not isnan(last)
# open a stream with the brokerd backend for order # open a stream with the brokerd backend for order
# flow dialogue # flow dialogue
@ -1054,8 +1058,8 @@ async def _emsd_main(
# flatten out collected pps from brokerd for delivery # flatten out collected pps from brokerd for delivery
pp_msgs = { pp_msgs = {
sym: list(pps.values()) fqsn: list(pps.values())
for sym, pps in relay.positions.items() for fqsn, pps in relay.positions.items()
} }
# signal to client that we're started and deliver # signal to client that we're started and deliver
@ -1072,9 +1076,9 @@ async def _emsd_main(
brokerd_stream, brokerd_stream,
ems_client_order_stream, ems_client_order_stream,
stream, quote_stream,
broker, broker,
symbol, fqsn, # form: <name>.<venue>.<suffix>.<broker>
book book
) )
@ -1090,7 +1094,7 @@ async def _emsd_main(
# relay.brokerd_dialogue, # relay.brokerd_dialogue,
brokerd_stream, brokerd_stream,
symbol, fqsn,
feed, feed,
dark_book, dark_book,
_router, _router,

View File

@ -32,6 +32,7 @@ from dataclasses import dataclass
from .. import data from .. import data
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data._source import unpack_fqsn
from ..log import get_logger from ..log import get_logger
from ._messages import ( from ._messages import (
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
@ -446,7 +447,7 @@ async def trades_dialogue(
ctx: tractor.Context, ctx: tractor.Context,
broker: str, broker: str,
symbol: str, fqsn: str,
loglevel: str = None, loglevel: str = None,
) -> None: ) -> None:
@ -455,8 +456,7 @@ async def trades_dialogue(
async with ( async with (
data.open_feed( data.open_feed(
broker, [fqsn],
[symbol],
loglevel=loglevel, loglevel=loglevel,
) as feed, ) as feed,
@ -491,15 +491,16 @@ async def trades_dialogue(
@asynccontextmanager @asynccontextmanager
async def open_paperboi( async def open_paperboi(
broker: str, fqsn: str,
symbol: str,
loglevel: str, loglevel: str,
) -> Callable: ) -> Callable:
'''Spawn a paper engine actor and yield through access to '''
Spawn a paper engine actor and yield through access to
its context. its context.
''' '''
broker, symbol, expiry = unpack_fqsn(fqsn)
service_name = f'paperboi.{broker}' service_name = f'paperboi.{broker}'
async with ( async with (
@ -518,7 +519,7 @@ async def open_paperboi(
async with portal.open_context( async with portal.open_context(
trades_dialogue, trades_dialogue,
broker=broker, broker=broker,
symbol=symbol, fqsn=fqsn,
loglevel=loglevel, loglevel=loglevel,
) as (ctx, first): ) as (ctx, first):

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) # Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -19,6 +19,8 @@ Sampling and broadcast machinery for (soft) real-time delivery of
financial data flows. financial data flows.
""" """
from __future__ import annotations
from collections import Counter
import time import time
import tractor import tractor
@ -133,18 +135,20 @@ async def increment_ohlc_buffer(
# a given sample period. # a given sample period.
subs = sampler.subscribers.get(delay_s, ()) subs = sampler.subscribers.get(delay_s, ())
for ctx in subs: for stream in subs:
try: try:
await ctx.send_yield({'index': shm._last.value}) await stream.send({'index': shm._last.value})
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError trio.ClosedResourceError
): ):
log.error(f'{ctx.chan.uid} dropped connection') log.error(
subs.remove(ctx) f'{stream._ctx.chan.uid} dropped connection'
)
subs.remove(stream)
@tractor.stream @tractor.context
async def iter_ohlc_periods( async def iter_ohlc_periods(
ctx: tractor.Context, ctx: tractor.Context,
delay_s: int, delay_s: int,
@ -158,18 +162,20 @@ async def iter_ohlc_periods(
''' '''
# add our subscription # add our subscription
subs = sampler.subscribers.setdefault(delay_s, []) subs = sampler.subscribers.setdefault(delay_s, [])
subs.append(ctx) await ctx.started()
async with ctx.open_stream() as stream:
subs.append(stream)
try:
# stream and block until cancelled
await trio.sleep_forever()
finally:
try: try:
subs.remove(ctx) # stream and block until cancelled
except ValueError: await trio.sleep_forever()
log.error( finally:
f'iOHLC step stream was already dropped for {ctx.chan.uid}?' try:
) subs.remove(stream)
except ValueError:
log.error(
f'iOHLC step stream was already dropped {ctx.chan.uid}?'
)
async def sample_and_broadcast( async def sample_and_broadcast(
@ -177,17 +183,19 @@ async def sample_and_broadcast(
bus: '_FeedsBus', # noqa bus: '_FeedsBus', # noqa
shm: ShmArray, shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
brokername: str,
sum_tick_vlm: bool = True, sum_tick_vlm: bool = True,
) -> None: ) -> None:
log.info("Started shared mem bar writer") log.info("Started shared mem bar writer")
overruns = Counter()
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
# TODO: ``numba`` this! # TODO: ``numba`` this!
for sym, quote in quotes.items(): for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing # TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that # to the sharedmem array to decrease latency, however, that
# will require at least some way to prevent task switching # will require at least some way to prevent task switching
@ -251,9 +259,15 @@ async def sample_and_broadcast(
# end up triggering backpressure which which will # end up triggering backpressure which which will
# eventually block this producer end of the feed and # eventually block this producer end of the feed and
# thus other consumers still attached. # thus other consumers still attached.
subs = bus._subscribers[sym.lower()] subs = bus._subscribers[broker_symbol.lower()]
# NOTE: by default the broker backend doesn't append
# it's own "name" into the fqsn schema (but maybe it
# should?) so we have to manually generate the correct
# key here.
bsym = f'{broker_symbol}.{brokername}'
lags: int = 0
lags = 0
for (stream, tick_throttle) in subs: for (stream, tick_throttle) in subs:
try: try:
@ -262,7 +276,9 @@ async def sample_and_broadcast(
# this is a send mem chan that likely # this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below. # pushes to the ``uniform_rate_send()`` below.
try: try:
stream.send_nowait((sym, quote)) stream.send_nowait(
(bsym, quote)
)
except trio.WouldBlock: except trio.WouldBlock:
ctx = getattr(stream, '_ctx', None) ctx = getattr(stream, '_ctx', None)
if ctx: if ctx:
@ -271,12 +287,22 @@ async def sample_and_broadcast(
f'{ctx.channel.uid} !!!' f'{ctx.channel.uid} !!!'
) )
else: else:
key = id(stream)
overruns[key] += 1
log.warning( log.warning(
f'Feed overrun {bus.brokername} -> ' f'Feed overrun {bus.brokername} -> '
f'feed @ {tick_throttle} Hz' f'feed @ {tick_throttle} Hz'
) )
if overruns[key] > 6:
log.warning(
f'Dropping consumer {stream}'
)
await stream.aclose()
raise trio.BrokenResourceError
else: else:
await stream.send({sym: quote}) await stream.send(
{bsym: quote}
)
if cs.cancelled_caught: if cs.cancelled_caught:
lags += 1 lags += 1
@ -295,7 +321,7 @@ async def sample_and_broadcast(
'`brokerd`-quotes-feed connection' '`brokerd`-quotes-feed connection'
) )
if tick_throttle: if tick_throttle:
assert stream.closed() assert stream._closed
# XXX: do we need to deregister here # XXX: do we need to deregister here
# if it's done in the fee bus code? # if it's done in the fee bus code?
@ -399,7 +425,16 @@ async def uniform_rate_send(
# rate timing exactly lul # rate timing exactly lul
try: try:
await stream.send({sym: first_quote}) await stream.send({sym: first_quote})
except trio.ClosedResourceError: except (
# NOTE: any of these can be raised by ``tractor``'s IPC
# transport-layer and we want to be highly resilient
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
trio.ClosedResourceError,
trio.BrokenResourceError,
ConnectionResetError,
):
# if the feed consumer goes down then drop # if the feed consumer goes down then drop
# out of this rate limiter # out of this rate limiter
log.warning(f'{stream} closed') log.warning(f'{stream} closed')

View File

@ -258,7 +258,7 @@ class ShmArray:
if index < 0: if index < 0:
raise ValueError( raise ValueError(
f'Array size of {self._len} was overrun during prepend.\n' f'Array size of {self._len} was overrun during prepend.\n'
'You have passed {abs(index)} too many datums.' f'You have passed {abs(index)} too many datums.'
) )
end = index + length end = index + length

View File

@ -17,6 +17,7 @@
""" """
numpy data source coversion helpers. numpy data source coversion helpers.
""" """
from __future__ import annotations
from typing import Any from typing import Any
import decimal import decimal
@ -91,6 +92,40 @@ def ohlc_zeros(length: int) -> np.ndarray:
return np.zeros(length, dtype=base_ohlc_dtype) return np.zeros(length, dtype=base_ohlc_dtype)
def unpack_fqsn(fqsn: str) -> tuple[str, str, str]:
'''
Unpack a fully-qualified-symbol-name to ``tuple``.
'''
venue = ''
suffix = ''
# TODO: probably reverse the order of all this XD
tokens = fqsn.split('.')
if len(tokens) < 3:
# probably crypto
symbol, broker = tokens
return (
broker,
symbol,
'',
)
elif len(tokens) > 3:
symbol, venue, suffix, broker = tokens
else:
symbol, venue, broker = tokens
suffix = ''
# head, _, broker = fqsn.rpartition('.')
# symbol, _, suffix = head.rpartition('.')
return (
broker,
'.'.join([symbol, venue]),
suffix,
)
class Symbol(BaseModel): class Symbol(BaseModel):
"""I guess this is some kinda container thing for dealing with """I guess this is some kinda container thing for dealing with
all the different meta-data formats from brokers? all the different meta-data formats from brokers?
@ -98,24 +133,72 @@ class Symbol(BaseModel):
Yah, i guess dats what it izz. Yah, i guess dats what it izz.
""" """
key: str key: str
type_key: str # {'stock', 'forex', 'future', ... etc.} tick_size: float = 0.01
tick_size: float lot_tick_size: float = 0.0 # "volume" precision as min step value
lot_tick_size: float # "volume" precision as min step value tick_size_digits: int = 2
tick_size_digits: int lot_size_digits: int = 0
lot_size_digits: int suffix: str = ''
broker_info: dict[str, dict[str, Any]] = {} broker_info: dict[str, dict[str, Any]] = {}
# specifies a "class" of financial instrument # specifies a "class" of financial instrument
# ex. stock, futer, option, bond etc. # ex. stock, futer, option, bond etc.
# @validate_arguments
@classmethod
def from_broker_info(
cls,
broker: str,
symbol: str,
info: dict[str, Any],
suffix: str = '',
# XXX: like wtf..
# ) -> 'Symbol':
) -> None:
tick_size = info.get('price_tick_size', 0.01)
lot_tick_size = info.get('lot_tick_size', 0.0)
return Symbol(
key=symbol,
tick_size=tick_size,
lot_tick_size=lot_tick_size,
tick_size_digits=float_digits(tick_size),
lot_size_digits=float_digits(lot_tick_size),
suffix=suffix,
broker_info={broker: info},
)
@classmethod
def from_fqsn(
cls,
fqsn: str,
info: dict[str, Any],
# XXX: like wtf..
# ) -> 'Symbol':
) -> None:
broker, key, suffix = unpack_fqsn(fqsn)
return cls.from_broker_info(
broker,
key,
info=info,
suffix=suffix,
)
@property
def type_key(self) -> str:
return list(self.broker_info.values())[0]['asset_type']
@property @property
def brokers(self) -> list[str]: def brokers(self) -> list[str]:
return list(self.broker_info.keys()) return list(self.broker_info.keys())
def nearest_tick(self, value: float) -> float: def nearest_tick(self, value: float) -> float:
"""Return the nearest tick value based on mininum increment. '''
Return the nearest tick value based on mininum increment.
""" '''
mult = 1 / self.tick_size mult = 1 / self.tick_size
return round(value * mult) / mult return round(value * mult) / mult
@ -131,37 +214,44 @@ class Symbol(BaseModel):
self.key, self.key,
) )
def tokens(self) -> tuple[str]:
broker, key = self.front_feed()
if self.suffix:
return (key, self.suffix, broker)
else:
return (key, broker)
def front_fqsn(self) -> str:
'''
fqsn = "fully qualified symbol name"
Basically the idea here is for all client-ish code (aka programs/actors
that ask the provider agnostic layers in the stack for data) should be
able to tell which backend / venue / derivative each data feed/flow is
from by an explicit string key of the current form:
<instrumentname>.<venue>.<suffixwithmetadata>.<brokerbackendname>
TODO: I have thoughts that we should actually change this to be
more like an "attr lookup" (like how the web should have done
urls, but marketting peeps ruined it etc. etc.):
<broker>.<venue>.<instrumentname>.<suffixwithmetadata>
'''
tokens = self.tokens()
fqsn = '.'.join(tokens)
return fqsn
def iterfqsns(self) -> list[str]: def iterfqsns(self) -> list[str]:
return [ keys = []
mk_fqsn(self.key, broker) for broker in self.broker_info.keys():
for broker in self.broker_info.keys() fqsn = mk_fqsn(self.key, broker)
] if self.suffix:
fqsn += f'.{self.suffix}'
keys.append(fqsn)
return keys
@validate_arguments
def mk_symbol(
key: str,
type_key: str,
tick_size: float = 0.01,
lot_tick_size: float = 0,
broker_info: dict[str, Any] = {},
) -> Symbol:
'''
Create and return an instrument description for the
"symbol" named as ``key``.
'''
return Symbol(
key=key,
type_key=type_key,
tick_size=tick_size,
lot_tick_size=lot_tick_size,
tick_size_digits=float_digits(tick_size),
lot_size_digits=float_digits(lot_tick_size),
broker_info=broker_info,
)
def from_df( def from_df(

View File

@ -50,9 +50,8 @@ from ._sharedmem import (
from .ingest import get_ingestormod from .ingest import get_ingestormod
from ._source import ( from ._source import (
base_iohlc_dtype, base_iohlc_dtype,
mk_symbol,
Symbol, Symbol,
mk_fqsn, unpack_fqsn,
) )
from ..ui import _search from ..ui import _search
from ._sampling import ( from ._sampling import (
@ -191,10 +190,8 @@ async def _setup_persistent_brokerd(
async def manage_history( async def manage_history(
mod: ModuleType, mod: ModuleType,
shm: ShmArray,
bus: _FeedsBus, bus: _FeedsBus,
symbol: str, fqsn: str,
we_opened_shm: bool,
some_data_ready: trio.Event, some_data_ready: trio.Event,
feed_is_live: trio.Event, feed_is_live: trio.Event,
@ -208,21 +205,28 @@ async def manage_history(
buffer. buffer.
''' '''
task_status.started() # (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array(
key=fqsn,
opened = we_opened_shm # use any broker defined ohlc dtype:
# TODO: history validation dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened: # we expect the sub-actor to write
# raise RuntimeError("Persistent shm for sym was already open?!") readonly=False,
)
if opened: if opened:
# ask broker backend for new history log.info('No existing `marketstored` found..')
# start history backfill task ``backfill_bars()`` is # start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is # a required backend func this must block until shm is
# filled with first set of ohlc bars # filled with first set of ohlc bars
cs = await bus.nursery.start(mod.backfill_bars, symbol, shm) _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
# yield back after client connect with filled shm
task_status.started(shm)
# indicate to caller that feed can be delivered to # indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history # remote requesting client since we've loaded history
@ -243,13 +247,12 @@ async def manage_history(
# start shm incrementing for OHLC sampling at the current # start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne. # detected sampling period if one dne.
if sampler.incrementers.get(delay_s) is None: if sampler.incrementers.get(delay_s) is None:
cs = await bus.start_task( await bus.start_task(
increment_ohlc_buffer, increment_ohlc_buffer,
delay_s, delay_s,
) )
await trio.sleep_forever() await trio.sleep_forever()
cs.cancel()
async def allocate_persistent_feed( async def allocate_persistent_feed(
@ -279,20 +282,6 @@ async def allocate_persistent_feed(
except ImportError: except ImportError:
mod = get_ingestormod(brokername) mod = get_ingestormod(brokername)
fqsn = mk_fqsn(brokername, symbol)
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array(
key=fqsn,
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=False,
)
# mem chan handed to broker backend so it can push real-time # mem chan handed to broker backend so it can push real-time
# quotes to this task for sampling and history storage (see below). # quotes to this task for sampling and history storage (see below).
send, quote_stream = trio.open_memory_channel(10) send, quote_stream = trio.open_memory_channel(10)
@ -301,30 +290,9 @@ async def allocate_persistent_feed(
some_data_ready = trio.Event() some_data_ready = trio.Event()
feed_is_live = trio.Event() feed_is_live = trio.Event()
# run 2 tasks:
# - a history loader / maintainer
# - a real-time streamer which consumers and sends new data to any
# consumers as well as writes to storage backends (as configured).
# XXX: neither of these will raise but will cause an inf hang due to:
# https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon(
# await bus.start_task(
await bus.nursery.start(
manage_history,
mod,
shm,
bus,
symbol,
opened,
some_data_ready,
feed_is_live,
)
# establish broker backend quote stream by calling # establish broker backend quote stream by calling
# ``stream_quotes()``, which is a required broker backend endpoint. # ``stream_quotes()``, which is a required broker backend endpoint.
init_msg, first_quotes = await bus.nursery.start( init_msg, first_quote = await bus.nursery.start(
partial( partial(
mod.stream_quotes, mod.stream_quotes,
send_chan=send, send_chan=send,
@ -333,11 +301,39 @@ async def allocate_persistent_feed(
loglevel=loglevel, loglevel=loglevel,
) )
) )
# the broker-specific fully qualified symbol name,
# but ensure it is lower-cased for external use.
bfqsn = init_msg[symbol]['fqsn'].lower()
init_msg[symbol]['fqsn'] = bfqsn
# HISTORY, run 2 tasks:
# - a history loader / maintainer
# - a real-time streamer which consumers and sends new data to any
# consumers as well as writes to storage backends (as configured).
# XXX: neither of these will raise but will cause an inf hang due to:
# https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon(
# await bus.start_task(
shm = await bus.nursery.start(
manage_history,
mod,
bus,
bfqsn,
some_data_ready,
feed_is_live,
)
# we hand an IPC-msg compatible shm token to the caller so it # we hand an IPC-msg compatible shm token to the caller so it
# can read directly from the memory which will be written by # can read directly from the memory which will be written by
# this task. # this task.
init_msg[symbol]['shm_token'] = shm.token msg = init_msg[symbol]
msg['shm_token'] = shm.token
# true fqsn
fqsn = '.'.join((bfqsn, brokername))
# add a fqsn entry that includes the ``.<broker>`` suffix
init_msg[fqsn] = msg
# TODO: pretty sure we don't need this? why not just leave 1s as # TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that # the fastest "sample period" since we'll probably always want that
@ -350,8 +346,22 @@ async def allocate_persistent_feed(
log.info(f'waiting on history to load: {fqsn}') log.info(f'waiting on history to load: {fqsn}')
await some_data_ready.wait() await some_data_ready.wait()
bus.feeds[symbol.lower()] = (init_msg, first_quotes) # append ``.<broker>`` suffix to each quote symbol
task_status.started((init_msg, first_quotes)) bsym = symbol + f'.{brokername}'
generic_first_quotes = {
bsym: first_quote,
fqsn: first_quote,
}
bus.feeds[symbol] = bus.feeds[fqsn] = (
init_msg,
generic_first_quotes,
)
# for ambiguous names we simply apply the retreived
# feed to that name (for now).
# task_status.started((init_msg, generic_first_quotes))
task_status.started()
# backend will indicate when real-time quotes have begun. # backend will indicate when real-time quotes have begun.
await feed_is_live.wait() await feed_is_live.wait()
@ -366,10 +376,11 @@ async def allocate_persistent_feed(
bus, bus,
shm, shm,
quote_stream, quote_stream,
brokername,
sum_tick_vlm sum_tick_vlm
) )
finally: finally:
log.warning(f'{symbol}@{brokername} feed task terminated') log.warning(f'{fqsn} feed task terminated')
@tractor.context @tractor.context
@ -402,25 +413,16 @@ async def open_feed_bus(
assert 'brokerd' in tractor.current_actor().name assert 'brokerd' in tractor.current_actor().name
bus = get_feed_bus(brokername) bus = get_feed_bus(brokername)
bus._subscribers.setdefault(symbol, [])
fqsn = mk_fqsn(brokername, symbol)
entry = bus.feeds.get(symbol)
# if no cached feed for this symbol has been created for this # if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in # brokerd yet, start persistent stream and shm writer task in
# service nursery # service nursery
entry = bus.feeds.get(symbol)
if entry is None: if entry is None:
if not start_stream: # allocate a new actor-local stream bus which
raise RuntimeError( # will persist for this `brokerd`.
f'No stream feed exists for {fqsn}?\n'
f'You may need a `brokerd` started first.'
)
# allocate a new actor-local stream bus which will persist for
# this `brokerd`.
async with bus.task_lock: async with bus.task_lock:
init_msg, first_quotes = await bus.nursery.start( await bus.nursery.start(
partial( partial(
allocate_persistent_feed, allocate_persistent_feed,
@ -442,9 +444,30 @@ async def open_feed_bus(
# subscriber # subscriber
init_msg, first_quotes = bus.feeds[symbol] init_msg, first_quotes = bus.feeds[symbol]
msg = init_msg[symbol]
bfqsn = msg['fqsn'].lower()
# true fqsn
fqsn = '.'.join([bfqsn, brokername])
assert fqsn in first_quotes
assert bus.feeds[fqsn]
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
bsym = symbol + f'.{brokername}'
assert bsym in first_quotes
# we use the broker-specific fqsn (bfqsn) for
# the sampler subscription since the backend isn't (yet)
# expected to append it's own name to the fqsn, so we filter
# on keys which *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(bfqsn, [])
# send this even to subscribers to existing feed? # send this even to subscribers to existing feed?
# deliver initial info message a first quote asap # deliver initial info message a first quote asap
await ctx.started((init_msg, first_quotes)) await ctx.started((
init_msg,
first_quotes,
))
if not start_stream: if not start_stream:
log.warning(f'Not opening real-time stream for {fqsn}') log.warning(f'Not opening real-time stream for {fqsn}')
@ -454,12 +477,15 @@ async def open_feed_bus(
async with ( async with (
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
# re-send to trigger display loop cycle (necessary especially
# when the mkt is closed and no real-time messages are
# expected).
await stream.send({fqsn: first_quotes})
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
if tick_throttle: if tick_throttle:
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10) send, recv = trio.open_memory_channel(2**10)
cs = await bus.start_task( cs = await bus.start_task(
uniform_rate_send, uniform_rate_send,
@ -472,12 +498,15 @@ async def open_feed_bus(
else: else:
sub = (stream, tick_throttle) sub = (stream, tick_throttle)
subs = bus._subscribers[symbol] subs = bus._subscribers[bfqsn]
subs.append(sub) subs.append(sub)
try: try:
uid = ctx.chan.uid uid = ctx.chan.uid
# ctrl protocol for start/stop of quote streams based on UI
# state (eg. don't need a stream when a symbol isn't being
# displayed).
async for msg in stream: async for msg in stream:
if msg == 'pause': if msg == 'pause':
@ -502,7 +531,7 @@ async def open_feed_bus(
# n.cancel_scope.cancel() # n.cancel_scope.cancel()
cs.cancel() cs.cancel()
try: try:
bus._subscribers[symbol].remove(sub) bus._subscribers[bfqsn].remove(sub)
except ValueError: except ValueError:
log.warning(f'{sub} for {symbol} was already removed?') log.warning(f'{sub} for {symbol} was already removed?')
@ -519,19 +548,20 @@ async def open_sample_step_stream(
# created for all practical purposes # created for all practical purposes
async with maybe_open_context( async with maybe_open_context(
acm_func=partial( acm_func=partial(
portal.open_stream_from, portal.open_context,
iter_ohlc_periods, iter_ohlc_periods,
), ),
kwargs={'delay_s': delay_s}, kwargs={'delay_s': delay_s},
) as (cache_hit, istream): ) as (cache_hit, (ctx, first)):
if cache_hit: async with ctx.open_stream() as istream:
# add a new broadcast subscription for the quote stream if cache_hit:
# if this feed is likely already in use # add a new broadcast subscription for the quote stream
async with istream.subscribe() as bistream: # if this feed is likely already in use
yield bistream async with istream.subscribe() as bistream:
else: yield bistream
yield istream else:
yield istream
@dataclass @dataclass
@ -627,10 +657,10 @@ async def install_brokerd_search(
@asynccontextmanager @asynccontextmanager
async def open_feed( async def open_feed(
brokername: str,
symbols: list[str],
loglevel: Optional[str] = None,
fqsns: list[str],
loglevel: Optional[str] = None,
backpressure: bool = True, backpressure: bool = True,
start_stream: bool = True, start_stream: bool = True,
tick_throttle: Optional[float] = None, # Hz tick_throttle: Optional[float] = None, # Hz
@ -640,7 +670,10 @@ async def open_feed(
Open a "data feed" which provides streamed real-time quotes. Open a "data feed" which provides streamed real-time quotes.
''' '''
sym = symbols[0].lower() fqsn = fqsns[0].lower()
brokername, key, suffix = unpack_fqsn(fqsn)
bfqsn = fqsn.replace('.' + brokername, '')
try: try:
mod = get_brokermod(brokername) mod = get_brokermod(brokername)
@ -661,7 +694,7 @@ async def open_feed(
portal.open_context( portal.open_context(
open_feed_bus, open_feed_bus,
brokername=brokername, brokername=brokername,
symbol=sym, symbol=bfqsn,
loglevel=loglevel, loglevel=loglevel,
start_stream=start_stream, start_stream=start_stream,
tick_throttle=tick_throttle, tick_throttle=tick_throttle,
@ -678,9 +711,10 @@ async def open_feed(
): ):
# we can only read from shm # we can only read from shm
shm = attach_shm_array( shm = attach_shm_array(
token=init_msg[sym]['shm_token'], token=init_msg[bfqsn]['shm_token'],
readonly=True, readonly=True,
) )
assert fqsn in first_quotes
feed = Feed( feed = Feed(
name=brokername, name=brokername,
@ -693,17 +727,15 @@ async def open_feed(
) )
for sym, data in init_msg.items(): for sym, data in init_msg.items():
si = data['symbol_info'] si = data['symbol_info']
fqsn = data['fqsn'] + f'.{brokername}'
symbol = mk_symbol( symbol = Symbol.from_fqsn(
key=sym, fqsn,
type_key=si.get('asset_type', 'forex'), info=si,
tick_size=si.get('price_tick_size', 0.01),
lot_tick_size=si.get('lot_tick_size', 0.0),
) )
symbol.broker_info[brokername] = si
# symbol.broker_info[brokername] = si
feed.symbols[fqsn] = symbol
feed.symbols[sym] = symbol feed.symbols[sym] = symbol
# cast shm dtype to list... can't member why we need this # cast shm dtype to list... can't member why we need this
@ -727,26 +759,27 @@ async def open_feed(
@asynccontextmanager @asynccontextmanager
async def maybe_open_feed( async def maybe_open_feed(
brokername: str, fqsns: list[str],
symbols: list[str],
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
**kwargs, **kwargs,
) -> (Feed, ReceiveChannel[dict[str, Any]]): ) -> (
Feed,
ReceiveChannel[dict[str, Any]],
):
''' '''
Maybe open a data to a ``brokerd`` daemon only if there is no Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver. in a tractor broadcast receiver.
''' '''
sym = symbols[0].lower() fqsn = fqsns[0]
async with maybe_open_context( async with maybe_open_context(
acm_func=open_feed, acm_func=open_feed,
kwargs={ kwargs={
'brokername': brokername, 'fqsns': fqsns,
'symbols': [sym],
'loglevel': loglevel, 'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'), 'tick_throttle': kwargs.get('tick_throttle'),
@ -754,11 +787,12 @@ async def maybe_open_feed(
'backpressure': kwargs.get('backpressure', True), 'backpressure': kwargs.get('backpressure', True),
'start_stream': kwargs.get('start_stream', True), 'start_stream': kwargs.get('start_stream', True),
}, },
key=sym, key=fqsn,
) as (cache_hit, feed): ) as (cache_hit, feed):
if cache_hit: if cache_hit:
log.info(f'Using cached feed for {brokername}.{sym}') log.info(f'Using cached feed for {fqsn}')
# add a new broadcast subscription for the quote stream # add a new broadcast subscription for the quote stream
# if this feed is likely already in use # if this feed is likely already in use
async with feed.stream.subscribe() as bstream: async with feed.stream.subscribe() as bstream:

View File

@ -37,6 +37,7 @@ from .. import data
from ..data import attach_shm_array from ..data import attach_shm_array
from ..data.feed import Feed from ..data.feed import Feed
from ..data._sharedmem import ShmArray from ..data._sharedmem import ShmArray
from ..data._source import Symbol
from ._api import ( from ._api import (
Fsp, Fsp,
_load_builtins, _load_builtins,
@ -76,7 +77,7 @@ async def filter_quotes_by_sym(
async def fsp_compute( async def fsp_compute(
ctx: tractor.Context, ctx: tractor.Context,
symbol: str, symbol: Symbol,
feed: Feed, feed: Feed,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@ -95,13 +96,14 @@ async def fsp_compute(
disabled=True disabled=True
) )
fqsn = symbol.front_fqsn()
out_stream = func( out_stream = func(
# TODO: do we even need this if we do the feed api right? # TODO: do we even need this if we do the feed api right?
# shouldn't a local stream do this before we get a handle # shouldn't a local stream do this before we get a handle
# to the async iterable? it's that or we do some kinda # to the async iterable? it's that or we do some kinda
# async itertools style? # async itertools style?
filter_quotes_by_sym(symbol, quote_stream), filter_quotes_by_sym(fqsn, quote_stream),
# XXX: currently the ``ohlcv`` arg # XXX: currently the ``ohlcv`` arg
feed.shm, feed.shm,
@ -235,8 +237,7 @@ async def cascade(
ctx: tractor.Context, ctx: tractor.Context,
# data feed key # data feed key
brokername: str, fqsn: str,
symbol: str,
src_shm_token: dict, src_shm_token: dict,
dst_shm_token: tuple[str, np.dtype], dst_shm_token: tuple[str, np.dtype],
@ -289,8 +290,7 @@ async def cascade(
# open a data feed stream with requested broker # open a data feed stream with requested broker
async with data.feed.maybe_open_feed( async with data.feed.maybe_open_feed(
brokername, [fqsn],
[symbol],
# TODO throttle tick outputs from *this* daemon since # TODO throttle tick outputs from *this* daemon since
# it'll emit tons of ticks due to the throttle only # it'll emit tons of ticks due to the throttle only
@ -299,6 +299,7 @@ async def cascade(
# tick_throttle=60, # tick_throttle=60,
) as (feed, quote_stream): ) as (feed, quote_stream):
symbol = feed.symbols[fqsn]
profiler(f'{func}: feed up') profiler(f'{func}: feed up')

View File

@ -239,7 +239,7 @@ class GodWidget(QWidget):
symbol = linkedsplits.symbol symbol = linkedsplits.symbol
if symbol is not None: if symbol is not None:
self.window.setWindowTitle( self.window.setWindowTitle(
f'{symbol.key}@{symbol.brokers} ' f'{symbol.front_fqsn()} '
f'tick:{symbol.tick_size}' f'tick:{symbol.tick_size}'
) )

View File

@ -211,7 +211,6 @@ async def graphics_update_loop(
# async for quotes in iter_drain_quotes(): # async for quotes in iter_drain_quotes():
async for quotes in stream: async for quotes in stream:
quote_period = time.time() - last_quote quote_period = time.time() - last_quote
quote_rate = round( quote_rate = round(
1/quote_period, 1) if quote_period > 0 else float('inf') 1/quote_period, 1) if quote_period > 0 else float('inf')
@ -480,24 +479,23 @@ async def display_symbol_data(
# clear_on_next=True, # clear_on_next=True,
# group_key=loading_sym_key, # group_key=loading_sym_key,
# ) # )
async with open_feed( async with open_feed(
provider, ['.'.join((sym, provider))],
[sym], loglevel=loglevel,
loglevel=loglevel,
# limit to at least display's FPS # limit to at least display's FPS
# avoiding needless Qt-in-guest-mode context switches # avoiding needless Qt-in-guest-mode context switches
tick_throttle=_quote_throttle_rate, tick_throttle=_quote_throttle_rate,
) as feed: ) as feed:
ohlcv: ShmArray = feed.shm ohlcv: ShmArray = feed.shm
bars = ohlcv.array bars = ohlcv.array
symbol = feed.symbols[sym] symbol = feed.symbols[sym]
fqsn = symbol.front_fqsn()
# load in symbol's ohlc data # load in symbol's ohlc data
godwidget.window.setWindowTitle( godwidget.window.setWindowTitle(
f'{symbol.key}@{symbol.brokers} ' f'{fqsn} '
f'tick:{symbol.tick_size} ' f'tick:{symbol.tick_size} '
f'step:1s ' f'step:1s '
) )
@ -582,8 +580,7 @@ async def display_symbol_data(
open_order_mode( open_order_mode(
feed, feed,
chart, chart,
symbol, fqsn,
provider,
order_mode_started order_mode_started
) )
): ):

View File

@ -386,6 +386,7 @@ class FspAdmin:
portal: tractor.Portal, portal: tractor.Portal,
complete: trio.Event, complete: trio.Event,
started: trio.Event, started: trio.Event,
fqsn: str,
dst_shm: ShmArray, dst_shm: ShmArray,
conf: dict, conf: dict,
target: Fsp, target: Fsp,
@ -397,7 +398,6 @@ class FspAdmin:
cluster and sleeps until signalled to exit. cluster and sleeps until signalled to exit.
''' '''
brokername, sym = self.linked.symbol.front_feed()
ns_path = str(target.ns_path) ns_path = str(target.ns_path)
async with ( async with (
portal.open_context( portal.open_context(
@ -406,8 +406,7 @@ class FspAdmin:
cascade, cascade,
# data feed key # data feed key
brokername=brokername, fqsn=fqsn,
symbol=sym,
# mems # mems
src_shm_token=self.src_shm.token, src_shm_token=self.src_shm.token,
@ -429,7 +428,7 @@ class FspAdmin:
): ):
# register output data # register output data
self._registry[ self._registry[
(brokername, sym, ns_path) (fqsn, ns_path)
] = ( ] = (
stream, stream,
dst_shm, dst_shm,
@ -452,11 +451,11 @@ class FspAdmin:
) -> (ShmArray, trio.Event): ) -> (ShmArray, trio.Event):
fqsn = self.linked.symbol.front_feed() fqsn = self.linked.symbol.front_fqsn()
# allocate an output shm array # allocate an output shm array
key, dst_shm, opened = maybe_mk_fsp_shm( key, dst_shm, opened = maybe_mk_fsp_shm(
'.'.join(fqsn), fqsn,
target=target, target=target,
readonly=True, readonly=True,
) )
@ -477,6 +476,7 @@ class FspAdmin:
portal, portal,
complete, complete,
started, started,
fqsn,
dst_shm, dst_shm,
conf, conf,
target, target,

View File

@ -268,13 +268,14 @@ class OrderMode:
''' '''
staged = self._staged_order staged = self._staged_order
symbol = staged.symbol symbol: Symbol = staged.symbol
oid = str(uuid.uuid4()) oid = str(uuid.uuid4())
# format order data for ems # format order data for ems
fqsn = symbol.front_fqsn()
order = staged.copy( order = staged.copy(
update={ update={
'symbol': symbol.key, 'symbol': fqsn,
'oid': oid, 'oid': oid,
} }
) )
@ -519,8 +520,7 @@ async def open_order_mode(
feed: Feed, feed: Feed,
chart: 'ChartPlotWidget', # noqa chart: 'ChartPlotWidget', # noqa
symbol: Symbol, fqsn: str,
brokername: str,
started: trio.Event, started: trio.Event,
) -> None: ) -> None:
@ -546,8 +546,7 @@ async def open_order_mode(
# spawn EMS actor-service # spawn EMS actor-service
async with ( async with (
open_ems(fqsn) as (
open_ems(brokername, symbol) as (
book, book,
trades_stream, trades_stream,
position_msgs, position_msgs,
@ -556,8 +555,7 @@ async def open_order_mode(
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
log.info(f'Opening order mode for {brokername}.{symbol.key}') log.info(f'Opening order mode for {fqsn}')
view = chart.view view = chart.view
# annotations editors # annotations editors
@ -566,7 +564,7 @@ async def open_order_mode(
# symbol id # symbol id
symbol = chart.linked.symbol symbol = chart.linked.symbol
symkey = symbol.key symkey = symbol.front_fqsn()
# map of per-provider account keys to position tracker instances # map of per-provider account keys to position tracker instances
trackers: dict[str, PositionTracker] = {} trackers: dict[str, PositionTracker] = {}
@ -610,7 +608,7 @@ async def open_order_mode(
log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') log.info(f'Loading pp for {symkey}:\n{pformat(msg)}')
startup_pp.update_from_msg(msg) startup_pp.update_from_msg(msg)
# allocator # allocator config
alloc = mk_allocator( alloc = mk_allocator(
symbol=symbol, symbol=symbol,
account=account_name, account=account_name,
@ -818,8 +816,18 @@ async def process_trades_and_update_ui(
'position', 'position',
): ):
sym = mode.chart.linked.symbol sym = mode.chart.linked.symbol
if msg['symbol'].lower() in sym.key: pp_msg_symbol = msg['symbol'].lower()
fqsn = sym.front_fqsn()
broker, key = sym.front_feed()
# print(
# f'pp msg symbol: {pp_msg_symbol}\n',
# f'fqsn: {fqsn}\n',
# f'front key: {key}\n',
# )
if (
pp_msg_symbol == fqsn.replace(f'.{broker}', '')
):
tracker = mode.trackers[msg['account']] tracker = mode.trackers[msg['account']]
tracker.live_pp.update_from_msg(msg) tracker.live_pp.update_from_msg(msg)
# update order pane widgets # update order pane widgets