Much (much) better symbology cache refinements

For starters rename the cache type to `SymbologyCache` and fill out its
interface to include an (async) `.reload()` which can be used to populate
the in-mem asset-table sets such that any tractor-runtime task can
actually directly call it. Use a symcache file name schema of
`_cache/<backend>.symcache.toml`.

Dirtier deatz:
- make `.open_symcache()` a `@cm` such that it can be used from sync code
  and will actually call `trio.run()` in the case where it needs to do a
  full (re)load; also don't write on exit only on reloads.
- add `.get_symcache()` a simple non-ctx-mngr reader which again can
  mostly be called willy-nilly from sync code without the full runtime
  being up (but likely will only work if symcache files already exist
  for the backend).
account_tests
Tyler Goodlet 2023-07-06 15:19:08 -04:00
parent 005023275e
commit c8c28df62f
1 changed files with 125 additions and 39 deletions

View File

@ -24,9 +24,13 @@ offline usage.
''' '''
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import (
# asynccontextmanager as acm,
contextmanager as cm,
)
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from types import ModuleType
import tomli_w # for fast symbol cache writing import tomli_w # for fast symbol cache writing
try: try:
@ -41,21 +45,24 @@ from ..brokers import open_cached_client
from .types import Struct from .types import Struct
from ..accounting import ( from ..accounting import (
Asset, Asset,
# MktPair, MktPair,
) )
log = get_logger('data.cache') log = get_logger('data.cache')
class AssetsInfo(Struct): class SymbologyCache(Struct):
''' '''
Asset meta-data cache which holds lookup tables for 3 sets of Asset meta-data cache which holds lookup tables for 3 sets of
market-symbology related struct-types required by the market-symbology related struct-types required by the
`.accounting` and `.data` subsystems. `.accounting` and `.data` subsystems.
''' '''
provider: str mod: ModuleType
fp: Path fp: Path
# all asset-money-systems descriptions as minimally defined by
# in `.accounting.Asset`
assets: dict[str, Asset] = field(default_factory=dict) assets: dict[str, Asset] = field(default_factory=dict)
# backend-system pairs loaded in provider (schema) specific # backend-system pairs loaded in provider (schema) specific
@ -65,13 +72,23 @@ class AssetsInfo(Struct):
# TODO: piker-normalized `.accounting.MktPair` table? # TODO: piker-normalized `.accounting.MktPair` table?
# loaded from the `.pairs` and a normalizer # loaded from the `.pairs` and a normalizer
# provided by the backend pkg. # provided by the backend pkg.
# mkt_pairs: dict[str, MktPair] = field(default_factory=dict) mktmaps: dict[str, MktPair] = field(default_factory=dict)
def write_config(self) -> None: def write_config(self) -> None:
cachedict: dict[str, Any] = { cachedict: dict[str, Any] = {}
for key, attr in {
'assets': self.assets, 'assets': self.assets,
'pairs': self.pairs, 'pairs': self.pairs,
} # 'mktmaps': self.mktmaps,
}.items():
if not attr:
log.warning(
f'Asset cache table for `{key}` is empty?'
)
continue
cachedict[key] = attr
try: try:
with self.fp.open(mode='wb') as fp: with self.fp.open(mode='wb') as fp:
tomli_w.dump(cachedict, fp) tomli_w.dump(cachedict, fp)
@ -79,18 +96,58 @@ class AssetsInfo(Struct):
self.fp.unlink() self.fp.unlink()
raise raise
async def load(self) -> None:
async with open_cached_client(self.mod.name) as client:
_caches: dict[str, AssetsInfo] = {} if get_assets := getattr(client, 'get_assets', None):
assets: dict[str, Asset] = await get_assets()
for bs_mktid, asset in assets.items():
self.assets[bs_mktid] = asset.to_dict()
else:
log.warning(
'No symbology cache `Asset` support for `{provider}`..\n'
'Implement `Client.get_assets()`!'
)
if get_mkt_pairs := getattr(client, 'get_mkt_pairs', None):
pairs: dict[str, Struct] = await get_mkt_pairs()
for bs_fqme, pair in pairs.items():
entry = await self.mod.get_mkt_info(pair.bs_fqme)
if not entry:
continue
mkt: MktPair
pair: Struct
mkt, _pair = entry
assert _pair is pair
self.pairs[pair.bs_fqme] = pair.to_dict()
self.mktmaps[mkt.fqme] = mkt
else:
log.warning(
'No symbology cache `Pair` support for `{provider}`..\n'
'Implement `Client.get_mkt_pairs()`!'
)
return self
@acm # actor-process-local in-mem-cache of symcaches (by backend).
async def open_symbology_cache( _caches: dict[str, SymbologyCache] = {}
provider: str,
@cm
def open_symcache(
mod: ModuleType,
reload: bool = False, reload: bool = False,
) -> AssetsInfo: ) -> SymbologyCache:
global _caches
provider: str = mod.name
# actor-level cache-cache XD
global _caches
if not reload: if not reload:
try: try:
yield _caches[provider] yield _caches[provider]
@ -103,10 +160,10 @@ async def open_symbology_cache(
log.info(f'Creating `nativedb` director: {cachedir}') log.info(f'Creating `nativedb` director: {cachedir}')
cachedir.mkdir() cachedir.mkdir()
cachefile: Path = cachedir / f'{str(provider)}_symbology.toml' cachefile: Path = cachedir / f'{str(provider)}.symcache.toml'
cache = AssetsInfo( cache = SymbologyCache(
provider=provider, mod=mod,
fp=cachefile, fp=cachefile,
) )
@ -117,42 +174,71 @@ async def open_symbology_cache(
reload reload
or not cachefile.is_file() or not cachefile.is_file()
): ):
async with open_cached_client(provider) as client: log.info(f'GENERATING symbology cache for `{mod.name}`')
if get_assets := getattr(client, 'get_assets', None): import tractor
assets: dict[str, Asset] = await get_assets() import trio
for bs_mktid, asset in assets.items():
cache.assets[bs_mktid] = asset.to_dict()
else:
log.warning(
'No symbology cache `Asset` support for `{provider}`..\n'
'Implement `Client.get_assets()`!'
)
if get_mkt_pairs := getattr(client, 'get_mkt_pairs', None): # spawn tractor runtime and generate cache
for bs_mktid, pair in (await get_mkt_pairs()).items(): # if not existing.
cache.pairs[bs_mktid] = pair.to_dict() async def sched_gen_symcache():
else:
log.warning(
'No symbology cache `Pair` support for `{provider}`..\n'
'Implement `Client.get_mkt_pairs()`!'
)
# TODO: pack into `MktPair` normalized types as async with (
# well? # only for runtime
tractor.open_nursery(debug_mode=True),
):
return await cache.load()
cache: SymbologyCache = trio.run(sched_gen_symcache)
# only (re-)write if explicit reload or non-existing # only (re-)write if explicit reload or non-existing
cache.write_config() cache.write_config()
else: else:
log.info(
f'Loading EXISTING `{mod.name}` symbology cache:\n'
f'> {cache.fp}'
)
import time
now = time.time()
with cachefile.open('rb') as existing_fp: with cachefile.open('rb') as existing_fp:
data: dict[str, dict] = tomllib.load(existing_fp) data: dict[str, dict] = tomllib.load(existing_fp)
log.runtime(f'SYMCACHE TOML LOAD TIME: {time.time() - now}')
for key, table in data.items(): for key, table in data.items():
attr: dict[str, Any] = getattr(cache, key) attr: dict[str, Any] = getattr(cache, key)
if attr != table: assert not attr
log.info(f'OUT-OF-SYNC symbology cache: {key}') # if attr != table:
# log.info(f'OUT-OF-SYNC symbology cache: {key}')
setattr(cache, key, table) setattr(cache, key, table)
# TODO: use a real profiling sys..
# https://github.com/pikers/piker/issues/337
log.info(f'SYMCACHE LOAD TIME: {time.time() - now}')
yield cache yield cache
cache.write_config()
# TODO: write only when changes detected? but that should
# never happen right except on reload?
# cache.write_config()
def get_symcache(
provider: str,
force_reload: bool = False,
) -> SymbologyCache:
'''
Get any available symbology/assets cache from
sync code by manually running `trio` to do the work.
'''
from ..brokers import get_brokermod
with open_symcache(
get_brokermod(provider),
reload=force_reload,
) as symcache:
return symcache