Compare commits
8 Commits
68940e8a73
...
2a9fdec253
Author | SHA1 | Date |
---|---|---|
|
2a9fdec253 | |
|
8f4697bc19 | |
|
32fe5c4942 | |
|
6e9759a7f1 | |
|
07b13a99b9 | |
|
2377fec665 | |
|
4de11e717c | |
|
525b7df3ee |
|
@ -30,7 +30,8 @@ from types import ModuleType
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Iterator,
|
Iterator,
|
||||||
Generator
|
Generator,
|
||||||
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
import pendulum
|
import pendulum
|
||||||
|
@ -59,8 +60,10 @@ from ..clearing._messages import (
|
||||||
BrokerdPosition,
|
BrokerdPosition,
|
||||||
)
|
)
|
||||||
from piker.types import Struct
|
from piker.types import Struct
|
||||||
from piker.data._symcache import SymbologyCache
|
from piker.log import get_logger
|
||||||
from ..log import get_logger
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from piker.data._symcache import SymbologyCache
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -493,6 +496,17 @@ class Account(Struct):
|
||||||
|
|
||||||
_mktmap_table: dict[str, MktPair] | None = None,
|
_mktmap_table: dict[str, MktPair] | None = None,
|
||||||
|
|
||||||
|
only_require: list[str]|True = True,
|
||||||
|
# ^list of fqmes that are "required" to be processed from
|
||||||
|
# this ledger pass; we often don't care about others and
|
||||||
|
# definitely shouldn't always error in such cases.
|
||||||
|
# (eg. broker backend loaded that doesn't yet supsport the
|
||||||
|
# symcache but also, inside the paper engine we don't ad-hoc
|
||||||
|
# request `get_mkt_info()` for every symbol in the ledger,
|
||||||
|
# only the one for which we're simulating against).
|
||||||
|
# TODO, not sure if there's a better soln for this, ideally
|
||||||
|
# all backends get symcache support afap i guess..
|
||||||
|
|
||||||
) -> dict[str, Position]:
|
) -> dict[str, Position]:
|
||||||
'''
|
'''
|
||||||
Update the internal `.pps[str, Position]` table from input
|
Update the internal `.pps[str, Position]` table from input
|
||||||
|
@ -535,11 +549,32 @@ class Account(Struct):
|
||||||
if _mktmap_table is None:
|
if _mktmap_table is None:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
required: bool = (
|
||||||
|
only_require is True
|
||||||
|
or (
|
||||||
|
only_require is not True
|
||||||
|
and
|
||||||
|
fqme in only_require
|
||||||
|
)
|
||||||
|
)
|
||||||
# XXX: caller is allowed to provide a fallback
|
# XXX: caller is allowed to provide a fallback
|
||||||
# mktmap table for the case where a new position is
|
# mktmap table for the case where a new position is
|
||||||
# being added and the preloaded symcache didn't
|
# being added and the preloaded symcache didn't
|
||||||
# have this entry prior (eg. with frickin IB..)
|
# have this entry prior (eg. with frickin IB..)
|
||||||
mkt = _mktmap_table[fqme]
|
if (
|
||||||
|
not (mkt := _mktmap_table.get(fqme))
|
||||||
|
and
|
||||||
|
required
|
||||||
|
):
|
||||||
|
raise
|
||||||
|
|
||||||
|
elif not required:
|
||||||
|
continue
|
||||||
|
|
||||||
|
else:
|
||||||
|
# should be an entry retreived somewhere
|
||||||
|
assert mkt
|
||||||
|
|
||||||
|
|
||||||
if not (pos := pps.get(bs_mktid)):
|
if not (pos := pps.get(bs_mktid)):
|
||||||
|
|
||||||
|
@ -656,7 +691,7 @@ class Account(Struct):
|
||||||
def write_config(self) -> None:
|
def write_config(self) -> None:
|
||||||
'''
|
'''
|
||||||
Write the current account state to the user's account TOML file, normally
|
Write the current account state to the user's account TOML file, normally
|
||||||
something like ``pps.toml``.
|
something like `pps.toml`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# TODO: show diff output?
|
# TODO: show diff output?
|
||||||
|
|
|
@ -653,7 +653,11 @@ class Router(Struct):
|
||||||
flume = feed.flumes[fqme]
|
flume = feed.flumes[fqme]
|
||||||
first_quote: dict = flume.first_quote
|
first_quote: dict = flume.first_quote
|
||||||
book: DarkBook = self.get_dark_book(broker)
|
book: DarkBook = self.get_dark_book(broker)
|
||||||
book.lasts[fqme]: float = float(first_quote['last'])
|
|
||||||
|
if not (last := first_quote.get('last')):
|
||||||
|
last: float = flume.rt_shm.array[-1]['close']
|
||||||
|
|
||||||
|
book.lasts[fqme]: float = float(last)
|
||||||
|
|
||||||
async with self.maybe_open_brokerd_dialog(
|
async with self.maybe_open_brokerd_dialog(
|
||||||
brokermod=brokermod,
|
brokermod=brokermod,
|
||||||
|
@ -716,7 +720,7 @@ class Router(Struct):
|
||||||
subs = self.subscribers[sub_key]
|
subs = self.subscribers[sub_key]
|
||||||
|
|
||||||
sent_some: bool = False
|
sent_some: bool = False
|
||||||
for client_stream in subs:
|
for client_stream in subs.copy():
|
||||||
try:
|
try:
|
||||||
await client_stream.send(msg)
|
await client_stream.send(msg)
|
||||||
sent_some = True
|
sent_some = True
|
||||||
|
@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events(
|
||||||
status_msg.brokerd_msg = msg
|
status_msg.brokerd_msg = msg
|
||||||
status_msg.src = msg.broker_details['name']
|
status_msg.src = msg.broker_details['name']
|
||||||
|
|
||||||
await router.client_broadcast(
|
if not status_msg.req:
|
||||||
status_msg.req.symbol,
|
# likely some order change state?
|
||||||
status_msg,
|
await tractor.pause()
|
||||||
)
|
else:
|
||||||
|
await router.client_broadcast(
|
||||||
|
status_msg.req.symbol,
|
||||||
|
status_msg,
|
||||||
|
)
|
||||||
|
|
||||||
if status == 'closed':
|
if status == 'closed':
|
||||||
log.info(f'Execution for {oid} is complete!')
|
log.info(f'Execution for {oid} is complete!')
|
||||||
|
|
|
@ -653,6 +653,7 @@ async def open_trade_dialog(
|
||||||
# in) use manually constructed table from calling
|
# in) use manually constructed table from calling
|
||||||
# the `.get_mkt_info()` provider EP above.
|
# the `.get_mkt_info()` provider EP above.
|
||||||
_mktmap_table=mkt_by_fqme,
|
_mktmap_table=mkt_by_fqme,
|
||||||
|
only_require=list(mkt_by_fqme),
|
||||||
)
|
)
|
||||||
|
|
||||||
pp_msgs: list[BrokerdPosition] = []
|
pp_msgs: list[BrokerdPosition] = []
|
||||||
|
|
|
@ -273,7 +273,7 @@ async def _reconnect_forever(
|
||||||
nobsws._connected.set()
|
nobsws._connected.set()
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
except HandshakeError:
|
except HandshakeError:
|
||||||
log.exception(f'Retrying connection')
|
log.exception('Retrying connection')
|
||||||
|
|
||||||
# ws & nursery block ends
|
# ws & nursery block ends
|
||||||
|
|
||||||
|
@ -359,8 +359,8 @@ async def open_autorecon_ws(
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
JSONRPC response-request style machinery for transparent multiplexing of msgs
|
JSONRPC response-request style machinery for transparent multiplexing
|
||||||
over a NoBsWs.
|
of msgs over a NoBsWs.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
@ -377,16 +377,20 @@ async def open_jsonrpc_session(
|
||||||
url: str,
|
url: str,
|
||||||
start_id: int = 0,
|
start_id: int = 0,
|
||||||
response_type: type = JSONRPCResult,
|
response_type: type = JSONRPCResult,
|
||||||
request_type: Optional[type] = None,
|
# request_type: Optional[type] = None,
|
||||||
request_hook: Optional[Callable] = None,
|
# request_hook: Optional[Callable] = None,
|
||||||
error_hook: Optional[Callable] = None,
|
# error_hook: Optional[Callable] = None,
|
||||||
) -> Callable[[str, dict], dict]:
|
) -> Callable[[str, dict], dict]:
|
||||||
|
|
||||||
|
# NOTE, store all request msgs so we can raise errors on the
|
||||||
|
# caller side!
|
||||||
|
req_msgs: dict[int, dict] = {}
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
open_autorecon_ws(url) as ws
|
open_autorecon_ws(url) as ws
|
||||||
):
|
):
|
||||||
rpc_id: Iterable = count(start_id)
|
rpc_id: Iterable[int] = count(start_id)
|
||||||
rpc_results: dict[int, dict] = {}
|
rpc_results: dict[int, dict] = {}
|
||||||
|
|
||||||
async def json_rpc(method: str, params: dict) -> dict:
|
async def json_rpc(method: str, params: dict) -> dict:
|
||||||
|
@ -394,26 +398,40 @@ async def open_jsonrpc_session(
|
||||||
perform a json rpc call and wait for the result, raise exception in
|
perform a json rpc call and wait for the result, raise exception in
|
||||||
case of error field present on response
|
case of error field present on response
|
||||||
'''
|
'''
|
||||||
|
nonlocal req_msgs
|
||||||
|
|
||||||
|
req_id: int = next(rpc_id)
|
||||||
msg = {
|
msg = {
|
||||||
'jsonrpc': '2.0',
|
'jsonrpc': '2.0',
|
||||||
'id': next(rpc_id),
|
'id': req_id,
|
||||||
'method': method,
|
'method': method,
|
||||||
'params': params
|
'params': params
|
||||||
}
|
}
|
||||||
_id = msg['id']
|
_id = msg['id']
|
||||||
|
|
||||||
rpc_results[_id] = {
|
result = rpc_results[_id] = {
|
||||||
'result': None,
|
'result': None,
|
||||||
'event': trio.Event()
|
'error': None,
|
||||||
|
'event': trio.Event(), # signal caller resp arrived
|
||||||
}
|
}
|
||||||
|
req_msgs[_id] = msg
|
||||||
|
|
||||||
await ws.send_msg(msg)
|
await ws.send_msg(msg)
|
||||||
|
|
||||||
|
# wait for reponse before unblocking requester code
|
||||||
await rpc_results[_id]['event'].wait()
|
await rpc_results[_id]['event'].wait()
|
||||||
|
|
||||||
ret = rpc_results[_id]['result']
|
if (maybe_result := result['result']):
|
||||||
|
ret = maybe_result
|
||||||
|
del rpc_results[_id]
|
||||||
|
|
||||||
del rpc_results[_id]
|
else:
|
||||||
|
err = result['error']
|
||||||
|
raise Exception(
|
||||||
|
f'JSONRPC request failed\n'
|
||||||
|
f'req: {msg}\n'
|
||||||
|
f'resp: {err}\n'
|
||||||
|
)
|
||||||
|
|
||||||
if ret.error is not None:
|
if ret.error is not None:
|
||||||
raise Exception(json.dumps(ret.error, indent=4))
|
raise Exception(json.dumps(ret.error, indent=4))
|
||||||
|
@ -428,6 +446,7 @@ async def open_jsonrpc_session(
|
||||||
the server side.
|
the server side.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
nonlocal req_msgs
|
||||||
async for msg in ws:
|
async for msg in ws:
|
||||||
match msg:
|
match msg:
|
||||||
case {
|
case {
|
||||||
|
@ -451,15 +470,29 @@ async def open_jsonrpc_session(
|
||||||
'params': _,
|
'params': _,
|
||||||
}:
|
}:
|
||||||
log.debug(f'Recieved\n{msg}')
|
log.debug(f'Recieved\n{msg}')
|
||||||
if request_hook:
|
# if request_hook:
|
||||||
await request_hook(request_type(**msg))
|
# await request_hook(request_type(**msg))
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'error': error
|
'error': error
|
||||||
}:
|
}:
|
||||||
log.warning(f'Recieved\n{error}')
|
# if error_hook:
|
||||||
if error_hook:
|
# await error_hook(response_type(**msg))
|
||||||
await error_hook(response_type(**msg))
|
|
||||||
|
# retreive orig request msg, set error
|
||||||
|
# response in original "result" msg,
|
||||||
|
# THEN FINALLY set the event to signal caller
|
||||||
|
# to raise the error in the parent task.
|
||||||
|
req_id: int = error['id']
|
||||||
|
req_msg: dict = req_msgs[req_id]
|
||||||
|
result: dict = rpc_results[req_id]
|
||||||
|
result['error'] = error
|
||||||
|
result['event'].set()
|
||||||
|
log.error(
|
||||||
|
f'JSONRPC request failed\n'
|
||||||
|
f'req: {req_msg}\n'
|
||||||
|
f'resp: {error}\n'
|
||||||
|
)
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
||||||
|
|
|
@ -458,13 +458,15 @@ async def start_backfill(
|
||||||
'bf_until <- last_start_dt:\n'
|
'bf_until <- last_start_dt:\n'
|
||||||
f'{backfill_until_dt} <- {last_start_dt}\n'
|
f'{backfill_until_dt} <- {last_start_dt}\n'
|
||||||
)
|
)
|
||||||
|
# UGH: what's a better way?
|
||||||
# ugh, what's a better way?
|
# TODO: backends are responsible for being correct on
|
||||||
# TODO: fwiw, we probably want a way to signal a throttle
|
# this right!?
|
||||||
# condition (eg. with ib) so that we can halt the
|
# -[ ] in the `ib` case we could maybe offer some way
|
||||||
# request loop until the condition is resolved?
|
# to halt the request loop until the condition is
|
||||||
if timeframe > 1:
|
# resolved or should the backend be entirely in
|
||||||
await tractor.pause()
|
# charge of solving such faults? yes, right?
|
||||||
|
# if timeframe > 1:
|
||||||
|
# await tractor.pause()
|
||||||
return
|
return
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
|
@ -578,6 +580,7 @@ async def start_backfill(
|
||||||
'crypto',
|
'crypto',
|
||||||
'crypto_currency',
|
'crypto_currency',
|
||||||
'fiat', # a "forex pair"
|
'fiat', # a "forex pair"
|
||||||
|
'perpetual_future', # stupid "perps" from cex land
|
||||||
}:
|
}:
|
||||||
# for now, our table key schema is not including
|
# for now, our table key schema is not including
|
||||||
# the dst[/src] source asset token.
|
# the dst[/src] source asset token.
|
||||||
|
|
Loading…
Reference in New Issue