Compare commits

..

75 Commits

Author SHA1 Message Date
Tyler Goodlet 8dee35cfa7 Add `.cancel()` log around `_async_main()` KBI-exit 2026-01-01 17:37:50 -05:00
Tyler Goodlet f628907bdc ib: bump `docker/ib/README.rst`
For the new github image, a high-level look at its basic
features/usage/docs and prosing around our expected default usage with
the `piker.brokers.ib` backend.
2026-01-01 17:37:36 -05:00
Tyler Goodlet 3df1308b77 ib.feed: better no-bars error-log message format 2026-01-01 17:37:36 -05:00
Tyler Goodlet c6afe7125e binance: set `Pair.pegInstructionsAllowed = False`
Lol, a cheeky unforeseen bug due to TOML's lack of a null type and
thinking i could render an `Optional` field on a `msgspec.Struct`
(defaulted to `None`) to the `binance.symcache.toml` cache file..

I didn't catch this when i first updated to the 3.1 API in f7caa75228
because i never did a cache-files flush.. lesson learned and we **really
need tests for this**!!
2026-01-01 17:37:36 -05:00
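
A minimal sketch of the underlying gotcha, assuming `tomlkit` as the
TOML writer (the real symcache rendering path isn't shown here)::

    import msgspec
    import tomlkit

    class Pair(msgspec.Struct):
        symbol: str
        # pegInstructionsAllowed: bool|None = None  # <- `None` has no
        # TOML rendering, so writing the cache file blows up..
        pegInstructionsAllowed: bool = False  # concrete default is fine

    doc: str = tomlkit.dumps(msgspec.to_builtins(Pair(symbol='BTCUSDT')))
    print(doc)  # symbol = "BTCUSDT" / pegInstructionsAllowed = false
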
Tyler Goodlet d6560ecc81 Wow, update root `conf.toml` to new multiaddr style
I don't know how this wasn't already committed but.. drops the legacy
`marketstore` tsdb socket info vars since we're going all in on
`nativedb` BP
2026-01-01 17:37:36 -05:00
Tyler Goodlet 9bfdcac72f `accounting.calc`: enable crash handlers on `debug_mode` input (via test harness) 2026-01-01 17:37:36 -05:00
Tyler Goodlet 82077df8bb Draft a gt-one-`.fqme`-in-txns/account-file test
To start this is just a shell for the test, there's no checking logic
yet.. put it as `test_accounting.test_ib_account_with_duplicated_mktids()`.
The test is composed for now to be completely runtime-free, using only
the offline txn-ledger / symcache / account loading APIs; ideally we
fill in the activated symbology-data-runtime cases once we figure a sane
way to handle incremental symcache updates for backends like IB..

To actually fill the test out with real checks we still need to,
- extract the problem account file from my ib.algopape into the test
  harness data.
- pick some contracts with multiple fqmes despite a single bs_mktid and
  ensure they're aggregated as a single `Position` as well as,
  * ideally de-duplicating txns from the account file section for the
    mkt..
  * warning appropriately about greater-than-one fqme for the bs_mktid
    and providing a way for the ledger re-writing to choose the
    appropriate `<venue>` as the "primary" when the
    data-symbology-runtime is up and possibly use it to incrementally
    update the IB symcache and store offline for next use?
2026-01-01 17:37:36 -05:00
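
A sketch of what the (currently logic-free) shell might grow into; the
checks below are just the commit's own todo list restated as comments::

    def test_ib_account_with_duplicated_mktids() -> None:
        # TODO, per the list above:
        # - load the problem account file + txn-ledger / symcache
        #   offline (no runtime),
        # - assert multi-fqme contracts aggregate to a single `Position`,
        # - assert per-mkt txn de-duplication,
        # - assert a warning for greater-than-one fqme per bs_mktid.
        ...
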
Tyler Goodlet af3b54108a `data._symcache`, impl a summary `.__repr__()`, avoids `Asset` causality issues 2026-01-01 17:37:36 -05:00
Tyler Goodlet d732788c61 Use `pytest` plugin now exposed by `tractor` 2026-01-01 17:37:36 -05:00
Tyler Goodlet 7549016085 Avoid `msgspec` eval-err on `Asset` in symcache? 2026-01-01 17:37:36 -05:00
Tyler Goodlet b526a248e2 Drop `open_pps()` from ems tests 2026-01-01 17:37:36 -05:00
Tyler Goodlet 997f8efd01 `ui._remote_ctl`: shield remote rect removals
Since under `trio`-cancellation the `.remove()` is a checkpoint and will
be masked by a taskc AND we **always want to remove the rect** despite
the surrounding teardown conditions.
2026-01-01 17:37:36 -05:00
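
The general `trio` pattern being applied, as a sketch (the
`annot.remove()` call stands in for the real remote-rect removal)::

    import trio

    async def remove_rect(annot) -> None:
        # shield the teardown-side checkpoint so an in-flight
        # cancellation can't mask the removal
        with trio.CancelScope(shield=True):
            await annot.remove()
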
Tyler Goodlet d8023377b5 `_ems`: tolerate and warn on already popped execs
In the `translate_and_relay_brokerd_events()` loop task that is, such
that we never crash on a `status_msg = book._active.pop(oid)` in the
'closed' status handler whenever a double removal happens.

Turns out there were unforeseen races here: a benign backend error
would cause an order-mode dialog to be cancelled (incorrectly) and then
a UI side `.on_cancel()` would trigger too-early removal from the
`book._active` table despite the backend sending an actual 'closed'
event (much) later; this would crash on the now-missing entry..

So instead we now,
- obviously use `book._active.pop(oid, None)`
- emit a `log.warning()` (not info lol) on a null-read and with a less
  "one-line-y" message explaining the double removal and maybe *why*.
2026-01-01 17:37:36 -05:00
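
A sketch of that tolerant-pop (wrapped in a fn for self-containment;
the `book`, `oid` and `log` names are from the commit message)::

    def pop_active(book, oid: str, log) -> None:
        status_msg = book._active.pop(oid, None)
        if status_msg is None:
            log.warning(
                f'Status-msg for {oid!r} was already popped!?\n'
                'Likely a UI-side `.on_cancel()` raced a (much later) '
                'backend "closed" event..'
            )
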
Tyler Goodlet d1fd8065ea `polars.cumsum()` is now `.cum_sum()` 2026-01-01 17:37:36 -05:00
Tyler Goodlet 583353acb9 ui.order_mode: prioritize mkt-match on `.bs_mktid`
For backends which opt to set the new `BrokerdPosition.bs_mktid` field,
give (matching logic) priority to it such that even if the `.symbol`
field doesn't match the mkt currently focussed on chart, it will
always match on a provider's own internal asset-mapping-id. The original
fallback logic for `.fqme` matching is left as is.

As an example with IB, a qqq.nasdaq.ib txn may have been filled on
a non-primary venue as qqq.directedea.ib, in this case if the mkt is
displayed and focused on chart we want the **entire position info** to
be overlayed by the `OrderMode` UX without discrepancy.

Other refinements,
- improve logging and add a detailed edge-case-comment around the
  `.on_fill()` handler to clarify where if a benign 'error' msg is
  relayed from a backend it will cause the UI to operate as though the
  order **was not-cleared/cancelled** since the `.on_cancel()` handler
  will have likely been called just before, popping the `.dialogs`
  entry. Return `bool` to indicate whether the UI removed-lines
  / added-fill-arrows.
- invert the `return` branching logic in `.on_cancel()` to reduce
  indent.
- add a very loud `log.error()` in `Status(resp='error')` case-block
  ensuring the console yells about the order being cancelled, also
  a todo for the weird msg-field recursion nonsense..
2026-01-01 17:37:36 -05:00
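
The matching priority, condensed (the helper fn itself is hypothetical;
field names are per the commit)::

    def pos_msg_matches_mkt(msg, mkt) -> bool:
        # a backend's own internal asset-mapping-id always wins..
        if msg.bs_mktid and msg.bs_mktid == mkt.bs_mktid:
            return True
        # ..with the original `.fqme` matching left as the fallback.
        return msg.symbol == mkt.fqme
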
Tyler Goodlet 7ac0de4efc Set `.bs_mktid` on all IB position-msg emissions.. 2026-01-01 17:37:36 -05:00
Tyler Goodlet c0ea7abcfb Add an optional `BrokerdPosition.bs_mktid` field
Such that backends can deliver their own internal unique
`MktPair.bs_mktid` when they can't seem to get it right via the
`.fqme: str` export.. (COUGH ib, you piece of sh#$).

Also add todo for possibly replacing the msg with a `Position.summary()`
"snapshot" as a better and more rigorously generated wire-ready msg.
2026-01-01 17:37:36 -05:00
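
Presumed shape of the new field (a sketch, not the full msg definition;
`Struct` per `piker.types`)::

    from piker.types import Struct

    class BrokerdPosition(Struct):
        # ..existing fields elided..
        bs_mktid: str|None = None  # backend-internal mkt id, when known
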
Tyler Goodlet 860fc28bbc Don't override `Account.pps: dict` entries..
Despite a `.bs_mktid` ideally being a bijection with `MktPair.fqme`
values, apparently some backends (cough IB) will switch the `.<venue>`
part in txn records resulting in multiple account-conf-file sections for
the same dst asset. Obviously that means we can't allocate new
`Position` entries keyed by that `bs_mktid`, instead be sure to **update
them instead**!

Deats,
- add case logic to avoid pp overwrites using a `pp_objs.get()` check.
- warn on duplicated pos entries whenever the current account-file
  entry's `mkt` doesn't match the pre-existing position's.
- mk `Position.add_clear()` return a `bool` indicating if the record was
  newly added, warn when it was already existing/added prior.

Also,
- drop the already deprecated `open_pps()`, also from sub-pkg exports.
- draft TODO for `Position.summary()` idea as a replacement for
  `BrokerdPosition`-msgs.
2026-01-01 17:37:36 -05:00
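
An update-not-overwrite sketch per the "Deats" above; the `Position`
import is assumed and the fn wrapper is just for self-containment::

    from piker.accounting import Position  # assumed export

    def load_pos(pp_objs: dict, bs_mktid: str, mkt, txns: list, log):
        if (pos := pp_objs.get(bs_mktid)):
            if pos.mkt != mkt:
                log.warning(
                    f'Duplicated pos entry for {bs_mktid!r}:\n'
                    f'{mkt.fqme} vs pre-existing {pos.mkt.fqme}\n'
                )
        else:
            pos = pp_objs[bs_mktid] = Position(mkt=mkt, bs_mktid=bs_mktid)

        for txn in txns:
            if not pos.add_clear(txn):
                log.warning('Txn was already added prior!')
        return pos
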
Tyler Goodlet 4d81ba307e Bump lock file after vnc client change 2026-01-01 17:26:46 -05:00
Tyler Goodlet ce3c11e19a Switch to `pyvnc` for IB reset hackz
It actually works for vncAuth(2) (thank god!) which the previous
`asyncvnc` **did not**, and seems to be mostly based on the work
from the `asyncvnc` author anyway (so all my past efforts don't seem to
have been in vain XD).

Deats,
- switch to `pyvnc` async API (using `asyncio` again obvi) in
  `.ib._util._vnc_click_hack()`.
- add `pyvnc` as src installed dep from GH.
- drop `asyncvnc` as dep.

Other,
- update `pytest` version range to avoid weird auto-load plugin exposed
  by `xonsh`?
- add a `tool.pytest.ini_options` to project file with vars to,
  - disable that^ `xonsh` plug using `addopts = '-p no:xonsh'`.
  - set a `testpaths` to avoid running anything but that subdir.
  - try out the `'progress'` style console output (does it work?).
2026-01-01 17:26:46 -05:00
Tyler Goodlet a7e29a8573 Convert remaining `.to_asyncio.open_channel_from()` to `chan` fn-sig usage 2026-01-01 17:26:46 -05:00
Tyler Goodlet 9469a7c53c Flip screen-info script to qt6, refine it to heck.
Buncha updates and improvements,
- adjust sub-namespace imports according to console warnings.
- iterate all detected screens in a loop and instead report which is the
  primary and the current.
- type annotate all vars where non-obvious, particularly the `Qt` refs.
2026-01-01 17:26:46 -05:00
Tyler Goodlet b19b55f40a Use gitea for `tractor` repo endpoint 2026-01-01 17:26:44 -05:00
Tyler Goodlet 70a9be5761 `ib.feed`: finally solve `push()` exc propagation
Such that if/when the `push()` ticker callback (closure) errors
internally, we actually eventually bubble the error out-and-up from the
`asyncio.Task` and from there out the `.to_asyncio.open_channel_from()` to
the parent `trio.Task`..

It ended up being much more subtle to solve than i would have liked
thanks to,

- whatever `Ticker.updateEvent.connect()` does behind the scenes in
  terms of (clearly) swallowing, with only log reporting, any exc raised
  in the registered callback (in our case `push()`),

- `asyncio.Task.set_exception()` never working and instead needing to
  resort to `Task.cancel()`, catching `CancelledError` and re-raising
  the stashed `maybe_exc` from `push()` when set..

Further this ports `.to_asyncio.open_channel_from()` usage to use
the new `chan: tractor.to_asyncio.LinkedTaskChannel` fn-sig API, namely
for `_setup_quote_stream()` task. Requires the latest `tractor` updates
to the inter-eventloop-chan iface providing a `.set_nowait()` and
`.get()` for the `asyncio`-side.

Impl deats within `_setup_quote_stream()`,
- implement `push()` error-bubbling by adding a `maybe_exc` which can be
  set by that callback itself or by its registering task; when set it is
  both,
  * reported on by the `teardown()` cb,
  * re-raised by the terminated (via `.cancel()`) `asyncio.Task` after
    woken from its sleep, aka "cancelled" (since that's apparently one
    of the only options.. see big rant further todo comments).
- add explicit error-tolerance-tuning via a `handler_tries: int` counter
  and `tries_before_raise: int` limit such that we only bubble
  a `push()` raised exc once enough tries have consecutively failed.
- as mentioned, use the new `chan` fn-sig support and thus the new
  method API for `asyncio` -> `trio` comms.
- add a big TODO XXX around the need to use a better sys for terminating
  `asyncio.Task`s whether it's by delegating to some `.to_asyncio`
  internals after a factor-out OR by potentially going full bore `anyio`
  throughout `.to_asyncio`'s impl in general..
- mk `teardown()` use appropriate `log.<level>()`s based on outcome.

Surroundingly,
- add a ton of doc-strings to mod fns previously missing them.
- improved / added-new comments to `wait_on_data_reset()` internals and
  anything changed per ^above.
2026-01-01 17:26:26 -05:00
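
A heavily condensed, pure-`asyncio` sketch of the `maybe_exc` bubbling
trick (the quote-relay body and `Ticker` registration are elided)::

    import asyncio

    async def quote_pump() -> None:
        maybe_exc: BaseException|None = None
        handler_tries: int = 0
        tries_before_raise: int = 3
        task: asyncio.Task = asyncio.current_task()

        def push(ticker) -> None:
            # registered via `Ticker.updateEvent.connect(push)`;
            # any exc raised in here is swallowed by the event lib!
            nonlocal maybe_exc, handler_tries
            try:
                ...  # normalize + relay the quote to the `trio` side
            except Exception as exc:
                handler_tries += 1
                if handler_tries >= tries_before_raise:
                    # `Task.set_exception()` never works; stash the
                    # exc and `.cancel()` the sleeping task instead..
                    maybe_exc = exc
                    task.cancel()

        try:
            await asyncio.sleep(float('inf'))
        except asyncio.CancelledError:
            if maybe_exc is not None:
                raise maybe_exc  # bubbles out to the parent trio.Task
            raise
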
Tyler Goodlet dce7fac2c1 `ib`: various type-annot, multiline styling and todos updates 2026-01-01 17:26:26 -05:00
Tyler Goodlet 8f690fbd59 `.ui._search`: collapse EGs as needed, use `tn` naming. 2026-01-01 17:26:26 -05:00
Tyler Goodlet f89461fb29 Bump lock file with `tractor` piker pinned branch 2026-01-01 17:26:26 -05:00
Tyler Goodlet f612b8e772 Port `.data._web_bs` stuff to strict-EGs
Using `tractor.trionics.collapse_eg()` as needed and doing
some renames, in similar style as elsewhere:
- `pcs` -> `rent_cs`,
- `n` -> `tn` for nursery handles,

Also,
- tweak the `._reconnect_forever()` while loop to use the
  (also) `trio`-internal
  `mc_state: trio._channel.MemoryChannelState = snd._state` instead
  of `snd._closed` to poll for open send/receive consumer task counts
  since,
    1. it seems more reliable than using the `snd._closed`,
    2. there's no other way to access the info.. afaik?

- handle `ConnectionRejected` explicitly alongside handshake-errs as
  a retry case.
- add a base-exc handler which `.exception()` reports the reconnect
  attempt failure explicitly.
- drop some lingering `Optional` usage.
2026-01-01 17:26:26 -05:00
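
That state-poll in isolation, as a sketch (these are `trio`-internal
attrs, so use-at-your-own-risk)::

    import trio

    snd, rcv = trio.open_memory_channel(1)
    mc_state: trio._channel.MemoryChannelState = snd._state
    # non-zero while any consumer task still holds the receive side
    still_consumed: bool = bool(mc_state.open_receive_channels)
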
Tyler Goodlet 44b8178809 Use `tractor`'s updated `piker_pin` branch (again)
Instead of the insignificantly named dev branch from the recent `trio`
/ py3.13 update work; it makes more sense to keep a dedicated pin (as
we have prior) for the moment. Also re-org the masked @goodboy dev-env
lines + comments to bottom of file.
2026-01-01 17:26:24 -05:00
Tyler Goodlet 663bf0c4ea Port `.cli` & `.service` to latest `tractor` registry APIs
Namely changes for the `registry_addrs: list`, `enable_transports: list`
and related `tractor._addr` primitive requirements.

Other updates include,
- passing `maybe_enable_greenback=True`,
- additional exc logging around `pikerd` syncing/booting,
- changing to newer `Context.wait_for_result()`,
- dropping (unnecessary?) `maybe_open_crash_handler()` around `pikerd` ep.
2026-01-01 17:24:09 -05:00
Tyler Goodlet 8099650430 Bump to WIP "piker pin" `tractor` dev branch, with lock file 2026-01-01 17:24:02 -05:00
Tyler Goodlet 6a6d58f8c6 binance; unmask around send-chan @acm usage 2026-01-01 17:22:19 -05:00
Tyler Goodlet 218d0d6a69 ib: add venue-hours checking
Such that we can avoid other (pretty unreliable) "alternative" checks to
determine whether a real-time quote should be waited on or (when venue
is closed) we should just signal that historical backfilling can
commence immediately.

This has been a todo for a very long time and it turned out to be much
easier to accomplish than anticipated..

Deats,
- add a new `is_current_time_in_range()` dt range checker to predicate
  whether an input range contains `datetime.now(start_dt.tzinfo)`.
- in `.ib.feed.stream_quotes()` add a `venue_is_open: bool` which uses
  all of the new ^^ to determine whether to branch for the
  short-circuit-and-do-history-now case or the std real-time-quotes
  should-be-awaited-since-venue-is-open case; drop all the old hacks
  trying to work around not figuring out that venue state stuff..

Other,
- also add a gpt5-composed parser to `._util` for the
  `ib_insync.ContractDetails.tradingHours: str`, from before i realized
  there was a `.tradingSessions` property XD
- in `.ib.feed`,
  * add various EG-collapsings per recent tractor/trio updates.
  * better logging / exc-handling around ticker quote pushes.
  * stop clearing `Ticker.ticks` each quote iteration; not sure if this
    is needed/correct tho?
  * add masked `Ticker.ticks` poll loop that logs.
- fix some `str.format()` usage in `._util.try_xdo_manual()`
2026-01-01 17:22:19 -05:00
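
A plausible shape for the new predicate (just the described contract,
not the actual backend impl)::

    from datetime import datetime

    def is_current_time_in_range(
        start_dt: datetime,
        end_dt: datetime,
    ) -> bool:
        now: datetime = datetime.now(start_dt.tzinfo)
        return start_dt <= now <= end_dt
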
Tyler Goodlet 9b19886285 ib: never relay "Warning:" errors to EMS..
You'd think they could be bothered to make either a "log" or "warning"
msg type instead of a `type='error'`.. but alas, this attempts to detect
all such "warning"-errors and never proxy them to the clearing engine
thus avoiding the cancellation of any associated (by `reqid`)
pre-existing orders (control dialogs).

Also update all surrounding log messages to a more multiline style.
2026-01-01 17:17:51 -05:00
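
A detection sketch; the err-msg field name and the relay glue are
assumptions::

    def maybe_relay(err: dict, log) -> dict|None:
        msg: str = err.get('errorMsg', '')
        if msg.startswith('Warning:'):
            # never proxy to the clearing engine; doing so would
            # cancel any (reqid-associated) pre-existing order dialogs
            log.warning(f'Suppressing IB warning-as-error:\n{msg}\n')
            return None
        return err  # caller relays to the EMS
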
Tyler Goodlet 2d039ecd9a Spurious first-draft of EG collapsing
Topically, throughout various (seemingly) console-UX-affecting or benign
spots in the code base; nothing that required intervention beyond the
superficial. A few spots also include `trio.Nursery` ref renames (always
to something with a `tn` in it) and log-level reductions to quiet
(benign) console noise oriented around issues meant to be solved long
ago..

Note there's still a couple spots i left with the loose-ify flag because
i haven't fully tested them without using the latest version of
`tractor.trionics.collapse_eg()`, but more than likely they should flip
over fine.
2026-01-01 17:17:40 -05:00
Tyler Goodlet 416ef53376 Use `.trionics.collapse_eg()` in `.deribit.api`
Commit this change separate from the (original) broader set applied to
the entire code base since the `.deribit.api` mod contained changes from
upstream max-pain work (from our very own @nt) which caused a noticeable
conflict and intros un-required changes from his work to re-enable
`deribit` support.

Note the original commit, "69eac7bb Spurious first-draft of EG
collapsing", applied similar changes through the rest of the code base.
AGAIN, this mod's change is only being broken out to minimize upstream
change conflicts due to updates to the `deribit` backend done earlier in
time-history.
2026-01-01 17:06:51 -05:00
Tyler Goodlet 62cc0575aa ib-related: cope with invalid txn timestamps
That is, inside the embedded `.accounting.calc.dyn_parse_to_dt()`
closure, add an optional `_invalid: list` param to which we can report
bad-timestamped records; these we instead override and return as
`from_timestamp(0.)` (when the parser loop falls through) and report
later (in summary) from the `.accounting.calc.iter_by_dt()` caller.
Also add some logging and an optional debug block for future tracing.
2026-01-01 16:01:34 -05:00
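
A fallback sketch of the closure's new behavior (`pendulum` per the
codebase; the field-priority chain is an assumption)::

    from pendulum import from_timestamp, parse

    def dyn_parse_to_dt(
        rec: dict,
        _invalid: list|None = None,
    ):
        for key in ('datetime', 'date', 'time'):
            if (val := rec.get(key)):
                try:
                    return parse(val)
                except Exception:
                    continue
        # parser loop fell through: stash for the summary report
        # and override with the epoch..
        if _invalid is not None:
            _invalid.append(rec)
        return from_timestamp(0.)
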
Tyler Goodlet b4cbefc76d ib: jig `.data_reset_hack()` with vnc-client failover
Since apparently porting to the new docker container enforces using
a vnc password and `asyncvnc` seems to have a bug/mis-config whenever
i've tried a pw over a wg tunnel..?

Soo, this tries out the old `i3ipc`-win-focus + `xdo` click hack when
the above fails.

Deats,
- add a mod-level `try_xdo_manual()` to wrap calling
  `i3ipc_xdotool_manual_click_hack()` with an oserr handler, ensure we
  don't bother trying if `i3ipc` import fails beforehand tho.
- call ^ from both the orig case block and the failover from the
  vnc-client case.
- factor the `no_setup_msg: str` out to mod level and expect it to be
  `.format()`-ed.
- refresh todo around `asyncvnc` pw ish..
- add a new `i3ipc_fin_wins_titled()` window-title scanner which
  predicates input `titles` and delivers any matches alongside the orig
  focused win at call time.
- tweak `i3ipc_xdotool_manual_click_hack()` to call ^ and remove prior
  unfactored window scanning logic.
2026-01-01 16:01:34 -05:00
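
The failover control-flow in miniature (fn names are from the commit
message; their bodies are stubbed and the signatures are guesses)::

    async def _vnc_click_hack() -> None:
        ...  # pyvnc/asyncvnc-driven reset clicks (see `.ib._util`)

    def try_xdo_manual() -> None:
        ...  # i3ipc-win-focus + xdo click hack

    async def data_reset_hack() -> None:
        try:
            await _vnc_click_hack()
        except OSError:
            # vnc pw-auth failed (eg. over a wg tunnel); fall back
            # to the old manual click hack
            try_xdo_manual()
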
Tyler Goodlet 1a25369d47 Add fix for binance API 3.1 rollout..
See https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
2026-01-01 16:01:12 -05:00
Tyler Goodlet 1f23e4970f Mk a `notes_to_self/`, move orig file `ideas.rst` 2026-01-01 16:01:12 -05:00
Tyler Goodlet b61ea7349f Drop old/masked ahab-docker daemon starting 2026-01-01 16:01:12 -05:00
Tyler Goodlet 29a6b498c4 Add `pyperclip` dep for goodboy's xonsh-clipboard needs Bp 2026-01-01 16:01:10 -05:00
Tyler Goodlet 26162e398a Try running daemons on UDS tpt
The root daemon, pikerd, needs to be adjusted to use diff default
registry addrs to also utilize non-TCP, but for now this gets us started
testing; so far so good B)
2026-01-01 16:00:42 -05:00
Tyler Goodlet d3ae2b26f6 Adjust feed status fields/display-pane to new actor-ID
That is to use the new `tractor.msg.types.Aid` struct to pull the
`brokerd` info from the `tractor.Channel.aid: Aid` attr as well as more
generally handling the new `Channel.raddr.proto_key: str` and no longer
assuming a TCP IPC transport; this per the recent `tractor.ipc`
subsys which adds multi-IPC-transports!

Downstream tweaks to match,
- use an "opt-in" field set to display in the `brokerd` info pane in
  `.ui._feedstatus.mk_feed_label()`.
 |_ also add some todos and drop some seemingly unneeded form sizing
    calcs?
- tweak `.ui._label` to allow not using markdown, though ended up not
  doing that since it looked too plain..
2026-01-01 16:00:01 -05:00
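
A sketch of the new info-pull (the `Aid` field set shown here is an
assumption)::

    import tractor

    def brokerd_info(chan: tractor.Channel) -> dict[str, str]:
        aid: tractor.msg.types.Aid = chan.aid
        return {
            'actor': aid.name,  # field name assumed
            'ipc': chan.raddr.proto_key,  # no longer assumes TCP
        }
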
Tyler Goodlet d0328bd640 Adjust to `trio`'s strict eg nurseries throughout!
Using `tractor.trionics.collapse_eg()` as needed to avoid, at the least,
crash-worthy (in debug-mode REPL-ing terms) nested cancellation egs that
exhibit on SIGINT/ctl-c of each "app" (chart & daemon).

Also a bit of renaming of all `trio.Nursery`s to `tn`, the new "task
nursery" shorthand-var-name being used in all our other `tractor`
related projects.
2026-01-01 16:00:01 -05:00
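
The canonical application, sketched::

    import trio
    import tractor

    async def main() -> None:
        async with (
            tractor.trionics.collapse_eg(),
            trio.open_nursery() as tn,  # the "task nursery" shorthand
        ):
            tn.start_soon(trio.sleep, 0)

    trio.run(main)
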
Tyler Goodlet 3c92b0c255 kraken: add crash-handling around `Pair()` init
Since it can otherwise be difficult to debug due to nursery cancellation
(we need that taskman yo!).
2026-01-01 16:00:01 -05:00
Tyler Goodlet bd1fc32368 kraken: `Pair.costmin` is now optional?
Some pairs don't seem to define it but it's not listed as deprecated on
official API page (new one now linked in type def's doc string).
2026-01-01 16:00:01 -05:00
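
That is, something like (sketching just the field change; `Struct` per
`piker.types` and the other field is a stand-in)::

    from piker.types import Struct

    class Pair(Struct, frozen=True):
        altname: str  # stand-in for the real field set
        costmin: float|None = None  # some pairs simply omit it
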
Tyler Goodlet d99c55b16f Start a manual `tags` file for internal refs 2026-01-01 16:00:01 -05:00
Tyler Goodlet d0789956d0 Flip to non-git`msgspec`, update `bidict`, link to "sdof" `tractor` dev branch 2026-01-01 15:59:59 -05:00
Tyler Goodlet 305db791ee Fix readme to `uv sync`.. link to astral docs 2026-01-01 15:59:23 -05:00
Tyler Goodlet ebc8c70779 Support python 3.13 !!
Luckily all core deps are already ported so this was pretty easy!

B)

I've opted (via `tool.uv` settings) to prefer the user's system
(installed) python distro and disable auto-download of astral's
distros for now since I recently hit some strange silent core dumping
(`brokerd` actors just disappearing..) with their binaries; an
introspect showed it seemingly to do with pthreading in cpython
internals? We can figure out how to better accommodate users with the
opposite pref later, presumably non-opinionated-linux hackers?

Core pkg upgrades of note,
- manually re-pinned most numerics libs including `numpy`, `numba`,
  `pyarrow`.
- for AOT ext-libs (thanks to `uv.lock` being so detailed), new
  `cython`, `llvmlite`, `cffi`, `rapidfuzz`, `uvloop`, `wrapt` and
  `PyQt6` wheels pulled in.
- `cryptofeed` did a required bump to `2.4.0` it looks like, which also
  required the above (and notable?) `cffi` update.
2026-01-01 15:57:54 -05:00
Tyler Goodlet fe9ff1afe4 Flip to latest `tractor` @ `branch = main` deps
Namely requiring a `trio` that supports py3.13, so "trio >=0.27".
Unfortunately this brings in strict egs and drops various `trio`-related
sub-deps we also import in `piker`, like `trio-typing`. So there's a few
"rough edges", mostly todo with the REPL activating on graceful cancels
(SIGINT) of `piker` CLIs atm - due to the new strict-egs in recent
`trio`, but nothing we can't work out pretty quickly i'd imagine with
the new `tractor.collapse_eg()` stacker.

Note that we're pinning to `tractor`'s main branch for the moment since
it should be "stable" vs. the `repl_fixture` i'm likely running local Bp
2026-01-01 15:57:53 -05:00
Tyler Goodlet e2f95c2bee Add a couple new grays to the palette 2026-01-01 15:55:18 -05:00
Tyler Goodlet d92fcb982c Bump to (latest) `polars`, the `0.20.6x` series B)
Since I was trying out the neat lookin `polars-fuzzy-match` (also added
for now as a core dep here) which requires the new plugin sys, plus it's
about time we synced with upstream!

Adjust some column syntax to the new `.name` sub-field-space and the
`uv` lock-file to match.

Other,
- add back `trio-typing` bc i guess something else needs it (debug
  tooling stuff in new `tractor`?)
- flip back to the `tractor` pre-main pin since the new `main`-branch
  requires new `trio` stuff we haven't ported yet..
2026-01-01 15:55:16 -05:00
Tyler Goodlet b61145ec5a Add missing f-str prefix to log line 2026-01-01 15:16:22 -05:00
Tyler Goodlet 624cca091a Port to newer `tractor.get_registry()` 2026-01-01 15:16:22 -05:00
Tyler Goodlet 9045e18386 binance: add new `permissionSets` to base `Pair` 2026-01-01 15:16:22 -05:00
Tyler Goodlet 23ea65e337 Fix type-check assertion in ems test to use `is` 2026-01-01 15:16:22 -05:00
Tyler Goodlet ea2e374101 Update `binance` spot pairs with `amendAllowed`
As per API updates,
https://developers.binance.com/docs/binance-spot-api-docs
https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority

I also slightly tweaked the field mismatch exception note to include the
`repr(pair_type)` so the dev can know which pair types should be
changed.
2026-01-01 15:16:22 -05:00
Tyler Goodlet f64fcc69ed Update legacy type to `tractor.MsgStream` 2026-01-01 15:16:22 -05:00
Tyler Goodlet f3a20ed77f TOSQUASH: 84ad34f51, one more `float` cast for paperboi.. 2026-01-01 15:16:22 -05:00
Tyler Goodlet 95cdaf8114 TOSQUASH: 84ad34f51, lingering `float` casts.. 2026-01-01 15:16:22 -05:00
Tyler Goodlet 39dcaf528a Drop variable regex from `ruff.toml`
Same as in other projects, seems to be not parsing and causing `ruff` to
crash?!?
2026-01-01 15:16:22 -05:00
Tyler Goodlet 3f663e0e73 `.kraken`: add masked pauses for order req debug
Such that the next time i inevitably must debug some order-request
error status or precision discrepancy, i have the mkt-symbol branch
ready to go. Also, switch to `'action': 'buy'|'sell' as action,` style
`case` matching instead of the post-`if` predicate style.
2026-01-01 15:16:22 -05:00
Tyler Goodlet de542c90fb Cast to `float` as needed from order-mode and ems
Since we're not quite yet using automatic typed msging from
`tractor`/`msgspec` (i.e. still manually decoding order ctl msgs from
built-in types..`dict`s still not `msgspec.Struct`) this adds the
appropriate typecasting ops to ensure the required precision is attained
prior to processing and/or submission to a brokerd backend service.

For the `.clearing._ems`,
- flip all `trigger_price` previously presumed to be `float` to just
  the field-identical `price: Decimal` and ensure we cast to `float`
  for any `trigger_price` usage, like before passing to `mk_check()`.

For `.ui.order_mode.OrderMode`,
- add a new `.curr_mkt: MktPair` convenience property to get the
  chart-active value.
- ensure we always use the `.curr_mkt.quantize() -> Decimal` before
  setting any IPC-msg's `.price` field!
- always cast `float(Order.price)` before use in setting line-levels.
- don't bother setting `Order.symbol` to a (now fully removed) `Symbol`
  instance since it's not really required-for-use anywhere; leaving it
  a `str` (per the type-annot) is fine for now?
2026-01-01 15:16:22 -05:00
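
The casting discipline in miniature (tick size and price values are
made up)::

    from decimal import Decimal

    price_tick = Decimal('0.01')
    # quantize once, at IPC-msg-set time (`MktPair.quantize()` in-repo)
    price: Decimal = Decimal('123.456789').quantize(price_tick)
    # ..and cast only at the line-graphics boundary
    level: float = float(price)
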
Tyler Goodlet 41559e6729 Finally drop `Symbol`
It was replaced by `MktPair` long ago in,
https://github.com/pikers/piker/pull/489

with follow up for final removal in,
https://github.com/pikers/piker/issues/517

Resolves #517
2026-01-01 15:16:22 -05:00
Tyler Goodlet 93e22e27b9 Mk `Brokerd[Order].price` avoid `float`-errs
By re-typing to a `.price: Decimal` field on both legs of the EMS.

It seems we must do it ourselves since,
- these msg's (fields) are relayed through the clearing engine to each
  `brokerd` backend and,
- bc many (if not all) of those backends' `.broker`-clients (and their
  encapsulated "brokerage services") **are not** doing any
  precision-truncation themselves.

So, for now, instead we opt to expect rounding at the source. This means
we will explicitly require casting to/from `float` at the line-graphics
interface to the order-clearing-engine (as implemented throughout
`.ui.order_mode.OrderMode`); and this is coming shortly.
2026-01-01 15:16:22 -05:00
Gud Boi a00e9c0e64 Merge pull request 'ems_no_last_required: don't require `last` field to boot dark-pool engine' (#38) from ems_no_last_required into main
Submitted-in: https://www.pikers.dev/pikers/piker/pulls/38
2026-01-01 20:15:57 +00:00
Gud Boi cb694700c2 Merge pull request 'stop_is_eoc: expect `trio.EndOfChannel` as graceful stream shutdown' (#52) from stop_is_eoc into main
Submitted-as: https://www.pikers.dev/pikers/piker/pulls/52
2026-01-01 19:57:35 +00:00
Tyler Goodlet 11c931f65d Use `piker_pin` branch from gitea `tractor` repo 2026-01-01 14:50:23 -05:00
Tyler Goodlet 60390ae596 Various `.clearing` todos/notes on potential issues with loglevel settings.. 2025-02-21 16:25:22 -05:00
Tyler Goodlet 9592735aaa .clearing._ems: Don't require `first_quote['last']`
Instead just check for the field (which i'm not huge on the key-name for
anyway) and if not found get the "last price" from the real-time shm
buffer's latest 'close' sample.

Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop
such that if a `client_stream` is popped on connection failure, we don't
RTE for the "size changed on iteration".
2025-02-21 16:25:22 -05:00
Tyler Goodlet 49841f5b91 Catch using `Sampler.bcast_errors` where possible
In all other possible IPC disconnect handling blocks. Also more
comprehensive typing throughout `uniform_rate_send()`.
2025-02-21 16:24:54 -05:00
Tyler Goodlet b2827ef3c3 Group bcast errors as `Sampler.bcast_errors`
A new class var `tuple[Exception]` such that the err set can be reffed
externally as needed for catching other similar pub-sub/IPC failures in
other (related) real-time sub-systems.

Also added some now-masked logging for debugging live-feed stream reading
issues that should ONLY be used for debugging since they'll greatly
degrade HFT perf. Used the new `log.mk_repr()` stuff (that one day we
should prolly pull from `modden` as a dep) for pretty console emissions.
2025-02-21 16:24:54 -05:00
Tyler Goodlet 2fc4ccf011 Suppress `trio.EndOfChannel`s raised by remote peer
Since now `tractor` will raise this native `trio`-exc translated from
a `Stop` msg when the peer gracefully terminates a `tractor.MsgStream`.
Just `info()` log in such cases versus continuing to warn for the
others.
2025-02-21 16:24:54 -05:00
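
The handling pattern in short (stream/log names generic)::

    import trio

    async def drain(stream, log) -> None:
        try:
            while True:
                msg = await stream.receive()
                ...  # process `msg`
        except trio.EndOfChannel:
            # peer gracefully terminated the `tractor.MsgStream`
            log.info('Stream ended gracefully by peer')
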
22 changed files with 564 additions and 1676 deletions

View File

@ -1,7 +1,9 @@
[network]
tsdb.backend = 'marketstore'
tsdb.host = 'localhost'
tsdb.grpc_port = 5995
pikerd = [
'/ipv4/127.0.0.1/tcp/6116', # std localhost daemon-actor tree
# '/uds/6116', # TODO std uds socket file
]
[ui]
# set custom font + size which will scale entire UI

View File

@ -1,30 +1,71 @@
running ``ib`` gateway in ``docker``
------------------------------------
We have a config based on the (now defunct)
image from "waytrade":
We have a config based on a well maintained community
image from `@gnzsnz`:
https://github.com/waytrade/ib-gateway-docker
https://github.com/gnzsnz/ib-gateway-docker
To startup this image with our custom settings
simply run the command::
To startup this image simply run the command::
docker compose up
And you should have the following socket-available services:
(For further usage^ see the official `docker-compose`_ docs)
- ``x11vnc1@127.0.0.1:3003``
And you should have the following socket-available services by
default:
- ``x11vnc1 @ 127.0.0.1:5900``
- ``ib-gw @ 127.0.0.1:4002``
You can attach to the container via a VNC client
without password auth.
You can now attach to the container via a VNC client with password-auth;
here is an example using ``vncclient`` on ``linux``::
SECURITY STUFF!?!?!
-------------------
Though "``ib``" claims they host filter connections outside
localhost (aka ``127.0.0.1``) it's probably better if you filter
the socket at the OS level using a stateless firewall rule::
vncviewer localhost:5900
now enter the pw you set via an (see second code blob) `.env file`_
or pw-file according to the `credentials section`_.
If you want to change away from their default config see the example
`docker-compose.yml`-config issue and config-section of the readme,
- https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#configuration
- https://github.com/gnzsnz/ib-gateway-docker/discussions/103
.. _.env file: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#how-to-use-it
.. _docker-compose: https://docs.docker.com/compose/
.. _credentials section: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#credentials
IF you also want to run ``TWS``
-------------------------------
You can also run it containerized,
https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#using-tws
SECURITY stuff (advanced, only if you're paranoid)
--------------------------------------------------
First and foremost if doing a "distributed" container setup where you
run the ``ib-gw`` docker container and your connecting API client
(likely ``ib_async`` from python) on **different hosts** be sure to
read the `security considerations`_ section!
And for a further (somewhat paranoid) perspective from
a long-time-ago serious devops eng..
Though "``ib``" claims they filter remote host connections outside
``localhost`` (aka ``127.0.0.1`` on ipv4) it's prolly justified if
you'd like to filter the socket at the *OS level* using a stateless
firewall rule::
ip rule add not unicast iif lo to 0.0.0.0/0 dport 4002
We will soon have this baked into our own custom image but for
now you'll have to do it urself dawgy.
We will soon have this either baked into our own custom derivative
image (or patched into the current upstream one after further testin)
but for now you'll have to do it urself, diggity dawg.
.. _security considerations: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#security-considerations

View File

@ -1,338 +0,0 @@
#!/usr/bin/env python
from decimal import (
Decimal,
)
from pathlib import Path
import numpy as np
# import polars as pl
import trio
import tractor
from datetime import datetime
# from pprint import pformat
from piker.brokers.deribit.api import (
get_client,
maybe_open_oi_feed,
)
from piker.storage import open_storage_client, StorageClient
from piker.log import get_logger
import sys
import pyqtgraph as pg
from PyQt6 import QtCore
from pyqtgraph import ScatterPlotItem, InfiniteLine
from PyQt6.QtWidgets import QApplication
from cryptofeed.symbols import Symbol
log = get_logger(__name__)
# XXX, use 2 newlines between top level LOC (even between these
# imports and the next function line ;)
def check_if_complete(
oi: dict[str, dict[str, Decimal | None]]
) -> bool:
return all(
oi[strike]['C'] is not None
and
oi[strike]['P'] is not None for strike in oi
)
async def max_pain_daemon(
) -> None:
oi_by_strikes: dict[str, dict[str, Decimal | None]]
instruments: list[Symbol] = []
expiry_dates: list[str]
expiry_date: str
currency: str = 'btc'
kind: str = 'option'
async with get_client(
) as client:
expiry_dates: list[str] = await client.get_expiration_dates(
currency=currency,
kind=kind
)
log.info(
f'Available expiries for {currency!r}-{kind}:\n'
f'{expiry_dates}\n'
)
expiry_date: str = input(
'Please enter a valid expiration date: '
).upper()
print('Starting little daemon...')
# maybe move this type annot down to the assignment line?
oi_by_strikes: dict[str, dict[str, Decimal]]
instruments = await client.get_instruments(
expiry_date=expiry_date,
)
oi_by_strikes = client.get_strikes_dict(instruments)
def get_total_intrinsic_values(
oi_by_strikes: dict[str, dict[str, Decimal]]
) -> dict[str, dict[str, Decimal]]:
call_cash: Decimal = Decimal(0)
put_cash: Decimal = Decimal(0)
intrinsic_values: dict[str, dict[str, Decimal]] = {}
closes: list = sorted(Decimal(close) for close in oi_by_strikes)
for strike, oi in oi_by_strikes.items():
s = Decimal(strike)
call_cash = sum(max(0, (s - c) * oi_by_strikes[str(c)]['C']) for c in closes)
put_cash = sum(max(0, (c - s) * oi_by_strikes[str(c)]['P']) for c in closes)
intrinsic_values[strike] = {
'C': call_cash,
'P': put_cash,
'total': call_cash + put_cash,
}
return intrinsic_values
def get_intrinsic_value_and_max_pain(
intrinsic_values: dict[str, dict[str, Decimal]]
):
# We meed to find the lowest value, so we start at
# infinity to ensure that, and the max_pain must be
# an amount greater than zero.
total_intrinsic_value: Decimal = Decimal('Infinity')
max_pain: Decimal = Decimal(0)
for strike, oi in oi_by_strikes.items():
s = Decimal(strike)
if intrinsic_values[strike]['total'] < total_intrinsic_value:
total_intrinsic_value = intrinsic_values[strike]['total']
max_pain = s
return total_intrinsic_value, max_pain
def plot_graph(
oi_by_strikes: dict[str, dict[str, Decimal]],
plot,
):
"""Update the bar graph with new open interest data."""
plot.clear()
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
for strike_str in sorted(oi_by_strikes, key=lambda x: int(x)):
strike = int(strike_str)
calls_val = float(oi_by_strikes[strike_str]['C'])
puts_val = float(oi_by_strikes[strike_str]['P'])
bar_c = pg.BarGraphItem(
x=[strike - 100],
height=[calls_val],
width=200,
pen='w',
brush=(0, 0, 255, 150)
)
plot.addItem(bar_c)
bar_p = pg.BarGraphItem(
x=[strike + 100],
height=[puts_val],
width=200,
pen='w',
brush=(255, 0, 0, 150)
)
plot.addItem(bar_p)
total_val = float(intrinsic_values[strike_str]['total']) / 100000
scatter_iv = ScatterPlotItem(
x=[strike],
y=[total_val],
pen=pg.mkPen(color=(0, 255, 0), width=2),
brush=pg.mkBrush(0, 255, 0, 150),
size=3,
symbol='o'
)
plot.addItem(scatter_iv)
_, max_pain = get_intrinsic_value_and_max_pain(intrinsic_values)
vertical_line = InfiniteLine(
pos=max_pain,
angle=90,
pen=pg.mkPen(color='yellow', width=1, style=QtCore.Qt.PenStyle.DotLine),
label=f'Max pain: {max_pain:,.0f}',
labelOpts={
'position': 0.85,
'color': 'yellow',
'movable': True
}
)
plot.addItem(vertical_line)
def update_oi_by_strikes(msg: tuple):
nonlocal oi_by_strikes
if 'oi' == msg[0]:
strike_price = msg[1]['strike_price']
option_type = msg[1]['option_type']
open_interest = msg[1]['open_interest']
oi_by_strikes.setdefault(
strike_price, {}
).update(
{option_type: open_interest}
)
# Define the structured dtype
dtype = np.dtype([
('time', int),
('oi', float),
('oi_calc', float),
])
async def write_open_interest_on_file(msg: tuple, client: StorageClient):
if 'oi' == msg[0]:
nonlocal expiry_date
timestamp = msg[1]['timestamp']
strike_price = msg[1]["strike_price"]
option_type = msg[1]['option_type'].lower()
col_sym_key = f'btc-{expiry_date.lower()}-{strike_price}-{option_type}'
# Create the numpy array with sample data
data = np.array([
(
int(timestamp),
float(msg[1]['open_interest']),
np.nan,
),
], dtype=dtype)
path: Path = await client.write_oi(
col_sym_key,
data,
)
# TODO, use std logging like this throughout for status
# emissions on console!
log.info(f'Wrote OI history to {path}')
def get_max_pain(
oi_by_strikes: dict[str, dict[str, Decimal]]
) -> dict[str, str | Decimal]:
'''
This method requires only the strike_prices and oi for call
and puts, the closes list are the same as the strike_prices
the idea is to sum all the calls and puts cash for each strike
and the ITM strikes from that strike, the lowest value is what we
are looking for the intrinsic value.
'''
nonlocal timestamp
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
total_intrinsic_value, max_pain = get_intrinsic_value_and_max_pain(intrinsic_values)
return {
'timestamp': timestamp,
'expiry_date': expiry_date,
'total_intrinsic_value': total_intrinsic_value,
'max_pain': max_pain,
}
async with (
open_storage_client() as (_, storage),
maybe_open_oi_feed(
instruments,
) as oi_feed,
):
# Initialize QApplication
app = QApplication(sys.argv)
win = pg.GraphicsLayoutWidget(show=True)
win.setWindowTitle('Calls (blue) vs Puts (red)')
plot = win.addPlot(title='OI by Strikes')
plot.showGrid(x=True, y=True)
print('Plot initialized...')
async for msg in oi_feed:
# In memory oi_by_strikes dict, all message are filtered here
# and the dict is updated with the open interest data
update_oi_by_strikes(msg)
# Write on file using storage client
await write_open_interest_on_file(msg, storage)
# Max pain calcs, before start we must gather all the open interest for
# all the strike prices and option types available for a expiration date
if check_if_complete(oi_by_strikes):
if 'oi' == msg[0]:
# Here we must read for the filesystem all the latest open interest value for
# each instrument for that specific expiration date, that means look up for the
# last update got the instrument btc-{expity_date}-*oi1s.parquet (1s because is
# hardcoded to something, sorry.)
timestamp = msg[1]['timestamp']
max_pain = get_max_pain(oi_by_strikes)
# intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
# graph here
plot_graph(oi_by_strikes, plot)
# TODO, use a single multiline string with `()`
# and drop the multiple `print()` calls (this
# should be done elsewhere in this file as well!
#
# As per the docs,
# https://docs.python.org/3/reference/lexical_analysis.html#string-literal-concatenation
# you could instead do,
# print(
# '-----------------------------------------------\n'
# f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}\n'
# )
# WHY?
# |_ less ctx-switches/calls to `print()`
# |_ the `str` can then be modified / passed
# around as a variable more easily if needed in
# the future ;)
#
# ALSO, i believe there already is a stdlib
# module to do "alignment" of text which you
# could try for doing the right-side alignment,
# https://docs.python.org/3/library/textwrap.html#textwrap.indent
#
print('-----------------------------------------------')
print(f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}')
print(f'expiry_date: {max_pain['expiry_date']}')
print(f'max_pain: {max_pain['max_pain']:,.0f}')
print(f'total intrinsic value: {max_pain['total_intrinsic_value']:,.0f}')
print('-----------------------------------------------')
# Process GUI events to keep the window responsive
app.processEvents()
async def main():
async with tractor.open_nursery(
debug_mode=True,
loglevel='info',
) as an:
from tractor import log
log.get_console_log(level='info')
ptl: tractor.Portal = await an.start_actor(
'max_pain_daemon',
enable_modules=[__name__],
infect_asyncio=True,
# ^TODO, we can actually run this in the root-actor now
# if needed as per 2nd "section" in,
# https://pikers.dev/goodboy/tractor/pulls/2
#
# NOTE, will first require us porting to modern
# `tractor:main` though ofc!
)
await ptl.run(max_pain_daemon)
if __name__ == '__main__':
trio.run(main)

View File

@ -1,29 +0,0 @@
## Max Pain Calculation for Deribit Options
This feature, which calculates the max pain point for options traded
on the Deribit exchange using cryptofeed library.
- Functions in the api module for fetching options data from Deribit.
[commit](https://pikers.dev/pikers/piker/commit/da55856dd2876291f55a06eb0561438a912d8241)
- Compute the max pain point based on open interest data using
deribit's api.
[commit](https://pikers.dev/pikers/piker/commit/0d9d6e15ba0edeb662ec97f7599dd66af3046b94)
### How to test it?
**Before start:** in order to get this working with `uv`, you
**must** use my [`tractor` fork](https://pikers.dev/ntorres/tractor/src/branch/aio_abandons)
and this branch: `aio_abandons`, the reason is that I cherry-pick the
`uv_migration` that guille made, for some reason that a didn't dive
into, in my system y need tractor using `uv` too. quite hacky
I guess.
1. `uv lock`
2. `uv run --no-dev python examples/max_pain.py`
3. A message should be display, enter one of the expiration date
available.
4. The script should be up and running.

View File

@ -51,7 +51,6 @@ __brokers__: list[str] = [
'ib',
'kraken',
'kucoin',
'deribit',
# broken but used to work
# 'questrade',
@ -62,6 +61,7 @@ __brokers__: list[str] = [
# wstrade
# iex
# deribit
# bitso
]

View File

@ -102,7 +102,7 @@ class Pair(Struct, frozen=True, kw_only=True):
# https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
# will become non-optional 2025-08-28?
# https://developers.binance.com/docs/binance-spot-api-docs#future-changes
pegInstructionsAllowed: bool|None = None
pegInstructionsAllowed: bool = False
filters: dict[
str,

View File

@ -25,7 +25,6 @@ from .api import (
get_client,
)
from .feed import (
get_mkt_info,
open_history_client,
open_symbol_search,
stream_quotes,
@ -35,20 +34,15 @@ from .feed import (
# open_trade_dialog,
# norm_trade_records,
# )
from .venues import (
OptionPair,
)
log = get_logger(__name__)
__all__ = [
'get_client',
# 'trades_dialogue',
'get_mkt_info',
'open_history_client',
'open_symbol_search',
'stream_quotes',
'OptionPair',
# 'norm_trade_records',
]

File diff suppressed because it is too large

View File

@ -18,59 +18,38 @@
Deribit backend.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import (
# Any,
# Optional,
Callable,
)
# from pprint import pformat
from typing import Any, Optional, Callable
import time
import cryptofeed
import trio
from trio_typing import TaskStatus
from pendulum import (
from_timestamp,
)
import pendulum
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
from piker.accounting import (
Asset,
MktPair,
unpack_fqme,
)
from piker.brokers import (
open_cached_client,
NoData,
from piker.brokers import open_cached_client
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import (
get_logger,
mk_repr,
)
from piker.data.validate import FeedInit
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import (
Client,
# get_config,
piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
str_to_cb_sym,
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed
)
from .venues import (
Pair,
OptionPair,
Trade,
)
_spawn_kwargs = {
'infect_asyncio': True,
@ -85,215 +64,90 @@ async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
instrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc(
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array: np.ndarray = await client.bars(
mkt,
array = await client.bars(
instrument,
start_dt=start_dt,
end_dt=end_dt,
)
if len(array) == 0:
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
raise DataUnavailable
start_dt = from_timestamp(array[0]['time'])
end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield (
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
fqme += '.deribit'
mkt_mode: str = ''
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# NOTE: we always upper case all tokens to be consistent with
# binance's symbology style for pairs, like `BTCUSDT`, but in
# theory we could also just keep things lower case; as long as
# we're consistent and the symcache matches whatever this func
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
# venue_lower: str = venue.lower()
mkt_mode: str = 'option'
async with open_cached_client(
'deribit',
) as client:
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
pair: Pair = await client.exch_info(
sym=pair_str,
)
mkt_mode = pair.venue
client.mkt_mode = mkt_mode
dst: Asset | None = assets.get(pair.bs_dst_asset)
src: Asset | None = assets.get(pair.bs_src_asset)
mkt = MktPair(
dst=dst,
src=src,
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
)
return mkt, pair
yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Open a live quote stream for the market set defined by `symbols`.
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
'''
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
sym = symbols[0]
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(
mkt_info=mkt,
)
)
# build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'option',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
from_cf: tractor.to_asyncio.LinkedTaskChannel
async with maybe_open_price_feed(sym) as from_cf:
nsym = piker_sym_to_cb_sym(sym)
# load the "last trades" summary
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
last_trades: list[Trade] = last_trades_res.trades
async with maybe_open_price_feed(sym) as stream:
# TODO, do we even need this or will the above always
# work?
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
cache = await client.cache_symbols()
# else:
last_trade = Trade(
**(last_trades[0])
)
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
first_quote: dict = {
if len(last_trades) == 0:
last_trade = None
async for typ, quote in stream:
if typ == 'trade':
last_trade = Trade(**(quote['data']))
break
else:
last_trade = Trade(**(last_trades[0]))
first_quote = {
'symbol': sym,
'last': last_trade.price,
'brokerd_ts': last_trade.timestamp,
@ -304,84 +158,13 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp
}]
}
task_status.started((
init_msgs,
first_quote,
))
task_status.started((init_msgs, first_quote))
feed_is_live.set()
# NOTE XXX, static for now!
# => since this only handles ONE mkt feed at a time we
# don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, ref in from_cf:
match typ:
case 'trade':
trade: cryptofeed.types.Trade = ref
# TODO, re-impl this according to teh ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({
topic: piker_quote,
})
async for typ, quote in stream:
topic = quote['symbol']
await send_chan.send({topic: quote})
@tractor.context
@ -391,21 +174,12 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
# cache = client._pairs
cache = await client.cache_symbols()
await ctx.started()
async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within
# the methd impl.
pairs: dict[str, Pair] = await client.search_symbols(
pattern,
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair
await stream.send(byfqme)
# repack in dict form
await stream.send(
await client.search_symbols(pattern))

View File

@ -1,196 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per market data-type definitions and schemas types.
"""
from __future__ import annotations
import pendulum
from typing import (
Literal,
Optional,
)
from decimal import Decimal
from piker.types import Struct
# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'
# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
MarketType = Literal[
'option'
]
def get_api_eps(venue: MarketType) -> tuple[str, str]:
'''
Return API ep root paths per venue.
'''
return {
'option': (
_ws_url,
),
}[venue]
class Pair(Struct, frozen=True, kw_only=True):
symbol: str
# src
quote_currency: str # 'BTC'
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
tick_size_steps: list[dict[str, float]]
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size_steps[0]['above_price']))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
class OptionPair(Pair, frozen=True):
taker_commission: float # 0.0003
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1
maker_commission: float # 0.0003
kind: str # 'option'
is_active: bool # true
instrument_type: str # 'reversed'
instrument_name: str # 'BTC-1SEP24-55000-C'
instrument_id: int # 364671
expiration_timestamp: int # 1725177600000
creation_timestamp: int # 1724918461000
counter_currency: str # 'USD'
contract_size: float # '1.0'
block_trade_tick_size: float # '0.0001'
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
return iso_date
@property
def venue(self) -> str:
return f'{self.instrument_type}_option'
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_src_asset(self) -> str:
return f'{self.quote_currency}'
@property
def bs_dst_asset(self) -> str:
return f'{self.symbol}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
}
class JSONRPCResult(Struct):
id: int
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
jsonrpc: str = '2.0'


class KLinesResult(Struct):
    low: list[float]
    cost: list[float]
    high: list[float]
    open: list[float]
    close: list[float]
    ticks: list[int]
    status: str
    volume: list[float]


class Trade(Struct):
    iv: float
    price: float
    amount: float
    trade_id: str
    contracts: float
    direction: str
    trade_seq: int
    timestamp: int
    mark_price: float
    index_price: float
    tick_direction: int
    instrument_name: str
    # NOTE: no trailing commas on these defaults; `x: T = v,`
    # would otherwise assign the 1-tuple `(v,)` as the default!
    combo_id: Optional[str] = ''
    combo_trade_id: Optional[int] = 0
    block_trade_id: Optional[str] = ''
    block_trade_leg_count: Optional[int] = 0


class LastTradesResult(Struct):
    trades: list[Trade]
    has_more: bool

View File

@ -97,10 +97,6 @@ from ._util import (
get_logger,
)
# ?TODO? this can now be removed since it was originally to extend
# with a `bar_vwap` field that we removed from the default ohlcv
# dtype since it's better calculated in an FSP func
#
_bar_load_dtype: list[tuple[str, type]] = [
# NOTE XXX: only part that's diff
# from our default fields where

View File

@ -214,7 +214,9 @@ async def open_history_client(
# could be trying to retrieve bars over weekend
if out is None:
log.error(f"Can't grab bars starting at {end_dt}!?!?")
log.error(
f"No bars starting at {end_dt!r} !?!?"
)
if (
end_dt
and head_dt

View File

@ -171,7 +171,6 @@ class OrderClient(Struct):
async def relay_orders_from_sync_code(
client: OrderClient,
symbol_key: str,
to_ems_stream: tractor.MsgStream,
@ -245,6 +244,11 @@ async def open_ems(
async with maybe_open_emsd(
broker,
# XXX NOTE, LOL so this determines the daemon `emsd` loglevel
# then, FYI.. that's kinda wrong no?
# -[ ] shouldn't it be set by `pikerd -l` or no?
# -[ ] would make a lot more sense to have a subsys ctl for
# levels.. like `-l emsd.info` or something?
loglevel=loglevel,
) as portal:

View File

@ -655,7 +655,11 @@ class Router(Struct):
flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker)
book.lasts[fqme]: float = float(first_quote['last'])
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
async with self.maybe_open_brokerd_dialog(
brokermod=brokermod,
@ -718,7 +722,7 @@ class Router(Struct):
subs = self.subscribers[sub_key]
sent_some: bool = False
for client_stream in subs:
for client_stream in subs.copy():
try:
await client_stream.send(msg)
sent_some = True
@ -1014,6 +1018,10 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
if not status_msg.req:
# likely some order change state?
await tractor.pause()
else:
await router.client_broadcast(
status_msg.req.symbol,
status_msg,

View File

@ -297,6 +297,8 @@ class PaperBoi(Struct):
# transmit pp msg to ems
pp: Position = self.acnt.pps[bs_mktid]
# TODO, this will break if `require_only=True` was passed to
# `.update_from_ledger()`
pp_msg = BrokerdPosition(
broker=self.broker,

View File

@ -30,6 +30,7 @@ subsys: str = 'piker.clearing'
log = get_logger(subsys)
# TODO, oof doesn't this ignore the `loglevel` then???
get_console_log = partial(
get_console_log,
name=subsys,
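
(Re the TODO above: `functools.partial` only pre-binds the args you hand it, so a caller-supplied level should still flow through to the wrapped `get_console_log`; a standalone sketch with a stand-in signature, not piker's actual logging API:)

from functools import partial

def get_console_log_demo(
    level: str | None = None,
    name: str | None = None,
) -> tuple:
    # stand-in for the real `get_console_log()` signature
    return (name, level)

bound = partial(get_console_log_demo, name='piker.clearing')
assert bound('debug') == ('piker.clearing', 'debug')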

View File

@ -95,6 +95,12 @@ class Sampler:
# history loading.
incr_task_cs: trio.CancelScope | None = None
bcast_errors: tuple[type[Exception], ...] = (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
)
# holds all the ``tractor.Context`` remote subscriptions for
# a particular sample period increment event: all subscribers are
# notified on a step.
@ -258,14 +264,15 @@ class Sampler:
subs: set
last_ts, subs = pair
task = trio.lowlevel.current_task()
log.debug(
f'SUBS {self.subscribers}\n'
f'PAIR {pair}\n'
f'TASK: {task}: {id(task)}\n'
f'broadcasting {period_s} -> {last_ts}\n'
# NOTE, for debugging pub-sub issues
# task = trio.lowlevel.current_task()
# log.debug(
# f'ALL-SUBS@{period_s!r}: {self.subscribers}\n'
# f'PAIR: {pair}\n'
# f'TASK: {task}: {id(task)}\n'
# f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}'
)
# )
borked: set[MsgStream] = set()
sent: set[MsgStream] = set()
while True:
@ -282,13 +289,11 @@ class Sampler:
await stream.send(msg)
sent.add(stream)
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
):
except self.bcast_errors as err:
log.error(
f'{stream._ctx.chan.uid} dropped connection'
f'Connection dropped for IPC ctx\n'
f'{stream._ctx}\n\n'
f'Due to {type(err)}'
)
borked.add(stream)
else:
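
(The loop above follows a collect-then-prune broadcast pattern; a minimal standalone sketch with generic names, not the full `Sampler` API:)

import trio

async def broadcast_and_prune(
    subs: set[trio.MemorySendChannel],
    msg: dict,
) -> None:
    # send to every sub, collecting any with a dead transport
    borked: set = set()
    for chan in subs:
        try:
            await chan.send(msg)
        except (
            trio.BrokenResourceError,
            trio.ClosedResourceError,
        ):
            borked.add(chan)

    # drop disconnected consumers so the next cycle skips them
    subs -= borked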
@ -395,7 +400,8 @@ async def register_with_sampler(
finally:
if (
sub_for_broadcasts
and subs
and
subs
):
try:
subs.remove(stream)
@ -562,8 +568,7 @@ async def open_sample_stream(
async def sample_and_broadcast(
bus: _FeedsBus, # noqa
bus: _FeedsBus,
rt_shm: ShmArray,
hist_shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel,
@ -583,11 +588,33 @@ async def sample_and_broadcast(
overruns = Counter()
# NOTE, only used for debugging live-data-feed issues, though
# this should be resolved more correctly in the future using the
# new typed-msgspec feats of `tractor`!
#
# XXX, a multiline nested `dict` formatter (since rn quote-msgs
# are just that).
# pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker
async for quotes in quote_stream:
# print(quotes)
# TODO: ``numba`` this!
# XXX WARNING XXX only enable for debugging bc ow can cost
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO,
# -[ ] `numba` or `cython`-nize this loop possibly?
# |_alternatively could we do it in rust somehow by unpacking
# arrow msgs instead of using `msgspec`?
# -[ ] use `msgspec.Struct` support in new typed-msging from
# `tractor` to ensure only allowed msgs are transmitted?
#
for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that
@ -660,6 +687,21 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key)
# TODO, figure out how to make this useful whilst
# incorporating feed "pausing" ..
#
# if not subs:
# all_bs_fqmes: list[str] = list(
# bus._subscribers.keys()
# )
# log.warning(
# f'No subscribers for {brokername!r} live-quote ??\n'
# f'broker_symbol: {broker_symbol}\n\n'
# f'Maybe the backend-sys symbol does not match one of,\n'
# f'{pfmt(all_bs_fqmes)}\n'
# )
# NOTE: by default the broker backend doesn't append
# its own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct
@ -729,18 +771,14 @@ async def sample_and_broadcast(
if lags > 10:
await tractor.pause()
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
):
except Sampler.bcast_errors as ipc_err:
ctx: Context = ipc._ctx
chan: Channel = ctx.chan
if ctx:
log.warning(
'Dropped `brokerd`-quotes-feed connection:\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n'
f'x>) {ctx.cid}@{chan.uid}\n'
f'|_{ipc_err!r}\n\n'
)
if sub.throttle_rate:
assert ipc._closed
@ -757,12 +795,11 @@ async def sample_and_broadcast(
async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: MsgStream,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
@ -780,13 +817,16 @@ async def uniform_rate_send(
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
'''
# TODO: compute the approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616
# ?TODO? dynamically compute the **actual** approx overhead latency per cycle
# instead of this magic # bidinezz?
throttle_period: float = 1/rate - 0.000616
left_to_sleep: float = throttle_period
# send cycle state
first_quote: dict|None
first_quote = last_quote = None
last_send = time.time()
diff = 0
last_send: float = time.time()
diff: float = 0
task_status.started()
ticks_by_type: dict[
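
(A quick worked number for the throttle math above: at a 60Hz consumer rate each cycle budgets `1/60 - 0.000616 ≈ 0.01605s` of sleep, assuming that magic overhead constant holds.)

# e.g. for a 60Hz display-rate consumer:
rate: float = 60.0
throttle_period: float = 1/rate - 0.000616
assert round(throttle_period, 5) == 0.01605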
@ -797,22 +837,28 @@ async def uniform_rate_send(
clear_types = _tick_groups['clears']
while True:
# compute the remaining time to sleep for this throttled cycle
left_to_sleep = throttle_period - diff
left_to_sleep: float = throttle_period - diff
if left_to_sleep > 0:
cs: trio.CancelScope
with trio.move_on_after(left_to_sleep) as cs:
sym: str
last_quote: dict
try:
sym, last_quote = await quote_stream.receive()
except trio.EndOfChannel:
log.exception(f"feed for {stream} ended?")
log.exception(
f'Live stream for feed ended?\n'
f'<=c\n'
f' |_{stream!r}\n'
)
break
diff = time.time() - last_send
diff: float = time.time() - last_send
if not first_quote:
first_quote = last_quote
first_quote: dict = last_quote
# first_quote['tbt'] = ticks_by_type
if (throttle_period - diff) > 0:
@ -873,11 +919,12 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({sym: first_quote})
await stream.send({
sym: first_quote
})
except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun:
raise
ctx = stream._ctx
chan = ctx.chan
log.warning(
@ -885,20 +932,28 @@ async def uniform_rate_send(
f'{sym}:{ctx.cid}@{chan.uid}'
)
except (
# NOTE: any of these can be raised by ``tractor``'s IPC
# NOTE: any of these can be raised by `tractor`'s IPC
# transport-layer and we want to be highly resilient
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
trio.ClosedResourceError,
trio.BrokenResourceError,
except (
ConnectionResetError,
trio.EndOfChannel,
):
) + Sampler.bcast_errors as ipc_err:
match ipc_err:
case trio.EndOfChannel():
log.info(
f'{stream} terminated by peer,\n'
f'{ipc_err!r}'
)
case _:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
log.warning(
f'{stream} closed due to,\n'
f'{ipc_err!r}'
)
await stream.aclose()
return
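
(The `match` above dispatches on the caught exception's type via class patterns; a standalone sketch of just that mechanic:)

import trio

def classify_ipc_err(exc: Exception) -> str:
    # class patterns act as `isinstance()` checks
    match exc:
        case trio.EndOfChannel():
            return 'peer-terminated'
        case trio.BrokenResourceError() | trio.ClosedResourceError():
            return 'hard-disconnect'
        case _:
            return 'unknown'

assert classify_ipc_err(trio.EndOfChannel()) == 'peer-terminated'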

View File

@ -18,8 +18,8 @@
Log like a forester!
"""
import logging
import reprlib
import json
import reprlib
from typing import (
Callable,
)
@ -90,6 +90,8 @@ def colorize_json(
)
# TODO, eventually defer to the version in `modden` once
# it becomes a dep!
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:

View File

@ -138,16 +138,6 @@ class StorageClient(
) -> None:
...
async def write_oi(
self,
fqme: str,
oi: np.ndarray,
append_and_duplicate: bool = True,
limit: int = int(800e3),
) -> None:
...
class TimeseriesNotFound(Exception):
'''

View File

@ -111,24 +111,6 @@ def mk_ohlcv_shm_keyed_filepath(
return path
def mk_oi_shm_keyed_filepath(
fqme: str,
period: float | int,
datadir: Path,
) -> Path:
if period < 1.:
raise ValueError('Sample period should be >= 1.!?')
path: Path = (
datadir
/
f'{fqme}.oi{int(period)}s.parquet'
)
return path
def unpack_fqme_from_parquet_filepath(path: Path) -> str:
filename: str = str(path.name)
@ -190,11 +172,7 @@ class NativeStorageClient:
key: str = path.name.rstrip('.parquet')
fqme, _, descr = key.rpartition('.')
if 'ohlcv' in descr:
prefix, _, suffix = descr.partition('ohlcv')
elif 'oi' in descr:
prefix, _, suffix = descr.partition('oi')
period: int = int(suffix.strip('s'))
# cache description data
@ -391,61 +369,6 @@ class NativeStorageClient:
timeframe,
)
def _write_oi(
self,
fqme: str,
oi: np.ndarray,
) -> Path:
'''
Sync version of the public interface meth, since we don't
currently actually need or support an async impl.
'''
path: Path = mk_oi_shm_keyed_filepath(
fqme=fqme,
period=1,
datadir=self._datadir,
)
if isinstance(oi, np.ndarray):
new_df: pl.DataFrame = tsp.np2pl(oi)
else:
new_df = oi
if path.exists():
old_df = pl.read_parquet(path)
df = pl.concat([old_df, new_df])
else:
df = new_df
start = time.time()
df.write_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)
return path
async def write_oi(
self,
fqme: str,
oi: np.ndarray,
) -> Path:
'''
Write input oi time series for fqme and sampling period
to (local) disk.
'''
return self._write_oi(
fqme,
oi,
)
async def delete_ts(
self,
key: str,

View File

@ -156,12 +156,10 @@ pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch = "piker_pin" }
pyvnc = { git = "https://github.com/regulad/pyvnc.git" }
# TODO, long term we should be synced to upstream `main` branch!
# tractor = { git = "https://github.com/goodboy/tractor.git", branch ="piker_pin" }
tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }
# goodboy's dev-env
# XXX for @goodboy's hackin dev env, usually there's something new in
# the runtime being seriously tested here Bp
# tractor = { path = "../tractor/", editable = true }
# xonsh = { path = "../xonsh", editable = true }
# XXX since, we're like, always hacking new shite all-the-time. Bp
tractor = { git = "https://github.com/goodboy/tractor.git", branch ="piker_pin" }
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "main" }
# ------ goodboy ------
# hackin dev-envs, usually there's something new he's hackin in..
# tractor = { path = "../tractor", editable = true }