Compare commits
8 Commits
9e6bfa0926
...
c3f3b25524
Author | SHA1 | Date |
---|---|---|
|
c3f3b25524 | |
|
8dcdf7c9a9 | |
|
4d15b9bfdd | |
|
5e371f1d73 | |
|
6c221bb348 | |
|
e391c896f8 | |
|
5633f5614d | |
|
76735189de |
|
@ -30,7 +30,8 @@ from types import ModuleType
|
|||
from typing import (
|
||||
Any,
|
||||
Iterator,
|
||||
Generator
|
||||
Generator,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import pendulum
|
||||
|
@ -59,8 +60,10 @@ from ..clearing._messages import (
|
|||
BrokerdPosition,
|
||||
)
|
||||
from piker.types import Struct
|
||||
from piker.data._symcache import SymbologyCache
|
||||
from ..log import get_logger
|
||||
from piker.log import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from piker.data._symcache import SymbologyCache
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
@ -493,6 +496,17 @@ class Account(Struct):
|
|||
|
||||
_mktmap_table: dict[str, MktPair] | None = None,
|
||||
|
||||
only_require: list[str]|True = True,
|
||||
# ^list of fqmes that are "required" to be processed from
|
||||
# this ledger pass; we often don't care about others and
|
||||
# definitely shouldn't always error in such cases.
|
||||
# (eg. broker backend loaded that doesn't yet supsport the
|
||||
# symcache but also, inside the paper engine we don't ad-hoc
|
||||
# request `get_mkt_info()` for every symbol in the ledger,
|
||||
# only the one for which we're simulating against).
|
||||
# TODO, not sure if there's a better soln for this, ideally
|
||||
# all backends get symcache support afap i guess..
|
||||
|
||||
) -> dict[str, Position]:
|
||||
'''
|
||||
Update the internal `.pps[str, Position]` table from input
|
||||
|
@ -535,11 +549,32 @@ class Account(Struct):
|
|||
if _mktmap_table is None:
|
||||
raise
|
||||
|
||||
required: bool = (
|
||||
only_require is True
|
||||
or (
|
||||
only_require is not True
|
||||
and
|
||||
fqme in only_require
|
||||
)
|
||||
)
|
||||
# XXX: caller is allowed to provide a fallback
|
||||
# mktmap table for the case where a new position is
|
||||
# being added and the preloaded symcache didn't
|
||||
# have this entry prior (eg. with frickin IB..)
|
||||
mkt = _mktmap_table[fqme]
|
||||
if (
|
||||
not (mkt := _mktmap_table.get(fqme))
|
||||
and
|
||||
required
|
||||
):
|
||||
raise
|
||||
|
||||
elif not required:
|
||||
continue
|
||||
|
||||
else:
|
||||
# should be an entry retreived somewhere
|
||||
assert mkt
|
||||
|
||||
|
||||
if not (pos := pps.get(bs_mktid)):
|
||||
|
||||
|
@ -656,7 +691,7 @@ class Account(Struct):
|
|||
def write_config(self) -> None:
|
||||
'''
|
||||
Write the current account state to the user's account TOML file, normally
|
||||
something like ``pps.toml``.
|
||||
something like `pps.toml`.
|
||||
|
||||
'''
|
||||
# TODO: show diff output?
|
||||
|
|
|
@ -653,6 +653,7 @@ async def open_trade_dialog(
|
|||
# in) use manually constructed table from calling
|
||||
# the `.get_mkt_info()` provider EP above.
|
||||
_mktmap_table=mkt_by_fqme,
|
||||
only_require=list(mkt_by_fqme),
|
||||
)
|
||||
|
||||
pp_msgs: list[BrokerdPosition] = []
|
||||
|
|
|
@ -31,6 +31,7 @@ from pathlib import Path
|
|||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Sequence,
|
||||
Hashable,
|
||||
TYPE_CHECKING,
|
||||
|
@ -56,7 +57,7 @@ from piker.brokers import (
|
|||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..accounting import (
|
||||
from piker.accounting import (
|
||||
Asset,
|
||||
MktPair,
|
||||
)
|
||||
|
@ -149,57 +150,68 @@ class SymbologyCache(Struct):
|
|||
'Implement `Client.get_assets()`!'
|
||||
)
|
||||
|
||||
if get_mkt_pairs := getattr(client, 'get_mkt_pairs', None):
|
||||
|
||||
pairs: dict[str, Struct] = await get_mkt_pairs()
|
||||
for bs_fqme, pair in pairs.items():
|
||||
|
||||
# NOTE: every backend defined pair should
|
||||
# declare it's ns path for roundtrip
|
||||
# serialization lookup.
|
||||
if not getattr(pair, 'ns_path', None):
|
||||
raise TypeError(
|
||||
f'Pair-struct for {self.mod.name} MUST define a '
|
||||
'`.ns_path: str`!\n'
|
||||
f'{pair}'
|
||||
)
|
||||
|
||||
entry = await self.mod.get_mkt_info(pair.bs_fqme)
|
||||
if not entry:
|
||||
continue
|
||||
|
||||
mkt: MktPair
|
||||
pair: Struct
|
||||
mkt, _pair = entry
|
||||
assert _pair is pair, (
|
||||
f'`{self.mod.name}` backend probably has a '
|
||||
'keying-symmetry problem between the pair-`Struct` '
|
||||
'returned from `Client.get_mkt_pairs()`and the '
|
||||
'module level endpoint: `.get_mkt_info()`\n\n'
|
||||
"Here's the struct diff:\n"
|
||||
f'{_pair - pair}'
|
||||
)
|
||||
# NOTE XXX: this means backends MUST implement
|
||||
# a `Struct.bs_mktid: str` field to provide
|
||||
# a native-keyed map to their own symbol
|
||||
# set(s).
|
||||
self.pairs[pair.bs_mktid] = pair
|
||||
|
||||
# NOTE: `MktPair`s are keyed here using piker's
|
||||
# internal FQME schema so that search,
|
||||
# accounting and feed init can be accomplished
|
||||
# a sane, uniform, normalized basis.
|
||||
self.mktmaps[mkt.fqme] = mkt
|
||||
|
||||
self.pair_ns_path: str = tractor.msg.NamespacePath.from_ref(
|
||||
pair,
|
||||
)
|
||||
|
||||
else:
|
||||
get_mkt_pairs: Callable|None = getattr(
|
||||
client,
|
||||
'get_mkt_pairs',
|
||||
None,
|
||||
)
|
||||
if not get_mkt_pairs:
|
||||
log.warning(
|
||||
'No symbology cache `Pair` support for `{provider}`..\n'
|
||||
'Implement `Client.get_mkt_pairs()`!'
|
||||
)
|
||||
return self
|
||||
|
||||
pairs: dict[str, Struct] = await get_mkt_pairs()
|
||||
if not pairs:
|
||||
log.warning(
|
||||
'No pairs from intial {provider!r} sym-cache request?\n\n'
|
||||
'`Client.get_mkt_pairs()` -> {pairs!r} ?'
|
||||
)
|
||||
return self
|
||||
|
||||
for bs_fqme, pair in pairs.items():
|
||||
if not getattr(pair, 'ns_path', None):
|
||||
# XXX: every backend defined pair must declare
|
||||
# a `.ns_path: tractor.NamespacePath` to enable
|
||||
# roundtrip serialization lookup from a local
|
||||
# cache file.
|
||||
raise TypeError(
|
||||
f'Pair-struct for {self.mod.name} MUST define a '
|
||||
'`.ns_path: str`!\n\n'
|
||||
f'{pair!r}'
|
||||
)
|
||||
|
||||
entry = await self.mod.get_mkt_info(pair.bs_fqme)
|
||||
if not entry:
|
||||
continue
|
||||
|
||||
mkt: MktPair
|
||||
pair: Struct
|
||||
mkt, _pair = entry
|
||||
assert _pair is pair, (
|
||||
f'`{self.mod.name}` backend probably has a '
|
||||
'keying-symmetry problem between the pair-`Struct` '
|
||||
'returned from `Client.get_mkt_pairs()`and the '
|
||||
'module level endpoint: `.get_mkt_info()`\n\n'
|
||||
"Here's the struct diff:\n"
|
||||
f'{_pair - pair}'
|
||||
)
|
||||
# NOTE XXX: this means backends MUST implement
|
||||
# a `Struct.bs_mktid: str` field to provide
|
||||
# a native-keyed map to their own symbol
|
||||
# set(s).
|
||||
self.pairs[pair.bs_mktid] = pair
|
||||
|
||||
# NOTE: `MktPair`s are keyed here using piker's
|
||||
# internal FQME schema so that search,
|
||||
# accounting and feed init can be accomplished
|
||||
# a sane, uniform, normalized basis.
|
||||
self.mktmaps[mkt.fqme] = mkt
|
||||
|
||||
self.pair_ns_path: str = tractor.msg.NamespacePath.from_ref(
|
||||
pair,
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
|
|
|
@ -273,7 +273,7 @@ async def _reconnect_forever(
|
|||
nobsws._connected.set()
|
||||
await trio.sleep_forever()
|
||||
except HandshakeError:
|
||||
log.exception(f'Retrying connection')
|
||||
log.exception('Retrying connection')
|
||||
|
||||
# ws & nursery block ends
|
||||
|
||||
|
@ -359,8 +359,8 @@ async def open_autorecon_ws(
|
|||
|
||||
|
||||
'''
|
||||
JSONRPC response-request style machinery for transparent multiplexing of msgs
|
||||
over a NoBsWs.
|
||||
JSONRPC response-request style machinery for transparent multiplexing
|
||||
of msgs over a `NoBsWs`.
|
||||
|
||||
'''
|
||||
|
||||
|
@ -377,43 +377,82 @@ async def open_jsonrpc_session(
|
|||
url: str,
|
||||
start_id: int = 0,
|
||||
response_type: type = JSONRPCResult,
|
||||
request_type: Optional[type] = None,
|
||||
request_hook: Optional[Callable] = None,
|
||||
error_hook: Optional[Callable] = None,
|
||||
msg_recv_timeout: float = float('inf'),
|
||||
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
|
||||
# and options mkts are generally "slow moving"..
|
||||
#
|
||||
# FURTHER if we break the underlying ws connection then since we
|
||||
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
|
||||
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
|
||||
# broken and never restored with wtv init sequence is required to
|
||||
# re-establish a working req-resp session.
|
||||
|
||||
) -> Callable[[str, dict], dict]:
|
||||
'''
|
||||
Init a json-RPC-over-websocket connection to the provided `url`.
|
||||
|
||||
A `json_rpc: Callable[[str, dict], dict` is delivered to the
|
||||
caller for sending requests and a bg-`trio.Task` handles
|
||||
processing of response msgs including error reporting/raising in
|
||||
the parent/caller task.
|
||||
|
||||
'''
|
||||
# NOTE, store all request msgs so we can raise errors on the
|
||||
# caller side!
|
||||
req_msgs: dict[int, dict] = {}
|
||||
|
||||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_autorecon_ws(url) as ws
|
||||
trio.open_nursery() as tn,
|
||||
open_autorecon_ws(
|
||||
url=url,
|
||||
msg_recv_timeout=msg_recv_timeout,
|
||||
) as ws
|
||||
):
|
||||
rpc_id: Iterable = count(start_id)
|
||||
rpc_id: Iterable[int] = count(start_id)
|
||||
rpc_results: dict[int, dict] = {}
|
||||
|
||||
async def json_rpc(method: str, params: dict) -> dict:
|
||||
async def json_rpc(
|
||||
method: str,
|
||||
params: dict,
|
||||
) -> dict:
|
||||
'''
|
||||
perform a json rpc call and wait for the result, raise exception in
|
||||
case of error field present on response
|
||||
'''
|
||||
nonlocal req_msgs
|
||||
|
||||
req_id: int = next(rpc_id)
|
||||
msg = {
|
||||
'jsonrpc': '2.0',
|
||||
'id': next(rpc_id),
|
||||
'id': req_id,
|
||||
'method': method,
|
||||
'params': params
|
||||
}
|
||||
_id = msg['id']
|
||||
|
||||
rpc_results[_id] = {
|
||||
result = rpc_results[_id] = {
|
||||
'result': None,
|
||||
'event': trio.Event()
|
||||
'error': None,
|
||||
'event': trio.Event(), # signal caller resp arrived
|
||||
}
|
||||
req_msgs[_id] = msg
|
||||
|
||||
await ws.send_msg(msg)
|
||||
|
||||
# wait for reponse before unblocking requester code
|
||||
await rpc_results[_id]['event'].wait()
|
||||
|
||||
ret = rpc_results[_id]['result']
|
||||
if (maybe_result := result['result']):
|
||||
ret = maybe_result
|
||||
del rpc_results[_id]
|
||||
|
||||
del rpc_results[_id]
|
||||
else:
|
||||
err = result['error']
|
||||
raise Exception(
|
||||
f'JSONRPC request failed\n'
|
||||
f'req: {msg}\n'
|
||||
f'resp: {err}\n'
|
||||
)
|
||||
|
||||
if ret.error is not None:
|
||||
raise Exception(json.dumps(ret.error, indent=4))
|
||||
|
@ -428,6 +467,7 @@ async def open_jsonrpc_session(
|
|||
the server side.
|
||||
|
||||
'''
|
||||
nonlocal req_msgs
|
||||
async for msg in ws:
|
||||
match msg:
|
||||
case {
|
||||
|
@ -451,19 +491,28 @@ async def open_jsonrpc_session(
|
|||
'params': _,
|
||||
}:
|
||||
log.debug(f'Recieved\n{msg}')
|
||||
if request_hook:
|
||||
await request_hook(request_type(**msg))
|
||||
|
||||
case {
|
||||
'error': error
|
||||
}:
|
||||
log.warning(f'Recieved\n{error}')
|
||||
if error_hook:
|
||||
await error_hook(response_type(**msg))
|
||||
# retreive orig request msg, set error
|
||||
# response in original "result" msg,
|
||||
# THEN FINALLY set the event to signal caller
|
||||
# to raise the error in the parent task.
|
||||
req_id: int = error['id']
|
||||
req_msg: dict = req_msgs[req_id]
|
||||
result: dict = rpc_results[req_id]
|
||||
result['error'] = error
|
||||
result['event'].set()
|
||||
log.error(
|
||||
f'JSONRPC request failed\n'
|
||||
f'req: {req_msg}\n'
|
||||
f'resp: {error}\n'
|
||||
)
|
||||
|
||||
case _:
|
||||
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
||||
|
||||
n.start_soon(recv_task)
|
||||
tn.start_soon(recv_task)
|
||||
yield json_rpc
|
||||
n.cancel_scope.cancel()
|
||||
tn.cancel_scope.cancel()
|
||||
|
|
|
@ -786,7 +786,6 @@ async def install_brokerd_search(
|
|||
|
||||
@acm
|
||||
async def maybe_open_feed(
|
||||
|
||||
fqmes: list[str],
|
||||
loglevel: str | None = None,
|
||||
|
||||
|
@ -840,13 +839,12 @@ async def maybe_open_feed(
|
|||
|
||||
@acm
|
||||
async def open_feed(
|
||||
|
||||
fqmes: list[str],
|
||||
|
||||
loglevel: str | None = None,
|
||||
loglevel: str|None = None,
|
||||
allow_overruns: bool = True,
|
||||
start_stream: bool = True,
|
||||
tick_throttle: float | None = None, # Hz
|
||||
tick_throttle: float|None = None, # Hz
|
||||
|
||||
allow_remote_ctl_ui: bool = False,
|
||||
|
||||
|
|
|
@ -36,10 +36,10 @@ from ._sharedmem import (
|
|||
ShmArray,
|
||||
_Token,
|
||||
)
|
||||
from piker.accounting import MktPair
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..accounting import MktPair
|
||||
from .feed import Feed
|
||||
from piker.data.feed import Feed
|
||||
|
||||
|
||||
class Flume(Struct):
|
||||
|
@ -82,7 +82,7 @@ class Flume(Struct):
|
|||
|
||||
# TODO: do we need this really if we can pull the `Portal` from
|
||||
# ``tractor``'s internals?
|
||||
feed: Feed | None = None
|
||||
feed: Feed|None = None
|
||||
|
||||
@property
|
||||
def rt_shm(self) -> ShmArray:
|
||||
|
|
|
@ -113,9 +113,9 @@ def validate_backend(
|
|||
)
|
||||
if ep is None:
|
||||
log.warning(
|
||||
f'Provider backend {mod.name} is missing '
|
||||
f'{daemon_name} support :(\n'
|
||||
f'The following endpoint is missing: {name}'
|
||||
f'Provider backend {mod.name!r} is missing '
|
||||
f'{daemon_name!r} support?\n'
|
||||
f'|_module endpoint-func missing: {name!r}\n'
|
||||
)
|
||||
|
||||
inits: list[
|
||||
|
|
Loading…
Reference in New Issue