Compare commits

...

29 Commits

Author SHA1 Message Date
Tyler Goodlet 7eacab0ac5 Allow ledger passes to ignore (symcache) unknown fqmes
For example in the paper-eng, if you have a backend that doesn't fully
support a symcache (yet) it's handy to be able to ignore processing
other paper-eng txns when all you care about at the moment is the
simulated symbol.

NOTE, that currently this will still result in a key-error when you load
more then one mkt with the paper engine (for which the backend does not
have the symcache implemented) since no fqme ad-hoc query was made for
the 2nd symbol (and i'm not sure we should support that kinda hackery
over just encouraging the sym-cache being added?). Def needs a little
more thought depending on how many backends are never going to be able
to (easily) support caching..
2025-02-11 18:27:41 -05:00
Tyler Goodlet 16e8acf3c4 .clearing._ems: Don't require `first_quote['last']`
Instead just check for the field (which i'm not huge on the key-name for
anyway) and if not found get the "last price" from the real-time shm
buffer's latest 'close' sample.

Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop
such that if a `client_stream` is popped on connection failure, we don't
RTE for the "size changed on iteration".
2025-02-11 18:27:41 -05:00
Tyler Goodlet 78d76f64cd `kucoin`: repair live quotes streaming..
This must have broke at some point during the new `MktPair` and thus
`.fqme: str` updates; mas-o-menos the symbol key in the quote-msg-`dict`
was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being
processed by the downstream sampling and feed subsys.

So fix that as well as a few other refinements,
- set the `topic: mkt.bs_fqme` in quote msgs obvi.
- drop the "wait for first clearing vlm" quote poll loop; going to fix
  the sampler to handle a `first_quote` without a `'last'` key.
- add some typing around calls to `get_mkt_info()`.
- rename `stream_messages()` -> `iter_normed_quotes()`.
2025-02-11 18:27:41 -05:00
Nelson Torres 77477bf6a2 Deleted settlePlan field from binance FutesPair. 2025-02-11 18:27:41 -05:00
Nelson Torres d4067b31fb Added missing fields for kucoin.
feeCategory, makerFeeCoefficient, takerFeeCoefficient and st.
2025-02-11 18:27:41 -05:00
Tyler Goodlet 7928d88b5f data._web_bs: try to raise jsonrpc errors in parent task 2025-02-11 18:27:41 -05:00
Tyler Goodlet 40602d40ca Official service-mngr to `tractor.hilevel` move
Such that we maintain that subsys in the actor-runtime repo (with
hopefully an extensive test suite XD).

Port deats,
- rewrite `open_service_mngr()` as a thin wrapper that delegates into
  the new `tractor.hilevel.open_service_mngr()` but with maintenance of
  the `Services` class-singleton for now.
- port `.service._daemon` usage to the new
  `ServiceMngr.start_service_ctx()` a rename from
  `.start_service_task()` which is now likely destined for the soon
  supported `tractor.trionics.TaskMngr` nursery extension.
- ref the new `ServiceMngr.an: ActorNursery` instance var name.

Other,
- always enable the `tractor.pause_from_sync()` support via `greenback`
  whenever `debug_mode` is set at `pikerd` init.
2025-02-11 18:01:48 -05:00
Nelson Torres f148114adb Updated tractor method name. 2025-02-11 18:01:48 -05:00
Tyler Goodlet 9498e5f102 More service-mngr clarity notes
Nothing changing functionally here just adding more `tractor`
operational notes, tips for debug tooling and typing fixes B)

Of particular note is adding further details about the reason we do not
need to call `Context.cancel()` inside the `finally:` block of
`.open_context_in_task()` thanks to `tractor`'s new and improved
inter-actor cancellation semantics Bo
2025-02-11 18:01:48 -05:00
Tyler Goodlet 53eb6b8f91 Drop `.cancel_actor()` from `maybe_spawn_daemon()`
Since `tractor`'s new and improved inter-actor cancellation semantics
are much more pedantic, AND bc we use the `ServiceMngr` for spawning
service actors on-demand, the caller of `maybe_spawn_daemon()` should
NEVER conduct a so called "out of band" `Actor`-runtime cancel request
since this is precisely the job of our `ServiceMngr` XD

Add a super in depth note explaining the underlying issue and adding
a todo list of how we should prolly augment `tractor` to make such cases
easier to grok and fix in the future!
2025-02-11 18:01:48 -05:00
Tyler Goodlet c7d11a68c1 Lel, forgot to add a `SPOT` venue for `binance`.. 2025-02-11 18:01:48 -05:00
Tyler Goodlet c36c38f432 Mask no-data pause-point, add perps to sig.
Was orig for debugging an issue with `kucoin` i think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to `start_backfill()` input literal set.
2025-02-11 18:01:48 -05:00
Tyler Goodlet 41b0584588 Prep service mngr for move to `tractor.hilevel`
Given it's a fairly simple yet useful abstraction, it makes sense to
offer this sub-sys alongside the core `tractor` runtime lib.

Without going into extreme detail on the impl changes (it'll come in
the commit that moves to the other repo) here is the high level summary:
------ - ------
- rename `Services` -> `ServiceMngr` and use an factory `@acm`
  to guarantee a single-instance-per-actor using a niche approach for a
  singleton object using a default keyword-arg B)
  - the mod level `open_service_mngr()` and `get_service_mngr()` are the
    new allocation/access API.
- add a `ServiceMngr.start_service()` method which does the work of both
  spawning a new subactor (for the daemon) and uses its portal to start
  the mngr side supervision task.
- open actor/task nurseries inside the `@acm` allocator.

Adjust other dependent subsystems to match:
------ - ------
- use `open_service_mngr()` when first allocated in `open_pikerd()`.
- use `get_service_mngr()` instead of importing the class ref inside
  `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()`
  and `.data._sampling.spawn_samplerd()` using a `partial` to pack in
  the endpoint ctx kwargs (unpacked inside `.start_service()` XD).
2025-02-11 18:01:48 -05:00
Tyler Goodlet 844544ed8e Port binance to `httpx`
Like other backends use the `AsyncClient` for all venue specific
client-sessions but change to allocating them inside `get_client()`
using an `AsyncExitStack` and inserting directly in the
`Client.venue_sesh: dict` table during init.

Supporting impl tweaks:
- remove most of the API client session building logic and instead make
  `Client.__init__()` take in a `venue_sessions: dict` (set it to
  `.venue_sesh`) and `conf: dict`, instead opting to do the http client
  configuration inside `get_client()` since all that code only needs to
  be run once.
 |_load config inside `get_client()` once.
 |_move session token creation into a new util func `init_api_keys()` and
  also call it from `get_client()` factory; toss in an ex. toml section
  config to the doc string.
- define `_venue_urls: dict[str, str]` (content taken from old static
  `.venue_sesh` dict) at module level and feed them as `base_url: str`
  inputs to the client create loop.
- adjust all call sigs in httpx-sesh-using methods, namely just
  `._api()`.
- do a `.exch_info()` call in `get_client()` to cache the symbology
  set.

Unrelated changes for various other outstanding buggers:
- to get futures feeds correctly loading when selected
  from search (like 'XMRUSDT.USDTM.PERP'), expect a `MktPair` input to
  `Client.bars()` such that the exact venue-key can be looked up (via
  a new `.pair2venuekey()` meth) and then passed to `._api()`.
- adjust `.broker.open_trade_dialog()` to failover to paper engine when
  there's no `api_key` key set for the `subconf` venue-key.
2025-02-11 16:27:28 -05:00
Nelson Torres f479252d26 Added note to exception when missing field in SpotPair class 2025-02-11 16:27:28 -05:00
Nelson Torres 033ef2e35e Added new fields to SpotPair class in venues 2025-02-11 16:27:28 -05:00
Tyler Goodlet 2cdece244c binance: raise `NoData` on null hist arrays
Like we do with other history backends to indicate lack of a data set.
This avoids any raise that will will bring down the backloader task with
some downstream error.

Raise a `ValueError` on no time index for now.
2025-02-11 16:27:28 -05:00
Tyler Goodlet 018694bbdb Woops, `data` can be an empty list XD 2025-02-11 16:27:28 -05:00
Tyler Goodlet 128a2d507f Woops, fix missing `api_url` ref in error log 2025-02-11 16:27:28 -05:00
Tyler Goodlet 430650a6a7 Change type-annots to use `httpx.Response` 2025-02-11 16:27:28 -05:00
Tyler Goodlet 1da3cf5698 Port `kucoin` backend to `httpx` 2025-02-11 16:27:28 -05:00
Tyler Goodlet a348603fc4 Port `kraken` backend to `httpx` 2025-02-11 16:27:28 -05:00
goodboy 86047824d8 Merge pull request '`.brokers.ib` random fixes-n-improvements from various other dev branches..' (#27) from ib_refinements into gitea_feats
Merged-in: #27
2025-02-11 21:26:20 +00:00
Tyler Goodlet cb92abbc38 ib: add connect status info emit 2025-02-11 14:56:17 -05:00
Tyler Goodlet 70332e375b ib: `.api` mod and log-fmt cleaning
About time we tidy'd a buncha status logging in this backend..
particularly for boot-up where there's lots of client-try-connect poll
looping with account detection from the user config.

`.api.Client` pprint and logging fmt improvements:
- add `Client.__repr__()` which shows the minimally useful set of info
  from the underlying `.ib: IB` as well as a new `.acnts: list[str]`
  of the account aliases defined in the user's `brokers.toml`.
- mk `.bars()` define a comprehensive `query_info: str` with all the
  request deats but only display if there's a problem with the response
  data.
- mk `.get_config()` report both the config file path and the acnt
  aliases (NOT the actual account #s).
- move all `.load_aio_clients()` client poll loop requests do
  `log.runtime()` statuses, only falling through to a `.warning()` when
  the loop fails to connect the client to the spec-ed API-gw addr, and
 |_ don't allow loading accounts for which the user has not defined an
    alias in `brokers.toml::[ib]`; raise a value-error in such cases
    with a message indicating how to mod the config.
 |_ only `log.info()` about acnts if some were loaded..

Other mod logging de-noising:
- better status fmting in `.symbols.open_symbol_search()` with
  `repr(Client)`.
- for `.feed.stream_quotes()` first quote reporting use `.runtime()`.
2025-02-11 14:56:17 -05:00
Tyler Goodlet 4940aabe05 ib: warn about mkt precision cuckups that `Contract`s clearly deliver wrong.. 2025-02-11 14:56:17 -05:00
Tyler Goodlet 4f9998e9fb ib: mask out trade and vlm rates for now 2025-02-11 14:56:17 -05:00
Tyler Goodlet c92a236196 ib: more trade record edge case handling
- timestamps came as `'date'`-keyed from 2022 and before but now are
  `'datetime'`..
- some symbols seem to have no commission field, so handle that..
- when no `'price'` field found return `None` from `norm_trade()`.
- add a warn log on mid-fill commission updates.
2025-02-11 14:56:17 -05:00
goodboy e4cd1f85f6 Merge pull request 'pyqt6' (#3) from pyqt6 into gitea_feats
Reviewed-on: #3 (well by fomo anyway..)
2025-02-11 17:25:03 +00:00
33 changed files with 983 additions and 593 deletions

View File

@ -30,7 +30,8 @@ from types import ModuleType
from typing import (
Any,
Iterator,
Generator
Generator,
TYPE_CHECKING,
)
import pendulum
@ -59,8 +60,10 @@ from ..clearing._messages import (
BrokerdPosition,
)
from piker.types import Struct
from piker.data._symcache import SymbologyCache
from ..log import get_logger
from piker.log import get_logger
if TYPE_CHECKING:
from piker.data._symcache import SymbologyCache
log = get_logger(__name__)
@ -493,6 +496,17 @@ class Account(Struct):
_mktmap_table: dict[str, MktPair] | None = None,
only_require: list[str]|True = True,
# ^list of fqmes that are "required" to be processed from
# this ledger pass; we often don't care about others and
# definitely shouldn't always error in such cases.
# (eg. broker backend loaded that doesn't yet supsport the
# symcache but also, inside the paper engine we don't ad-hoc
# request `get_mkt_info()` for every symbol in the ledger,
# only the one for which we're simulating against).
# TODO, not sure if there's a better soln for this, ideally
# all backends get symcache support afap i guess..
) -> dict[str, Position]:
'''
Update the internal `.pps[str, Position]` table from input
@ -535,11 +549,32 @@ class Account(Struct):
if _mktmap_table is None:
raise
required: bool = (
only_require is True
or (
only_require is not True
and
fqme in only_require
)
)
# XXX: caller is allowed to provide a fallback
# mktmap table for the case where a new position is
# being added and the preloaded symcache didn't
# have this entry prior (eg. with frickin IB..)
mkt = _mktmap_table[fqme]
if (
not (mkt := _mktmap_table.get(fqme))
and
required
):
raise
elif not required:
continue
else:
# should be an entry retreived somewhere
assert mkt
if not (pos := pps.get(bs_mktid)):
@ -656,7 +691,7 @@ class Account(Struct):
def write_config(self) -> None:
'''
Write the current account state to the user's account TOML file, normally
something like ``pps.toml``.
something like `pps.toml`.
'''
# TODO: show diff output?

View File

@ -50,7 +50,7 @@ __brokers__: list[str] = [
'binance',
'ib',
'kraken',
'kucoin'
'kucoin',
# broken but used to work
# 'questrade',
@ -71,7 +71,7 @@ def get_brokermod(brokername: str) -> ModuleType:
Return the imported broker module by name.
'''
module = import_module('.' + brokername, 'piker.brokers')
module: ModuleType = import_module('.' + brokername, 'piker.brokers')
# we only allow monkeying because it's for internal keying
module.name = module.__name__.split('.')[-1]
return module

View File

@ -23,6 +23,7 @@ from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from functools import partial
from types import ModuleType
from typing import (
TYPE_CHECKING,
@ -190,14 +191,17 @@ def broker_init(
async def spawn_brokerd(
brokername: str,
loglevel: str | None = None,
**tractor_kwargs,
) -> bool:
'''
Spawn a `brokerd.<backendname>` subactor service daemon
using `pikerd`'s service mngr.
'''
from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon')
@ -217,27 +221,35 @@ async def spawn_brokerd(
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
from piker.service import Services
from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
debug_mode=Services.debug_mode,
mngr: ServiceMngr = get_service_mngr()
ctx: tractor.Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
# signature of target root-task endpoint
daemon_fixture_ep,
# passed to daemon_fixture_ep(**kwargs)
brokername=brokername,
loglevel=loglevel,
),
debug_mode=mngr.debug_mode,
loglevel=loglevel,
enable_modules=(
_data_mods
+
tractor_kwargs.pop('enable_modules')
),
**tractor_kwargs
)
# NOTE: the service mngr expects an already spawned actor + its
# portal ref in order to do non-blocking setup of brokerd
# service nursery.
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
daemon_fixture_ep,
brokername=brokername,
loglevel=loglevel,
assert (
not ctx.cancel_called
and ctx.portal # parent side
and dname in ctx.chan.uid # subactor is named as desired
)
return True
@ -262,8 +274,7 @@ async def maybe_spawn_brokerd(
from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon(
f'brokerd.{brokername}',
service_name=f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_args={
'brokername': brokername,

View File

@ -18,10 +18,11 @@
Handy cross-broker utils.
"""
from __future__ import annotations
from functools import partial
import json
import asks
import httpx
import logging
from ..log import (
@ -60,11 +61,11 @@ class NoData(BrokerError):
def __init__(
self,
*args,
info: dict,
info: dict|None = None,
) -> None:
super().__init__(*args)
self.info: dict = info
self.info: dict|None = info
# when raised, machinery can check if the backend
# set a "frame size" for doing datetime calcs.
@ -90,16 +91,18 @@ class DataThrottle(BrokerError):
def resproc(
resp: asks.response_objects.Response,
resp: httpx.Response,
log: logging.Logger,
return_json: bool = True,
log_resp: bool = False,
) -> asks.response_objects.Response:
"""Process response and return its json content.
) -> httpx.Response:
'''
Process response and return its json content.
Raise the appropriate error on non-200 OK responses.
"""
'''
if not resp.status_code == 200:
raise BrokerError(resp.body)
try:

View File

@ -1,8 +1,8 @@
# piker: trading gear for hackers
# Copyright (C)
# Guillermo Rodriguez (aka ze jefe)
# Tyler Goodlet
# (in stewardship for pikers)
# Guillermo Rodriguez (aka ze jefe)
# Tyler Goodlet
# (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -25,14 +25,13 @@ from __future__ import annotations
from collections import ChainMap
from contextlib import (
asynccontextmanager as acm,
AsyncExitStack,
)
from datetime import datetime
from pprint import pformat
from typing import (
Any,
Callable,
Hashable,
Sequence,
Type,
)
import hmac
@ -43,8 +42,7 @@ import trio
from pendulum import (
now,
)
import asks
from rapidfuzz import process as fuzzy
import httpx
import numpy as np
from piker import config
@ -54,6 +52,7 @@ from piker.clearing._messages import (
from piker.accounting import (
Asset,
digits_to_dec,
MktPair,
)
from piker.types import Struct
from piker.data import (
@ -69,7 +68,6 @@ from .venues import (
PAIRTYPES,
Pair,
MarketType,
_spot_url,
_futes_url,
_testnet_futes_url,
@ -79,19 +77,18 @@ from .venues import (
log = get_logger('piker.brokers.binance')
def get_config() -> dict:
def get_config() -> dict[str, Any]:
conf: dict
path: Path
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section = conf.get('binance')
section: dict = conf.get('binance')
if not section:
log.warning(f'No config section found for binance in {path}')
log.warning(
f'No config section found for binance in {path}'
)
return {}
return section
@ -147,7 +144,7 @@ def binance_timestamp(
class Client:
'''
Async ReST API client using ``trio`` + ``asks`` B)
Async ReST API client using `trio` + `httpx` B)
Supports all of the spot, margin and futures endpoints depending
on method.
@ -156,10 +153,17 @@ class Client:
def __init__(
self,
venue_sessions: dict[
str, # venue key
tuple[httpx.AsyncClient, str] # session, eps path
],
conf: dict[str, Any],
# TODO: change this to `Client.[mkt_]venue: MarketType`?
mkt_mode: MarketType = 'spot',
) -> None:
self.conf = conf
# build out pair info tables for each market type
# and wrap in a chain-map view for search / query.
self._spot_pairs: dict[str, Pair] = {} # spot info table
@ -186,44 +190,13 @@ class Client:
# market symbols for use by search. See `.exch_info()`.
self._pairs: ChainMap[str, Pair] = ChainMap()
# spot EPs sesh
self._sesh = asks.Session(connections=4)
self._sesh.base_location: str = _spot_url
# spot testnet
self._test_sesh: asks.Session = asks.Session(connections=4)
self._test_sesh.base_location: str = _testnet_spot_url
# margin and extended spot endpoints session.
self._sapi_sesh = asks.Session(connections=4)
self._sapi_sesh.base_location: str = _spot_url
# futes EPs sesh
self._fapi_sesh = asks.Session(connections=4)
self._fapi_sesh.base_location: str = _futes_url
# futes testnet
self._test_fapi_sesh: asks.Session = asks.Session(connections=4)
self._test_fapi_sesh.base_location: str = _testnet_futes_url
# global client "venue selection" mode.
# set this when you want to switch venues and not have to
# specify the venue for the next request.
self.mkt_mode: MarketType = mkt_mode
# per 8
self.venue_sesh: dict[
str, # venue key
tuple[asks.Session, str] # session, eps path
] = {
'spot': (self._sesh, '/api/v3/'),
'spot_testnet': (self._test_sesh, '/fapi/v1/'),
'margin': (self._sapi_sesh, '/sapi/v1/'),
'usdtm_futes': (self._fapi_sesh, '/fapi/v1/'),
'usdtm_futes_testnet': (self._test_fapi_sesh, '/fapi/v1/'),
# 'futes_coin': self._dapi, # TODO
}
# per-mkt-venue API client table
self.venue_sesh = venue_sessions
# lookup for going from `.mkt_mode: str` to the config
# subsection `key: str`
@ -238,40 +211,6 @@ class Client:
'futes': ['usdtm_futes'],
}
# for creating API keys see,
# https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
self.conf: dict = get_config()
for key, subconf in self.conf.items():
if api_key := subconf.get('api_key', ''):
venue_keys: list[str] = self.confkey2venuekeys[key]
venue_key: str
sesh: asks.Session
for venue_key in venue_keys:
sesh, _ = self.venue_sesh[venue_key]
api_key_header: dict = {
# taken from official:
# https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47
"Content-Type": "application/json;charset=utf-8",
# TODO: prolly should just always query and copy
# in the real latest ver?
"User-Agent": "binance-connector/6.1.6smbz6",
"X-MBX-APIKEY": api_key,
}
sesh.headers.update(api_key_header)
# if `.use_tesnet = true` in the config then
# also add headers for the testnet session which
# will be used for all order control
if subconf.get('use_testnet', False):
testnet_sesh, _ = self.venue_sesh[
venue_key + '_testnet'
]
testnet_sesh.headers.update(api_key_header)
def _mk_sig(
self,
data: dict,
@ -290,7 +229,6 @@ class Client:
'to define the creds for auth-ed endpoints!?'
)
# XXX: Info on security and authentification
# https://binance-docs.github.io/apidocs/#endpoint-security-type
if not (api_secret := subconf.get('api_secret')):
@ -319,7 +257,7 @@ class Client:
params: dict,
method: str = 'get',
venue: str | None = None, # if None use `.mkt_mode` state
venue: str|None = None, # if None use `.mkt_mode` state
signed: bool = False,
allow_testnet: bool = False,
@ -330,8 +268,9 @@ class Client:
- /fapi/v3/ USD-M FUTURES, or
- /api/v3/ SPOT/MARGIN
account/market endpoint request depending on either passed in `venue: str`
or the current setting `.mkt_mode: str` setting, default `'spot'`.
account/market endpoint request depending on either passed in
`venue: str` or the current setting `.mkt_mode: str` setting,
default `'spot'`.
Docs per venue API:
@ -360,9 +299,6 @@ class Client:
venue=venue_key,
)
sesh: asks.Session
path: str
# Check if we're configured to route order requests to the
# venue equivalent's testnet.
use_testnet: bool = False
@ -387,11 +323,12 @@ class Client:
# ctl machinery B)
venue_key += '_testnet'
sesh, path = self.venue_sesh[venue_key]
meth: Callable = getattr(sesh, method)
client: httpx.AsyncClient
path: str
client, path = self.venue_sesh[venue_key]
meth: Callable = getattr(client, method)
resp = await meth(
path=path + endpoint,
url=path + endpoint,
params=params,
timeout=float('inf'),
)
@ -433,7 +370,15 @@ class Client:
item['filters'] = filters
pair_type: Type = PAIRTYPES[venue]
pair: Pair = pair_type(**item)
try:
pair: Pair = pair_type(**item)
except Exception as e:
e.add_note(
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information'
)
raise
pair_table[pair.symbol.upper()] = pair
# update an additional top-level-cross-venue-table
@ -528,7 +473,9 @@ class Client:
'''
pair_table: dict[str, Pair] = self._venue2pairs[
venue or self.mkt_mode
venue
or
self.mkt_mode
]
if (
expiry
@ -547,9 +494,9 @@ class Client:
venues: list[str] = [venue]
# batch per-venue download of all exchange infos
async with trio.open_nursery() as rn:
async with trio.open_nursery() as tn:
for ven in venues:
rn.start_soon(
tn.start_soon(
self._cache_pairs,
ven,
)
@ -602,11 +549,11 @@ class Client:
) -> dict[str, Any]:
fq_pairs: dict = await self.exch_info()
fq_pairs: dict[str, Pair] = await self.exch_info()
# TODO: cache this list like we were in
# `open_symbol_search()`?
keys: list[str] = list(fq_pairs)
# keys: list[str] = list(fq_pairs)
return match_from_pairs(
pairs=fq_pairs,
@ -614,9 +561,20 @@ class Client:
score_cutoff=50,
)
def pair2venuekey(
self,
pair: Pair,
) -> str:
return {
'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..?
}[pair.venue]
async def bars(
self,
symbol: str,
mkt: MktPair,
start_dt: datetime | None = None,
end_dt: datetime | None = None,
@ -646,16 +604,20 @@ class Client:
start_time = binance_timestamp(start_dt)
end_time = binance_timestamp(end_dt)
bs_pair: Pair = self._pairs[mkt.bs_fqme.upper()]
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
bars = await self._api(
'klines',
params={
'symbol': symbol.upper(),
# NOTE: always query using their native symbology!
'symbol': mkt.bs_mktid.upper(),
'interval': '1m',
'startTime': start_time,
'endTime': end_time,
'limit': limit
},
venue=self.pair2venuekey(bs_pair),
allow_testnet=False,
)
new_bars: list[tuple] = []
@ -972,17 +934,148 @@ class Client:
await self.close_listen_key(key)
_venue_urls: dict[str, str] = {
'spot': (
_spot_url,
'/api/v3/',
),
'spot_testnet': (
_testnet_spot_url,
'/fapi/v1/'
),
# margin and extended spot endpoints session.
# TODO: did this ever get implemented fully?
# 'margin': (
# _spot_url,
# '/sapi/v1/'
# ),
'usdtm_futes': (
_futes_url,
'/fapi/v1/',
),
'usdtm_futes_testnet': (
_testnet_futes_url,
'/fapi/v1/',
),
# TODO: for anyone who actually needs it ;P
# 'coin_futes': ()
}
def init_api_keys(
client: Client,
conf: dict[str, Any],
) -> None:
'''
Set up per-venue API keys each http client according to the user's
`brokers.conf`.
For ex, to use spot-testnet and live usdt futures APIs:
```toml
[binance]
# spot test net
spot.use_testnet = true
spot.api_key = '<spot_api_key_from_binance_account>'
spot.api_secret = '<spot_api_key_password>'
# futes live
futes.use_testnet = false
accounts.usdtm = 'futes'
futes.api_key = '<futes_api_key_from_binance>'
futes.api_secret = '<futes_api_key_password>''
# if uncommented will use the built-in paper engine and not
# connect to `binance` API servers for order ctl.
# accounts.paper = 'paper'
```
'''
for key, subconf in conf.items():
if api_key := subconf.get('api_key', ''):
venue_keys: list[str] = client.confkey2venuekeys[key]
venue_key: str
client: httpx.AsyncClient
for venue_key in venue_keys:
client, _ = client.venue_sesh[venue_key]
api_key_header: dict = {
# taken from official:
# https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47
"Content-Type": "application/json;charset=utf-8",
# TODO: prolly should just always query and copy
# in the real latest ver?
"User-Agent": "binance-connector/6.1.6smbz6",
"X-MBX-APIKEY": api_key,
}
client.headers.update(api_key_header)
# if `.use_tesnet = true` in the config then
# also add headers for the testnet session which
# will be used for all order control
if subconf.get('use_testnet', False):
testnet_sesh, _ = client.venue_sesh[
venue_key + '_testnet'
]
testnet_sesh.headers.update(api_key_header)
@acm
async def get_client() -> Client:
async def get_client(
mkt_mode: MarketType = 'spot',
) -> Client:
'''
Construct an single `piker` client which composes multiple underlying venue
specific API clients both for live and test networks.
client = Client()
await client.exch_info()
log.info(
f'{client} in {client.mkt_mode} mode: caching exchange infos..\n'
'Cached multi-market pairs:\n'
f'spot: {len(client._spot_pairs)}\n'
f'usdtm_futes: {len(client._ufutes_pairs)}\n'
f'Total: {len(client._pairs)}\n'
)
'''
venue_sessions: dict[
str, # venue key
tuple[httpx.AsyncClient, str] # session, eps path
] = {}
async with AsyncExitStack() as client_stack:
for name, (base_url, path) in _venue_urls.items():
api: httpx.AsyncClient = await client_stack.enter_async_context(
httpx.AsyncClient(
base_url=base_url,
# headers={},
yield client
# TODO: is there a way to numerate this?
# https://www.python-httpx.org/advanced/clients/#why-use-a-client
# connections=4
)
)
venue_sessions[name] = (
api,
path,
)
conf: dict[str, Any] = get_config()
# for creating API keys see,
# https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
client = Client(
venue_sessions=venue_sessions,
conf=conf,
mkt_mode=mkt_mode,
)
init_api_keys(
client=client,
conf=conf,
)
fq_pairs: dict[str, Pair] = await client.exch_info()
assert fq_pairs
log.info(
f'Loaded multi-venue `Client` in mkt_mode={client.mkt_mode!r}\n\n'
f'Symbology Summary:\n'
f'------ - ------\n'
f'spot: {len(client._spot_pairs)}\n'
f'usdtm_futes: {len(client._ufutes_pairs)}\n'
'------ - ------\n'
f'total: {len(client._pairs)}\n'
)
yield client

View File

@ -264,15 +264,20 @@ async def open_trade_dialog(
# do a open_symcache() call.. though maybe we can hide
# this in a new async version of open_account()?
async with open_cached_client('binance') as client:
subconf: dict = client.conf[venue_name]
use_testnet = subconf.get('use_testnet', False)
subconf: dict|None = client.conf.get(venue_name)
# XXX: if no futes.api_key or spot.api_key has been set we
# always fall back to the paper engine!
if not subconf.get('api_key'):
if (
not subconf
or
not subconf.get('api_key')
):
await ctx.started('paper')
return
use_testnet: bool = subconf.get('use_testnet', False)
async with (
open_cached_client('binance') as client,
):

View File

@ -48,6 +48,7 @@ import tractor
from piker.brokers import (
open_cached_client,
NoData,
)
from piker._cacheables import (
async_lifo_cache,
@ -252,24 +253,30 @@ async def open_history_client(
else:
client.mkt_mode = 'spot'
# NOTE: always query using their native symbology!
mktid: str = mkt.bs_mktid
array = await client.bars(
mktid,
array: np.ndarray = await client.bars(
mkt=mkt,
start_dt=start_dt,
end_dt=end_dt,
)
if array.size == 0:
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
times = array['time']
if (
end_dt is None
):
inow = round(time.time())
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
start_dt = from_timestamp(times[0])
end_dt = from_timestamp(times[-1])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}

View File

@ -137,10 +137,12 @@ class SpotPair(Pair, frozen=True):
quoteOrderQtyMarketAllowed: bool
isSpotTradingAllowed: bool
isMarginTradingAllowed: bool
otoAllowed: bool
defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str]
permissions: list[str]
permissionSets: list[list[str]]
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair'
@ -179,7 +181,6 @@ class FutesPair(Pair):
quoteAsset: str # 'USDT',
quotePrecision: int # 8,
requiredMarginPercent: float # '5.0000',
settlePlan: int # 0,
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
triggerProtect: float # '0.0500',
underlyingSubType: list[str] # ['PoW'],

View File

@ -100,7 +100,7 @@ async def data_reset_hack(
log.warning(
no_setup_msg
+
f'REQUIRES A `vnc_addrs: array` ENTRY'
'REQUIRES A `vnc_addrs: array` ENTRY'
)
vnc_host, vnc_port = vnc_sockaddr.get(
@ -259,7 +259,7 @@ def i3ipc_xdotool_manual_click_hack() -> None:
timeout=timeout,
)
# re-activate and focus original window
# re-activate and focus original window
subprocess.call([
'xdotool',
'windowactivate', '--sync', str(orig_win_id),

View File

@ -287,9 +287,31 @@ class Client:
self.conf = config
# NOTE: the ib.client here is "throttled" to 45 rps by default
self.ib = ib
self.ib: IB = ib
self.ib.RaiseRequestErrors: bool = True
# self._acnt_names: set[str] = {}
self._acnt_names: list[str] = []
@property
def acnts(self) -> list[str]:
# return list(self._acnt_names)
return self._acnt_names
def __repr__(self) -> str:
return (
f'<{type(self).__name__}('
f'ib={self.ib} '
f'acnts={self.acnts}'
# TODO: we need to mask out acnt-#s and other private
# infos if we're going to console this!
# f' |_.conf:\n'
# f' {pformat(self.conf)}\n'
')>'
)
async def get_fills(self) -> list[Fill]:
'''
Return list of rents `Fills` from trading session.
@ -376,55 +398,63 @@ class Client:
# whatToShow='MIDPOINT',
# whatToShow='TRADES',
)
log.info(
f'REQUESTING {ib_duration_str} worth {bar_size} BARS\n'
f'fqme: {fqme}\n'
f'global _enters: {_enters}\n'
f'kwargs: {pformat(kwargs)}\n'
)
bars = await self.ib.reqHistoricalDataAsync(
**kwargs,
)
query_info: str = (
f'REQUESTING IB history BARS\n'
f' ------ - ------\n'
f'dt_duration: {dt_duration}\n'
f'ib_duration_str: {ib_duration_str}\n'
f'bar_size: {bar_size}\n'
f'fqme: {fqme}\n'
f'actor-global _enters: {_enters}\n'
f'kwargs: {pformat(kwargs)}\n'
)
# tail case if no history for range or none prior.
# NOTE: there's actually 3 cases here to handle (and
# this should be read alongside the implementation of
# `.reqHistoricalDataAsync()`):
# - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()...
# - no data exists for the period likely due to
# a weekend, holiday or other non-trading period prior to
# ``end_dt`` which exceeds the ``duration``,
# - LITERALLY this is the start of the mkt's history!
if not bars:
# NOTE: there's actually 3 cases here to handle (and
# this should be read alongside the implementation of
# `.reqHistoricalDataAsync()`):
# - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()...
# - no data exists for the period likely due to
# a weekend, holiday or other non-trading period prior to
# ``end_dt`` which exceeds the ``duration``,
# - LITERALLY this is the start of the mkt's history!
# TODO: figure out wut's going on here.
# TODO: is this handy, a sync requester for tinkering
# with empty frame cases?
# def get_hist():
# return self.ib.reqHistoricalData(**kwargs)
# import pdbp
# pdbp.set_trace()
# sync requester for debugging empty frame cases
def get_hist():
return self.ib.reqHistoricalData(**kwargs)
log.critical(
'STUPID IB SAYS NO HISTORY\n\n'
+ query_info
)
assert get_hist
import pdbp
pdbp.set_trace()
return [], np.empty(0), dt_duration
# TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case? right now there's no
# way to detect a timeout.
# rewrite the method in the first case?
# right now there's no way to detect a timeout..
return [], np.empty(0), dt_duration
# NOTE XXX: ensure minimum duration in bars B)
# => we recursively call this method until we get at least
# as many bars such that they sum in aggregate to the the
# desired total time (duration) at most.
# XXX XXX XXX
# WHY DID WE EVEN NEED THIS ORIGINALLY!?
# XXX XXX XXX
# - if you query over a gap and get no data
# that may short circuit the history
log.info(query_info)
# NOTE XXX: ensure minimum duration in bars?
# => recursively call this method until we get at least as
# many bars such that they sum in aggregate to the the
# desired total time (duration) at most.
# - if you query over a gap and get no data
# that may short circuit the history
if (
end_dt
and False
# XXX XXX XXX
# => WHY DID WE EVEN NEED THIS ORIGINALLY!? <=
# XXX XXX XXX
False
and end_dt
):
nparr: np.ndarray = bars_to_np(bars)
times: np.ndarray = nparr['time']
@ -927,7 +957,10 @@ class Client:
warnset = True
else:
log.info(f'Got first quote for {contract}')
log.info(
'Got first quote for contract\n'
f'{contract}\n'
)
break
else:
if timeouterr and raise_on_timeout:
@ -991,8 +1024,12 @@ class Client:
outsideRth=True,
optOutSmartRouting=True,
# TODO: need to understand this setting better as
# it pertains to shit ass mms..
routeMarketableToBbo=True,
designatedLocation='SMART',
# TODO: make all orders GTC?
# https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba
# goodTillDate=f"yyyyMMdd-HH:mm:ss",
@ -1120,8 +1157,8 @@ def get_config() -> dict[str, Any]:
names = list(accounts.keys())
accts = section['accounts'] = bidict(accounts)
log.info(
f'brokers.toml defines {len(accts)} accounts: '
f'{pformat(names)}'
f'{path} defines {len(accts)} account aliases:\n'
f'{pformat(names)}\n'
)
if section is None:
@ -1188,7 +1225,7 @@ async def load_aio_clients(
try_ports = list(try_ports.values())
_err = None
accounts_def = config.load_accounts(['ib'])
accounts_def: dict[str, str] = config.load_accounts(['ib'])
ports = try_ports if port is None else [port]
combos = list(itertools.product(hosts, ports))
accounts_found: dict[str, Client] = {}
@ -1213,6 +1250,12 @@ async def load_aio_clients(
for i in range(connect_retries):
try:
log.info(
'Trying `ib_async` connect\n'
f'{host}: {port}\n'
f'clientId: {client_id}\n'
f'timeout: {connect_timeout}\n'
)
await ib.connectAsync(
host,
port,
@ -1227,7 +1270,9 @@ async def load_aio_clients(
client = Client(ib=ib, config=conf)
# update all actor-global caches
log.info(f"Caching client for {sockaddr}")
log.runtime(
f'Connected and caching `Client` @ {sockaddr!r}'
)
_client_cache[sockaddr] = client
break
@ -1242,37 +1287,59 @@ async def load_aio_clients(
OSError,
) as ce:
_err = ce
log.warning(
f'Failed to connect on {host}:{port} for {i} time with,\n'
f'{ib.client.apiError.value()}\n'
'retrying with a new client id..')
message: str = (
f'Failed to connect on {host}:{port} after {i} tries with\n'
f'{ib.client.apiError.value()!r}\n\n'
'Retrying with a new client id..\n'
)
log.runtime(message)
else:
# XXX report loudly if we never established after all
# re-tries
log.warning(message)
# Pre-collect all accounts available for this
# connection and map account names to this client
# instance.
for value in ib.accountValues():
acct_number = value.account
acct_number: str = value.account
entry = accounts_def.inverse.get(acct_number)
if not entry:
acnt_alias: str = accounts_def.inverse.get(acct_number)
if not acnt_alias:
# TODO: should we constuct the below reco-ex from
# the existing config content?
_, path = config.load(
conf_name='brokers',
)
raise ValueError(
'No section in brokers.toml for account:'
f' {acct_number}\n'
f'Please add entry to continue using this API client'
'No alias in account section for account!\n'
f'Please add an acnt alias entry to your {path}\n'
'For example,\n\n'
'[ib.accounts]\n'
'margin = {accnt_number!r}\n'
'^^^^^^ <- you need this part!\n\n'
'This ensures `piker` will not leak private acnt info '
'to console output by default!\n'
)
# surjection of account names to operating clients.
if acct_number not in accounts_found:
accounts_found[entry] = client
if acnt_alias not in accounts_found:
accounts_found[acnt_alias] = client
# client._acnt_names.add(acnt_alias)
client._acnt_names.append(acnt_alias)
log.info(
f'Loaded accounts for client @ {host}:{port}\n'
f'{pformat(accounts_found)}'
)
if accounts_found:
log.info(
f'Loaded accounts for api client\n\n'
f'{pformat(accounts_found)}\n'
)
# XXX: why aren't we just updating this directy above
# instead of using the intermediary `accounts_found`?
_accounts2clients.update(accounts_found)
# XXX: why aren't we just updating this directy above
# instead of using the intermediary `accounts_found`?
_accounts2clients.update(accounts_found)
# if we have no clients after the scan loop then error out.
if not _client_cache:
@ -1306,7 +1373,9 @@ async def load_clients_for_trio(
a ``tractor.to_asyncio.open_channel_from()``.
'''
async with load_aio_clients() as accts2clients:
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
to_trio.send_nowait(accts2clients)
@ -1472,7 +1541,7 @@ async def open_aio_client_method_relay(
msg: tuple[str, dict] | dict | None = await from_trio.get()
match msg:
case None: # termination sentinel
print('asyncio PROXY-RELAY SHUTDOWN')
log.info('asyncio `Client` method-proxy SHUTDOWN!')
break
case (meth_name, kwargs):

View File

@ -1183,7 +1183,14 @@ async def deliver_trade_events(
pos
and fill
):
assert fill.commissionReport == cr
now_cr: CommissionReport = fill.commissionReport
if (now_cr != cr):
log.warning(
'UhhHh ib updated the commission report mid-fill..?\n'
f'was: {pformat(cr)}\n'
f'now: {pformat(now_cr)}\n'
)
await emit_pp_update(
ems_stream,
accounts_def,

View File

@ -671,8 +671,8 @@ async def _setup_quote_stream(
# making them mostly useless and explains why the scanner
# is always slow XD
# '293', # Trade count for day
'294', # Trade rate / minute
'295', # Vlm rate / minute
# '294', # Trade rate / minute
# '295', # Vlm rate / minute
),
contract: Contract | None = None,
@ -915,9 +915,13 @@ async def stream_quotes(
if first_ticker:
first_quote: dict = normalize(first_ticker)
log.info(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
# TODO: we need a stack-oriented log levels filters for
# this!
# log.info(message, filter={'stack': 'live_feed'}) ?
log.runtime(
'Rxed init quote:\n\n'
f'{pformat(first_quote)}\n'
)
# NOTE: it might be outside regular trading hours for
@ -969,7 +973,11 @@ async def stream_quotes(
raise_on_timeout=True,
)
first_quote: dict = normalize(first_ticker)
log.info(
# TODO: we need a stack-oriented log levels filters for
# this!
# log.info(message, filter={'stack': 'live_feed'}) ?
log.runtime(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)

View File

@ -31,7 +31,11 @@ from typing import (
)
from bidict import bidict
import pendulum
from pendulum import (
DateTime,
parse,
from_timestamp,
)
from ib_insync import (
Contract,
Commodity,
@ -66,10 +70,11 @@ tx_sort: Callable = partial(
iter_by_dt,
parsers={
'dateTime': parse_flex_dt,
'datetime': pendulum.parse,
# for some some fucking 2022 and
# back options records...fuck me.
'date': pendulum.parse,
'datetime': parse,
# XXX: for some some fucking 2022 and
# back options records.. f@#$ me..
'date': parse,
}
)
@ -89,15 +94,38 @@ def norm_trade(
conid: int = str(record.get('conId') or record['conid'])
bs_mktid: str = str(conid)
comms = record.get('commission')
if comms is None:
comms = -1*record['ibCommission']
price = record.get('price') or record['tradePrice']
# NOTE: sometimes weird records (like BTTX?)
# have no field for this?
comms: float = -1 * (
record.get('commission')
or record.get('ibCommission')
or 0
)
if not comms:
log.warning(
'No commissions found for record?\n'
f'{pformat(record)}\n'
)
price: float = (
record.get('price')
or record.get('tradePrice')
)
if price is None:
log.warning(
'No `price` field found in record?\n'
'Skipping normalization..\n'
f'{pformat(record)}\n'
)
return None
# the api doesn't do the -/+ on the quantity for you but flex
# records do.. are you fucking serious ib...!?
size = record.get('quantity') or record['shares'] * {
size: float|int = (
record.get('quantity')
or record['shares']
) * {
'BOT': 1,
'SLD': -1,
}[record['side']]
@ -128,26 +156,31 @@ def norm_trade(
# otype = tail[6]
# strike = tail[7:]
print(f'skipping opts contract {symbol}')
log.warning(
f'Skipping option contract -> NO SUPPORT YET!\n'
f'{symbol}\n'
)
return None
# timestamping is way different in API records
dtstr = record.get('datetime')
date = record.get('date')
flex_dtstr = record.get('dateTime')
dtstr: str = record.get('datetime')
date: str = record.get('date')
flex_dtstr: str = record.get('dateTime')
if dtstr or date:
dt = pendulum.parse(dtstr or date)
dt: DateTime = parse(dtstr or date)
elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime'])
dt: DateTime = parse_flex_dt(record['dateTime'])
# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
asset_type: str = record.get(
'assetCategory'
) or record.get('secType', 'STK')
asset_type: str = (
record.get('assetCategory')
or record.get('secType')
or 'STK'
)
if (expiry := (
record.get('lastTradeDateOrContractMonth')
@ -357,6 +390,7 @@ def norm_trade_records(
if txn is None:
continue
# inject txns sorted by datetime
insort(
records,
txn,
@ -405,7 +439,7 @@ def api_trades_to_ledger_entries(
txn_dict[attr_name] = val
tid = str(txn_dict['execId'])
dt = pendulum.from_timestamp(txn_dict['time'])
dt = from_timestamp(txn_dict['time'])
txn_dict['datetime'] = str(dt)
acctid = accounts[txn_dict['acctNumber']]

View File

@ -209,7 +209,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
break
ib_client = proxy._aio_ns.ib
log.info(f'Using {ib_client} for symbol search')
log.info(
f'Using API client for symbol-search\n'
f'{ib_client}\n'
)
last = time.time()
async for pattern in stream:
@ -294,7 +297,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
elif stock_results:
break
# else:
await tractor.pause()
# await tractor.pause()
# # match against our ad-hoc set immediately
# adhoc_matches = fuzzy.extract(
@ -522,7 +525,21 @@ async def get_mkt_info(
venue = con.primaryExchange or con.exchange
price_tick: Decimal = Decimal(str(details.minTick))
# price_tick: Decimal = Decimal('0.01')
ib_min_tick_gt_2: Decimal = Decimal('0.01')
if (
price_tick < ib_min_tick_gt_2
):
# TODO: we need to add some kinda dynamic rounding sys
# to our MktPair i guess?
# not sure where the logic should sit, but likely inside
# the `.clearing._ems` i suppose...
log.warning(
'IB seems to disallow a min price tick < 0.01 '
'when the price is > 2.0..?\n'
f'Decreasing min tick precision for {fqme} to 0.01'
)
# price_tick = ib_min_tick
# await tractor.pause()
if atype == 'stock':
# XXX: GRRRR they don't support fractional share sizes for

View File

@ -27,8 +27,8 @@ from typing import (
)
import time
import httpx
import pendulum
import asks
import numpy as np
import urllib.parse
import hashlib
@ -60,6 +60,11 @@ log = get_logger('piker.brokers.kraken')
# <uri>/<version>/
_url = 'https://api.kraken.com/0'
_headers: dict[str, str] = {
'User-Agent': 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
}
# TODO: this is the only backend providing this right?
# in which case we should drop it from the defaults and
# instead make a custom fields descr in this module!
@ -135,16 +140,15 @@ class Client:
def __init__(
self,
config: dict[str, str],
httpx_client: httpx.AsyncClient,
name: str = '',
api_key: str = '',
secret: str = ''
) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url
self._sesh.headers.update({
'User-Agent':
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
})
self._sesh: httpx.AsyncClient = httpx_client
self._name = name
self._api_key = api_key
self._secret = secret
@ -166,10 +170,9 @@ class Client:
method: str,
data: dict,
) -> dict[str, Any]:
resp = await self._sesh.post(
path=f'/public/{method}',
resp: httpx.Response = await self._sesh.post(
url=f'/public/{method}',
json=data,
timeout=float('inf')
)
return resproc(resp, log)
@ -180,18 +183,18 @@ class Client:
uri_path: str
) -> dict[str, Any]:
headers = {
'Content-Type':
'application/x-www-form-urlencoded',
'API-Key':
self._api_key,
'API-Sign':
get_kraken_signature(uri_path, data, self._secret)
'Content-Type': 'application/x-www-form-urlencoded',
'API-Key': self._api_key,
'API-Sign': get_kraken_signature(
uri_path,
data,
self._secret,
),
}
resp = await self._sesh.post(
path=f'/private/{method}',
resp: httpx.Response = await self._sesh.post(
url=f'/private/{method}',
data=data,
headers=headers,
timeout=float('inf')
)
return resproc(resp, log)
@ -665,24 +668,36 @@ class Client:
@acm
async def get_client() -> Client:
conf = get_config()
if conf:
client = Client(
conf,
conf: dict[str, Any] = get_config()
async with httpx.AsyncClient(
base_url=_url,
headers=_headers,
# TODO: don't break these up and just do internal
# conf lookups instead..
name=conf['key_descr'],
api_key=conf['api_key'],
secret=conf['secret']
)
else:
client = Client({})
# TODO: is there a way to numerate this?
# https://www.python-httpx.org/advanced/clients/#why-use-a-client
# connections=4
) as trio_client:
if conf:
client = Client(
conf,
httpx_client=trio_client,
# at startup, load all symbols, and asset info in
# batch requests.
async with trio.open_nursery() as nurse:
nurse.start_soon(client.get_assets)
await client.get_mkt_pairs()
# TODO: don't break these up and just do internal
# conf lookups instead..
name=conf['key_descr'],
api_key=conf['api_key'],
secret=conf['secret']
)
else:
client = Client(
conf={},
httpx_client=trio_client,
)
yield client
# at startup, load all symbols, and asset info in
# batch requests.
async with trio.open_nursery() as nurse:
nurse.start_soon(client.get_assets)
await client.get_mkt_pairs()
yield client

View File

@ -612,18 +612,18 @@ async def open_trade_dialog(
# enter relay loop
await handle_order_updates(
client,
ws,
stream,
ems_stream,
apiflows,
ids,
reqids2txids,
acnt,
api_trans,
acctid,
acc_name,
token,
client=client,
ws=ws,
ws_stream=stream,
ems_stream=ems_stream,
apiflows=apiflows,
ids=ids,
reqids2txids=reqids2txids,
acnt=acnt,
ledger=ledger,
acctid=acctid,
acc_name=acc_name,
token=token,
)
@ -639,7 +639,8 @@ async def handle_order_updates(
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
ledger_trans: dict[str, Transaction],
ledger: TransactionLedger,
# ledger_trans: dict[str, Transaction],
acctid: str,
acc_name: str,
token: str,
@ -699,7 +700,8 @@ async def handle_order_updates(
# if tid not in ledger_trans
}
for tid, trade in trades.items():
assert tid not in ledger_trans
# assert tid not in ledger_trans
assert tid not in ledger
txid = trade['ordertxid']
reqid = trade.get('userref')
@ -747,11 +749,17 @@ async def handle_order_updates(
client,
api_name_set='wsname',
)
ppmsgs = trades2pps(
acnt,
acctid,
new_trans,
ppmsgs: list[BrokerdPosition] = trades2pps(
acnt=acnt,
ledger=ledger,
acctid=acctid,
new_trans=new_trans,
)
# ppmsgs = trades2pps(
# acnt,
# acctid,
# new_trans,
# )
for pp_msg in ppmsgs:
await ems_stream.send(pp_msg)

View File

@ -16,10 +16,9 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Kucoin broker backend
Kucoin cex API backend.
'''
from contextlib import (
asynccontextmanager as acm,
aclosing,
@ -42,7 +41,7 @@ import wsproto
from uuid import uuid4
from trio_typing import TaskStatus
import asks
import httpx
from bidict import bidict
import numpy as np
import pendulum
@ -112,6 +111,10 @@ class KucoinMktPair(Struct, frozen=True):
quoteMaxSize: float
quoteMinSize: float
symbol: str # our bs_mktid, kucoin's internal id
feeCategory: int
makerFeeCoefficient: float
takerFeeCoefficient: float
st: bool
class AccountTrade(Struct, frozen=True):
@ -212,8 +215,12 @@ def get_config() -> BrokerConfig | None:
class Client:
def __init__(self) -> None:
self._config: BrokerConfig | None = get_config()
def __init__(
self,
httpx_client: httpx.AsyncClient,
) -> None:
self._http: httpx.AsyncClient = httpx_client
self._config: BrokerConfig|None = get_config()
self._pairs: dict[str, KucoinMktPair] = {}
self._fqmes2mktids: bidict[str, str] = bidict()
self._bars: list[list[float]] = []
@ -227,18 +234,24 @@ class Client:
) -> dict[str, str | bytes]:
'''
Generate authenticated request headers
Generate authenticated request headers:
https://docs.kucoin.com/#authentication
https://www.kucoin.com/docs/basic-info/connection-method/authentication/creating-a-request
https://www.kucoin.com/docs/basic-info/connection-method/authentication/signing-a-message
'''
if not self._config:
raise ValueError(
'No config found when trying to send authenticated request')
'No config found when trying to send authenticated request'
)
str_to_sign = (
str(int(time.time() * 1000))
+ action + f'/api/{api}/{endpoint.lstrip("/")}'
+
action
+
f'/api/{api}/{endpoint.lstrip("/")}'
)
signature = base64.b64encode(
@ -249,6 +262,7 @@ class Client:
).digest()
)
# TODO: can we cache this between calls?
passphrase = base64.b64encode(
hmac.new(
self._config.key_secret.encode('utf-8'),
@ -270,8 +284,10 @@ class Client:
self,
action: Literal['POST', 'GET'],
endpoint: str,
api: str = 'v2',
headers: dict = {},
) -> Any:
'''
Generic request wrapper for Kucoin API
@ -284,14 +300,19 @@ class Client:
api,
)
api_url = f'https://api.kucoin.com/api/{api}/{endpoint}'
res = await asks.request(action, api_url, headers=headers)
json = res.json()
if 'data' in json:
return json['data']
req_meth: Callable = getattr(
self._http,
action.lower(),
)
res = await req_meth(
url=f'/{api}/{endpoint}',
headers=headers,
)
json: dict = res.json()
if (data := json.get('data')) is not None:
return data
else:
api_url: str = self._http.base_url
log.error(
f'Error making request to {api_url} ->\n'
f'{pformat(res)}'
@ -311,7 +332,7 @@ class Client:
'''
token_type = 'private' if private else 'public'
try:
data: dict[str, Any] | None = await self._request(
data: dict[str, Any]|None = await self._request(
'POST',
endpoint=f'bullet-{token_type}',
api='v1'
@ -349,8 +370,8 @@ class Client:
currencies: dict[str, Currency] = {}
entries: list[dict] = await self._request(
'GET',
api='v1',
endpoint='currencies',
api='v1',
)
for entry in entries:
curr = Currency(**entry).copy()
@ -366,7 +387,10 @@ class Client:
dict[str, KucoinMktPair],
bidict[str, KucoinMktPair],
]:
entries = await self._request('GET', 'symbols')
entries = await self._request(
'GET',
endpoint='symbols',
)
log.info(f' {len(entries)} Kucoin market pairs fetched')
pairs: dict[str, KucoinMktPair] = {}
@ -567,13 +591,21 @@ def fqme_to_kucoin_sym(
@acm
async def get_client() -> AsyncGenerator[Client, None]:
client = Client()
'''
Load an API `Client` preconfigured from user settings
async with trio.open_nursery() as n:
n.start_soon(client.get_mkt_pairs)
await client.get_currencies()
'''
async with (
httpx.AsyncClient(
base_url='https://api.kucoin.com/api',
) as trio_client,
):
client = Client(httpx_client=trio_client)
async with trio.open_nursery() as tn:
tn.start_soon(client.get_mkt_pairs)
await client.get_currencies()
yield client
yield client
@tractor.context
@ -609,7 +641,7 @@ async def open_ping_task(
await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.info('Starting ping task for kucoin ws connection')
log.warning('Starting ping task for kucoin ws connection')
n.start_soon(ping_server)
yield
@ -621,9 +653,14 @@ async def open_ping_task(
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, KucoinMktPair]:
) -> tuple[
MktPair,
KucoinMktPair,
]:
'''
Query for and return a `MktPair` and `KucoinMktPair`.
Query for and return both a `piker.accounting.MktPair` and
`KucoinMktPair` from provided `fqme: str`
(fully-qualified-market-endpoint).
'''
async with open_cached_client('kucoin') as client:
@ -698,6 +735,8 @@ async def stream_quotes(
log.info(f'Starting up quote stream(s) for {symbols}')
for sym_str in symbols:
mkt: MktPair
pair: KucoinMktPair
mkt, pair = await get_mkt_info(sym_str)
init_msgs.append(
FeedInit(mkt_info=mkt)
@ -705,7 +744,11 @@ async def stream_quotes(
ws: NoBsWs
token, ping_interval = await client._get_ws_token()
connect_id = str(uuid4())
log.info('API reported ping_interval: {ping_interval}\n')
connect_id: str = str(uuid4())
typ: str
quote: dict
async with (
open_autorecon_ws(
(
@ -719,20 +762,37 @@ async def stream_quotes(
),
) as ws,
open_ping_task(ws, ping_interval, connect_id),
aclosing(stream_messages(ws, sym_str)) as msg_gen,
aclosing(
iter_normed_quotes(
ws, sym_str
)
) as iter_quotes,
):
typ, quote = await anext(msg_gen)
typ, quote = await anext(iter_quotes)
while typ != 'trade':
# take care to not unblock here until we get a real
# trade quote
typ, quote = await anext(msg_gen)
# take care to not unblock here until we get a real
# trade quote?
# ^TODO, remove this right?
# -[ ] what often blocks chart boot/new-feed switching
# since we'ere waiting for a live quote instead of just
# loading history afap..
# |_ XXX, not sure if we require a bit of rework to core
# feed init logic or if backends justg gotta be
# changed up.. feel like there was some causality
# dilema prolly only seen with IB too..
# while typ != 'trade':
# typ, quote = await anext(iter_quotes)
task_status.started((init_msgs, quote))
feed_is_live.set()
async for typ, msg in msg_gen:
await send_chan.send({sym_str: msg})
# XXX NOTE, DO NOT include the `.<backend>` suffix!
# OW the sampling loop will not broadcast correctly..
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
async for typ, quote in iter_quotes:
await send_chan.send({topic: quote})
@acm
@ -787,7 +847,7 @@ async def subscribe(
)
async def stream_messages(
async def iter_normed_quotes(
ws: NoBsWs,
sym: str,
@ -818,6 +878,9 @@ async def stream_messages(
yield 'trade', {
'symbol': sym,
# TODO, is 'last' even used elsewhere/a-good
# semantic? can't we just read the ticks with our
# .data.ticktools.frame_ticks()`/
'last': trade_data.price,
'brokerd_ts': last_trade_ts,
'ticks': [
@ -910,7 +973,7 @@ async def open_history_client(
if end_dt is None:
inow = round(time.time())
print(
log.debug(
f'difference in time between load and processing'
f'{inow - times[-1]}'
)

View File

@ -653,7 +653,11 @@ class Router(Struct):
flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker)
book.lasts[fqme]: float = float(first_quote['last'])
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
async with self.maybe_open_brokerd_dialog(
brokermod=brokermod,
@ -716,7 +720,7 @@ class Router(Struct):
subs = self.subscribers[sub_key]
sent_some: bool = False
for client_stream in subs:
for client_stream in subs.copy():
try:
await client_stream.send(msg)
sent_some = True
@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
if not status_msg.req:
# likely some order change state?
await tractor.pause()
else:
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
if status == 'closed':
log.info(f'Execution for {oid} is complete!')

View File

@ -653,6 +653,7 @@ async def open_trade_dialog(
# in) use manually constructed table from calling
# the `.get_mkt_info()` provider EP above.
_mktmap_table=mkt_by_fqme,
only_require=list(mkt_by_fqme),
)
pp_msgs: list[BrokerdPosition] = []

View File

@ -335,7 +335,7 @@ def services(config, tl, ports):
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_arbiter(
tractor.get_registry(
host=host,
port=ports[0]
) as portal

View File

@ -25,6 +25,7 @@ from collections import (
defaultdict,
)
from contextlib import asynccontextmanager as acm
from functools import partial
import time
from typing import (
Any,
@ -42,7 +43,7 @@ from tractor.trionics import (
maybe_open_nursery,
)
import trio
from trio_typing import TaskStatus
from trio import TaskStatus
from .ticktools import (
frame_ticks,
@ -70,6 +71,7 @@ if TYPE_CHECKING:
_default_delay_s: float = 1.0
# TODO: use new `tractor.singleton_acm` API for this!
class Sampler:
'''
Global sampling engine registry.
@ -79,9 +81,9 @@ class Sampler:
This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see
``.service.maybe_open_samplerd()`` and the below
``register_with_sampler()``.
time-step-sample a (real-time) quote feeds, see
`.service.maybe_open_samplerd()` and the below
`register_with_sampler()`.
'''
service_nursery: None | trio.Nursery = None
@ -375,7 +377,10 @@ async def register_with_sampler(
assert Sampler.ohlcv_shms
# unblock caller
await ctx.started(set(Sampler.ohlcv_shms.keys()))
await ctx.started(
# XXX bc msgpack only allows one array type!
list(Sampler.ohlcv_shms.keys())
)
if open_index_stream:
try:
@ -419,7 +424,6 @@ async def register_with_sampler(
async def spawn_samplerd(
loglevel: str | None = None,
**extra_tractor_kwargs
@ -429,7 +433,10 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting.
'''
from piker.service import Services
from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname = 'samplerd'
log.info(f'Spawning `{dname}`')
@ -437,26 +444,33 @@ async def spawn_samplerd(
# singleton lock creation of ``samplerd`` since we only ever want
# one daemon per ``pikerd`` proc tree.
# TODO: make this built-into the service api?
async with Services.locks[dname + '_singleton']:
mngr: ServiceMngr = get_service_mngr()
already_started: bool = dname in mngr.service_tasks
if dname not in Services.service_tasks:
portal = await Services.actor_n.start_actor(
dname,
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
await Services.start_service_task(
dname,
portal,
async with mngr._locks[dname + '_singleton']:
ctx: Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
register_with_sampler,
period_s=1,
sub_for_broadcasts=False,
),
debug_mode=mngr.debug_mode, # set by pikerd flag
# proxy-through to tractor
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
**extra_tractor_kwargs
)
if not already_started:
assert (
ctx
and
ctx.portal
and
not ctx.cancel_called
)
return True
@ -889,6 +903,7 @@ async def uniform_rate_send(
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
trio.EndOfChannel,
trio.ClosedResourceError,
trio.BrokenResourceError,
ConnectionResetError,

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set()
await trio.sleep_forever()
except HandshakeError:
log.exception(f'Retrying connection')
log.exception('Retrying connection')
# ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
'''
JSONRPC response-request style machinery for transparent multiplexing of msgs
over a NoBsWs.
JSONRPC response-request style machinery for transparent multiplexing
of msgs over a NoBsWs.
'''
@ -377,16 +377,20 @@ async def open_jsonrpc_session(
url: str,
start_id: int = 0,
response_type: type = JSONRPCResult,
request_type: Optional[type] = None,
request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None,
# request_type: Optional[type] = None,
# request_hook: Optional[Callable] = None,
# error_hook: Optional[Callable] = None,
) -> Callable[[str, dict], dict]:
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with (
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
):
rpc_id: Iterable = count(start_id)
rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict:
@ -394,26 +398,40 @@ async def open_jsonrpc_session(
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
'''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = {
'jsonrpc': '2.0',
'id': next(rpc_id),
'id': req_id,
'method': method,
'params': params
}
_id = msg['id']
rpc_results[_id] = {
result = rpc_results[_id] = {
'result': None,
'event': trio.Event()
'error': None,
'event': trio.Event(), # signal caller resp arrived
}
req_msgs[_id] = msg
await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait()
ret = rpc_results[_id]['result']
if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id]
del rpc_results[_id]
else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
@ -428,6 +446,7 @@ async def open_jsonrpc_session(
the server side.
'''
nonlocal req_msgs
async for msg in ws:
match msg:
case {
@ -451,15 +470,29 @@ async def open_jsonrpc_session(
'params': _,
}:
log.debug(f'Recieved\n{msg}')
if request_hook:
await request_hook(request_type(**msg))
# if request_hook:
# await request_hook(request_type(**msg))
case {
'error': error
}:
log.warning(f'Recieved\n{error}')
if error_hook:
await error_hook(response_type(**msg))
# if error_hook:
# await error_hook(response_type(**msg))
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = error['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')

View File

@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
=> TODO: maybe to (re)move elsewhere?
'''
from ._mngr import Services as Services
from ._mngr import (
get_service_mngr as get_service_mngr,
open_service_mngr as open_service_mngr,
ServiceMngr as ServiceMngr,
)
from ._registry import (
_tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr,

View File

@ -21,7 +21,6 @@
from __future__ import annotations
import os
from typing import (
Optional,
Any,
ClassVar,
)
@ -30,13 +29,13 @@ from contextlib import (
)
import tractor
import trio
from ._util import (
get_console_log,
)
from ._mngr import (
Services,
open_service_mngr,
ServiceMngr,
)
from ._registry import ( # noqa
_tractor_kwargs,
@ -59,7 +58,7 @@ async def open_piker_runtime(
registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [],
loglevel: Optional[str] = None,
loglevel: str|None = None,
# XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production.
@ -69,7 +68,7 @@ async def open_piker_runtime(
# and spawn the service tree distributed per that.
start_method: str = 'trio',
tractor_runtime_overrides: dict | None = None,
tractor_runtime_overrides: dict|None = None,
**tractor_kwargs,
) -> tuple[
@ -119,6 +118,10 @@ async def open_piker_runtime(
# spawn other specialized daemons I think?
enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
# maybe_enable_greenback=debug_mode,
**tractor_kwargs,
) as actor,
@ -167,12 +170,13 @@ async def open_pikerd(
**kwargs,
) -> Services:
) -> ServiceMngr:
'''
Start a root piker daemon with an indefinite lifetime.
Start a root piker daemon actor (aka `pikerd`) with an indefinite
lifetime.
A root actor nursery is created which can be used to create and keep
alive underling services (see below).
A root actor-nursery is created which can be used to spawn and
supervise underling service sub-actors (see below).
'''
# NOTE: for the root daemon we always enable the root
@ -199,8 +203,6 @@ async def open_pikerd(
root_actor,
reg_addrs,
),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
):
for addr in reg_addrs:
if addr not in root_actor.accept_addrs:
@ -209,25 +211,17 @@ async def open_pikerd(
'Maybe you have another daemon already running?'
)
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
mngr: ServiceMngr
async with open_service_mngr(
debug_mode=debug_mode,
) as mngr:
yield mngr
# TODO: do we even need this?
# @acm
# async def maybe_open_runtime(
# loglevel: Optional[str] = None,
# loglevel: str|None = None,
# **kwargs,
# ) -> None:
@ -256,7 +250,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None,
**kwargs,
) -> tractor._portal.Portal | ClassVar[Services]:
) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self

View File

@ -49,7 +49,7 @@ from requests.exceptions import (
ReadTimeout,
)
from ._mngr import Services
from ._mngr import ServiceMngr
from ._util import (
log, # sub-sys logger
get_console_log,
@ -453,7 +453,7 @@ async def open_ahabd(
@acm
async def start_ahab_service(
services: Services,
services: ServiceMngr,
service_name: str,
# endpoint config passed as **kwargs
@ -549,7 +549,8 @@ async def start_ahab_service(
log.warning('Failed to cancel root permsed container')
except (
trio.MultiError,
# trio.MultiError,
ExceptionGroup,
) as err:
for subexc in err.exceptions:
if isinstance(subexc, PermissionError):

View File

@ -26,14 +26,17 @@ from typing import (
from contextlib import (
asynccontextmanager as acm,
)
from collections import defaultdict
import tractor
import trio
from ._util import (
log, # sub-sys logger
)
from ._mngr import (
Services,
get_service_mngr,
ServiceMngr,
)
from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service
@ -41,15 +44,14 @@ from ._registry import find_service
@acm
async def maybe_spawn_daemon(
service_name: str,
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: str | None = None,
singleton: bool = False,
_locks = defaultdict(trio.Lock),
**pikerd_kwargs,
) -> tractor.Portal:
@ -67,7 +69,7 @@ async def maybe_spawn_daemon(
'''
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Services.locks[service_name]
lock = _locks[service_name]
await lock.acquire()
async with find_service(
@ -102,6 +104,12 @@ async def maybe_spawn_daemon(
# service task for that actor.
started: bool
if pikerd_portal is None:
# await tractor.pause()
if tractor_kwargs.get('debug_mode', False):
from tractor.devx._debug import maybe_init_greenback
await maybe_init_greenback()
started = await service_task_target(
loglevel=loglevel,
**spawn_args,
@ -132,7 +140,65 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
# --- ---- ---
# XXX NOTE XXX
# --- ---- ---
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
#
# Doing so will cause an "out-of-band" ctxc
# (`tractor.ContextCancelled`) to be raised inside the
# `ServiceMngr.open_context_in_task()`'s call to
# `ctx.wait_for_result()` AND the internal self-ctxc
# "graceful capture" WILL NOT CATCH IT!
#
# This can cause certain types of operations to raise
# that ctxc BEFORE THEY `return`, resulting in
# a "false-negative" ctxc being raised when really
# nothing actually failed, other then our semantic
# "failure" to suppress an expected, graceful,
# self-cancel scenario..
#
# bUt wHy duZ It WorK lIKe dis..
# ------------------------------
# from the perspective of the `tractor.Context` this
# cancel request was conducted "out of band" since
# `Context.cancel()` was never called and thus the
# `._cancel_called: bool` was never set. Despite the
# remote `.canceller` being set to `pikerd` (i.e. the
# same `Actor.uid` of the raising service-mngr task) the
# service-task's ctx itself was never marked as having
# requested cancellation and thus still raises the ctxc
# bc it was unaware of any such request.
#
# How to make grokin these cases easier tho?
# ------------------------------------------
# Because `Portal.cancel_actor()` was called it requests
# "full-`Actor`-runtime-cancellation" of it's peer
# process which IS NOT THE SAME as a single inter-actor
# RPC task cancelling its local context with a remote
# peer `Task` in that same peer process.
#
# ?TODO? It might be better if we do one (or all) of the
# following:
#
# -[ ] at least set a special message for the
# `ContextCancelled` when raised locally by the
# unaware ctx task such that we check for the
# `.canceller` being *our `Actor`* and in the case
# where `Context._cancel_called == False` we specially
# note that this is likely an "out-of-band"
# runtime-cancel request triggered by some call to
# `Portal.cancel_actor()`, possibly even reporting the
# exact LOC of that caller by tracking it inside our
# portal-type?
# -[ ] possibly add another field `ContextCancelled` like
# maybe a,
# `.request_type: Literal['os', 'proc', 'actor',
# 'ctx']` type thing which would allow immediately
# being able to tell what kind of cancellation caused
# the unexpected ctxc?
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
# better augment `tractor` to be more explicit on this!
async def spawn_emsd(
@ -147,26 +213,25 @@ async def spawn_emsd(
"""
log.info('Spawning emsd')
portal = await Services.actor_n.start_actor(
smngr: ServiceMngr = get_service_mngr()
portal = await smngr.an.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
'piker.clearing._client',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
debug_mode=smngr.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
# non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd
await Services.start_service_task(
'emsd',
portal,
# signature of target root-task endpoint
_setup_persistent_emsd,
await smngr.start_service_ctx(
name='emsd',
portal=portal,
ctx_fn=_setup_persistent_emsd,
loglevel=loglevel,
)
return True

View File

@ -18,148 +18,36 @@
daemon-service management API.
"""
from collections import defaultdict
from typing import (
Callable,
Any,
from contextlib import (
asynccontextmanager as acm,
)
import trio
from trio_typing import TaskStatus
import tractor
from tractor import (
current_actor,
ContextCancelled,
Context,
Portal,
from tractor.hilevel import (
ServiceMngr,
# open_service_mngr as _open_service_mngr,
get_service_mngr as get_service_mngr,
)
# TODO:
# -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr`
# in terms of allocating the `Portal` as part of the
# "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
from ._util import (
log, # sub-sys logger
)
Services: ServiceMngr|None = None
@acm
async def open_service_mngr(
**kwargs,
) -> ServiceMngr:
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
class Services:
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
Portal,
trio.Event,
]
] = {}
locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task(
self,
name: str,
portal: Portal,
target: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> (trio.CancelScope, Context):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
async def open_context_in_task(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
with trio.CancelScope() as cs:
async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, first):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((cs, complete, first))
log.info(
f'`pikerd` service {name} started with value {first}'
)
try:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (await portal.result(), ctx_res)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.channel.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service {name} was remotely cancelled?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
)
else:
raise
finally:
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
cs, complete, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal, complete)
return cs, first
@classmethod
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, portal, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
assert name not in self.service_tasks, \
f'Serice task for {name} not terminated?'
global Services
async with tractor.hilevel.open_service_mngr(
**kwargs,
) as mngr:
# Services = proxy(mngr)
Services = mngr
yield mngr
Services = None

View File

@ -21,11 +21,13 @@ from typing import (
TYPE_CHECKING,
)
# TODO: oof, needs to be changed to `httpx`!
import asks
if TYPE_CHECKING:
import docker
from ._ahab import DockerContainer
from . import ServiceMngr
from ._util import log # sub-sys logger
from ._util import (
@ -127,7 +129,7 @@ def start_elasticsearch(
@acm
async def start_ahab_daemon(
service_mngr: Services,
service_mngr: ServiceMngr,
user_config: dict | None = None,
loglevel: str | None = None,

View File

@ -53,7 +53,7 @@ import pendulum
# import purerpc
from ..data.feed import maybe_open_feed
from . import Services
from . import ServiceMngr
from ._util import (
log, # sub-sys logger
get_console_log,
@ -233,7 +233,7 @@ def start_marketstore(
@acm
async def start_ahab_daemon(
service_mngr: Services,
service_mngr: ServiceMngr,
user_config: dict | None = None,
loglevel: str | None = None,

View File

@ -458,13 +458,15 @@ async def start_backfill(
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
)
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
if timeframe > 1:
await tractor.pause()
# UGH: what's a better way?
# TODO: backends are responsible for being correct on
# this right!?
# -[ ] in the `ib` case we could maybe offer some way
# to halt the request loop until the condition is
# resolved or should the backend be entirely in
# charge of solving such faults? yes, right?
# if timeframe > 1:
# await tractor.pause()
return
assert (
@ -578,6 +580,7 @@ async def start_backfill(
'crypto',
'crypto_currency',
'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}:
# for now, our table key schema is not including
# the dst[/src] source asset token.

View File

@ -10,7 +10,7 @@ from piker import (
config,
)
from piker.service import (
Services,
get_service_mngr,
)
from piker.log import get_console_log
@ -129,7 +129,7 @@ async def _open_test_pikerd(
) as service_manager,
):
# this proc/actor is the pikerd
assert service_manager is Services
assert service_manager is get_service_mngr()
async with tractor.wait_for_actor(
'pikerd',

View File

@ -26,7 +26,7 @@ import pytest
import tractor
from uuid import uuid4
from piker.service import Services
from piker.service import ServiceMngr
from piker.log import get_logger
from piker.clearing._messages import (
Order,
@ -158,7 +158,7 @@ def load_and_check_pos(
def test_ems_err_on_bad_broker(
open_test_pikerd: Services,
open_test_pikerd: ServiceMngr,
loglevel: str,
):
async def load_bad_fqme():

View File

@ -15,7 +15,7 @@ import tractor
from piker.service import (
find_service,
Services,
ServiceMngr,
)
from piker.data import (
open_feed,
@ -44,7 +44,7 @@ def test_runtime_boot(
async def main():
port = 6666
daemon_addr = ('127.0.0.1', port)
services: Services
services: ServiceMngr
async with (
open_test_pikerd(