Compare commits

...

4 Commits

Author SHA1 Message Date
Gud Boi 19fc966765 Actually do per-actor-caching in `open_symcache()`
Use `dict.get()` instead of `try/except KeyError` for the actor-level
`_caches` lookup; actually store the loaded `symcache` back into
`_caches[provider]` so subsequent opens retrieve any in-mem cache
instance.

Deats,
- swap `_caches[provider]` `KeyError` catch to
  `.get()` with `if symcache:` guard.
- assign result back to `_caches[provider]` before
  yielding so the cache is persistent across calls.
- rename local `cache` -> `symcache` throughout.
- add `loglevel` param and init `get_console_log()`
  at function scope.

Also,
- improve `log.info()` msgs with `{provider!r}`,
  load-latency, and cachefile path details.
- demote "no cache exists" from `.warning()` to
  `.info()`.
- track `from_msg` to distinguish "NEW request to
  provider" vs loaded-from-file in final log line.
- reformat `or` conditions to multiline style.
- move `import time` to module-level.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:03:26 -04:00
Gud Boi c23d034935 Merge pull request 'Swap `tractor.to_asyncio.open_channel_from()` yield-pair order' (#90) from to_asyncio_api_update into main
Reviewed-on: https://www.pikers.dev/pikers/piker/pulls/90
Reviewed-by: guille <guillermo@telos.net>
2026-03-17 21:58:15 +00:00
Gud Boi 347a2d67f9 Pin to `tractor` upstream `main` branch 2026-03-17 16:51:15 -04:00
Gud Boi b22c59e7a0 Swap `open_channel_from()` yield-pair order
Match upstream `tractor` API change where
`open_channel_from()` now yields `(chan, first)`
instead of `(first, chan)` — i.e.
`tuple[LinkedTaskChannel, Any]`.

- `brokers/ib/api.py`
- `brokers/ib/broker.py`
- `brokers/ib/feed.py`
- `brokers/deribit/api.py` (2 sites)

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 15:06:30 -04:00
7 changed files with 61 additions and 32 deletions

View File

@ -586,7 +586,7 @@ async def open_price_feed(
fh,
instrument
)
) as (first, chan):
) as (chan, first):
yield chan
@ -653,7 +653,7 @@ async def open_order_feed(
fh,
instrument
)
) as (first, chan):
) as (chan, first):
yield chan

View File

@ -1529,7 +1529,7 @@ async def open_client_proxies() -> tuple[
# TODO: maybe this should be the default in tractor?
key=tractor.current_actor().uid,
) as (cache_hit, (clients, _)),
) as (cache_hit, (_, clients)),
AsyncExitStack() as stack
):
@ -1718,7 +1718,7 @@ async def open_client_proxy(
open_aio_client_method_relay,
client=client,
event_consumers=event_table,
) as (first, chan),
) as (chan, first),
trionics.collapse_eg(), # loose-ify
trio.open_nursery() as relay_tn,

View File

@ -514,8 +514,8 @@ async def open_trade_event_stream(
recv_trade_updates,
client=client,
) as (
_, # first pushed val
trade_event_stream,
_, # first pushed val
):
task_status.started(trade_event_stream)
# block forever to keep session trio-asyncio session

View File

@ -989,7 +989,7 @@ async def open_aio_quote_stream(
symbol=symbol,
contract=contract,
) as (contract, from_aio):
) as (from_aio, contract):
assert contract

View File

@ -29,6 +29,7 @@ from contextlib import (
)
from pathlib import Path
from pprint import pformat
import time
from typing import (
Any,
Callable,
@ -48,7 +49,10 @@ except ModuleNotFoundError:
import tomli as tomllib
from msgspec import field
from piker.log import get_logger
from piker.log import (
get_console_log,
get_logger,
)
from piker import config
from piker.types import Struct
from piker.brokers import (
@ -356,24 +360,35 @@ def mk_cachefile(
) -> Path:
cachedir: Path = config.get_conf_dir() / '_cache'
if not cachedir.is_dir():
log.info(f'Creating `nativedb` director: {cachedir}')
log.info(
f'Creating symbology-cache subdir\n'
f'{cachedir}'
)
cachedir.mkdir()
cachefile: Path = cachedir / f'{str(provider)}.symcache.toml'
cachefile.touch()
return cachefile
@acm
async def open_symcache(
mod_or_name: ModuleType | str,
mod_or_name: ModuleType|str,
reload: bool = False,
only_from_memcache: bool = False, # no API req
_no_symcache: bool = False, # no backend support
loglevel: str = 'info',
) -> SymbologyCache:
log = get_console_log(
level=loglevel,
name=__name__,
)
if isinstance(mod_or_name, str):
mod = get_brokermod(mod_or_name)
else:
@ -388,7 +403,8 @@ async def open_symcache(
# the backend pkg-module is annotated appropriately.
if (
getattr(mod, '_no_symcache', False)
or _no_symcache
or
_no_symcache
):
yield SymbologyCache(
mod=mod,
@ -400,60 +416,75 @@ async def open_symcache(
# actor-level cache-cache XD
global _caches
if not reload:
try:
yield _caches[provider]
except KeyError:
symcache: SymbologyCache|None = _caches.get(provider)
if symcache:
yield symcache
else:
msg: str = (
f'No asset info cache exists yet for `{provider}`'
f'No in-mem symcache found for {provider!r}\n'
f'Loading..'
)
if only_from_memcache:
raise RuntimeError(msg)
else:
log.warning(msg)
log.info(msg)
# if no cache exists or an explicit reload is requested, load
# the provider API and call appropriate endpoints to populate
# the mkt and asset tables.
if (
reload
or not cachefile.is_file()
or
not cachefile.is_file()
):
cache = await SymbologyCache.from_scratch(
log.info(
f'Generating NEW symbology-cache for {provider!r}..\n'
f'>[{cachefile}'
)
symcache: SymbologyCache = await SymbologyCache.from_scratch(
mod=mod,
fp=cachefile,
)
else:
log.info(
f'Loading EXISTING `{mod.name}` symbology cache:\n'
f'> {cachefile}'
f'Loading EXISTING symbology-cache for {provider!r}..\n'
f'[>{cachefile}'
)
import time
now = time.time()
now: float = time.time()
with cachefile.open('rb') as existing_fp:
data: dict[str, dict] = tomllib.load(existing_fp)
log.runtime(f'SYMCACHE TOML LOAD TIME: {time.time() - now}')
log.runtime(
f'Symcache loaded!\n'
f'load-latency: {time.time() - now}\n'
)
# if there's an empty file for some reason we need
# to do a full reload as well!
if not data:
cache = await SymbologyCache.from_scratch(
from_msg: str = 'NEW request to provider'
symcache = await SymbologyCache.from_scratch(
mod=mod,
fp=cachefile,
)
else:
cache = SymbologyCache.from_dict(
from_msg: str = f'{cachefile}'
symcache = SymbologyCache.from_dict(
data,
mod=mod,
fp=cachefile,
)
# TODO: use a real profiling sys..
# https://github.com/pikers/piker/issues/337
log.info(f'SYMCACHE LOAD TIME: {time.time() - now}')
log.info(
f'Symcache data loaded from {from_msg} !\n'
f'load-latency: {time.time() - now}s\n'
f'cachefile: {cachefile}\n'
)
yield cache
_caches[provider] = symcache
yield symcache
# TODO: write only when changes detected? but that should
# never happen right except on reload?
@ -476,7 +507,6 @@ def get_symcache(
async with (
# only for runtime's debug mode
tractor.open_nursery(debug_mode=True),
open_symcache(
get_brokermod(provider),
reload=force_reload,

View File

@ -203,9 +203,8 @@ pyvnc = { git = "https://github.com/regulad/pyvnc.git" }
# xonsh = { git = 'https://github.com/xonsh/xonsh.git', branch = 'main' }
# XXX since, we're like, always hacking new shite all-the-time. Bp
tractor = { git = "https://github.com/goodboy/tractor.git", branch ="piker_pin" }
tractor = { git = "https://github.com/goodboy/tractor.git", branch ="main" }
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "main" }
# ------ goodboy ------
# hackin dev-envs, usually there's something new he's hackin in..
# tractor = { path = "../tractor", editable = true }

View File

@ -1034,7 +1034,7 @@ requires-dist = [
{ name = "tomli", specifier = ">=2.0.1,<3.0.0" },
{ name = "tomli-w", specifier = ">=1.0.0,<2.0.0" },
{ name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" },
{ name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=piker_pin" },
{ name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=main" },
{ name = "trio", specifier = ">=0.27" },
{ name = "trio-typing", specifier = ">=0.10.0" },
{ name = "trio-util", specifier = ">=0.7.0,<0.8.0" },
@ -1676,7 +1676,7 @@ wheels = [
[[package]]
name = "tractor"
version = "0.1.0a6.dev0"
source = { git = "https://github.com/goodboy/tractor.git?branch=piker_pin#566a17ba92469000a01fd18c709ea3e336e72796" }
source = { git = "https://github.com/goodboy/tractor.git?branch=main#e77198bb64f0467a50e251ed140daee439752354" }
dependencies = [
{ name = "bidict" },
{ name = "cffi" },