Compare commits
	
		
			22 Commits 
		
	
	
		
			179a42ac78
			...
			e4a68bdec9
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						e4a68bdec9 | |
| 
							
							
								 | 
						8efbfb7038 | |
| 
							
							
								 | 
						98c068d757 | |
| 
							
							
								 | 
						363a7cf4e0 | |
| 
							
							
								 | 
						f524912882 | |
| 
							
							
								 | 
						2f2b87b3d3 | |
| 
							
							
								 | 
						513544f5f7 | |
| 
							
							
								 | 
						9dc8006f10 | |
| 
							
							
								 | 
						6f17ef0c34 | |
| 
							
							
								 | 
						877f334b85 | |
| 
							
							
								 | 
						0508d08c49 | |
| 
							
							
								 | 
						9c14b08b81 | |
| 
							
							
								 | 
						5ad8863e32 | |
| 
							
							
								 | 
						5fad27c30d | |
| 
							
							
								 | 
						eb1a0a685d | |
| 
							
							
								 | 
						5c3f2750c9 | |
| 
							
							
								 | 
						9b7b062dfd | |
| 
							
							
								 | 
						0538e420cb | |
| 
							
							
								 | 
						4848dc40cc | |
| 
							
							
								 | 
						407efe3b10 | |
| 
							
							
								 | 
						00108010c9 | |
| 
							
							
								 | 
						8a4901c517 | 
| 
						 | 
					@ -30,7 +30,8 @@ from types import ModuleType
 | 
				
			||||||
from typing import (
 | 
					from typing import (
 | 
				
			||||||
    Any,
 | 
					    Any,
 | 
				
			||||||
    Iterator,
 | 
					    Iterator,
 | 
				
			||||||
    Generator
 | 
					    Generator,
 | 
				
			||||||
 | 
					    TYPE_CHECKING,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import pendulum
 | 
					import pendulum
 | 
				
			||||||
| 
						 | 
					@ -59,8 +60,10 @@ from ..clearing._messages import (
 | 
				
			||||||
    BrokerdPosition,
 | 
					    BrokerdPosition,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
from piker.types import Struct
 | 
					from piker.types import Struct
 | 
				
			||||||
from piker.data._symcache import SymbologyCache
 | 
					from piker.log import get_logger
 | 
				
			||||||
from ..log import get_logger
 | 
					
 | 
				
			||||||
 | 
					if TYPE_CHECKING:
 | 
				
			||||||
 | 
					    from piker.data._symcache import SymbologyCache
 | 
				
			||||||
 | 
					
 | 
				
			||||||
log = get_logger(__name__)
 | 
					log = get_logger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -493,6 +496,17 @@ class Account(Struct):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        _mktmap_table: dict[str, MktPair] | None = None,
 | 
					        _mktmap_table: dict[str, MktPair] | None = None,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        only_require: list[str]|True = True,
 | 
				
			||||||
 | 
					        # ^list of fqmes that are "required" to be processed from
 | 
				
			||||||
 | 
					        # this ledger pass; we often don't care about others and
 | 
				
			||||||
 | 
					        # definitely shouldn't always error in such cases.
 | 
				
			||||||
 | 
					        # (eg. broker backend loaded that doesn't yet supsport the
 | 
				
			||||||
 | 
					        # symcache but also, inside the paper engine we don't ad-hoc
 | 
				
			||||||
 | 
					        # request `get_mkt_info()` for every symbol in the ledger,
 | 
				
			||||||
 | 
					        # only the one for which we're simulating against).
 | 
				
			||||||
 | 
					        # TODO, not sure if there's a better soln for this, ideally
 | 
				
			||||||
 | 
					        # all backends get symcache support afap i guess..
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ) -> dict[str, Position]:
 | 
					    ) -> dict[str, Position]:
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        Update the internal `.pps[str, Position]` table from input
 | 
					        Update the internal `.pps[str, Position]` table from input
 | 
				
			||||||
| 
						 | 
					@ -535,11 +549,32 @@ class Account(Struct):
 | 
				
			||||||
                if _mktmap_table is None:
 | 
					                if _mktmap_table is None:
 | 
				
			||||||
                    raise
 | 
					                    raise
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                required: bool = (
 | 
				
			||||||
 | 
					                    only_require is True
 | 
				
			||||||
 | 
					                    or (
 | 
				
			||||||
 | 
					                        only_require is not True
 | 
				
			||||||
 | 
					                        and
 | 
				
			||||||
 | 
					                        fqme in only_require
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
                # XXX: caller is allowed to provide a fallback
 | 
					                # XXX: caller is allowed to provide a fallback
 | 
				
			||||||
                # mktmap table for the case where a new position is
 | 
					                # mktmap table for the case where a new position is
 | 
				
			||||||
                # being added and the preloaded symcache didn't
 | 
					                # being added and the preloaded symcache didn't
 | 
				
			||||||
                # have this entry prior (eg. with frickin IB..)
 | 
					                # have this entry prior (eg. with frickin IB..)
 | 
				
			||||||
                mkt = _mktmap_table[fqme]
 | 
					                if (
 | 
				
			||||||
 | 
					                    not (mkt := _mktmap_table.get(fqme))
 | 
				
			||||||
 | 
					                    and
 | 
				
			||||||
 | 
					                    required
 | 
				
			||||||
 | 
					                ):
 | 
				
			||||||
 | 
					                    raise
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                elif not required:
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    # should be an entry retreived somewhere
 | 
				
			||||||
 | 
					                    assert mkt
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if not (pos := pps.get(bs_mktid)):
 | 
					            if not (pos := pps.get(bs_mktid)):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -656,7 +691,7 @@ class Account(Struct):
 | 
				
			||||||
    def write_config(self) -> None:
 | 
					    def write_config(self) -> None:
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        Write the current account state to the user's account TOML file, normally
 | 
					        Write the current account state to the user's account TOML file, normally
 | 
				
			||||||
        something like ``pps.toml``.
 | 
					        something like `pps.toml`.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        # TODO: show diff output?
 | 
					        # TODO: show diff output?
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -98,13 +98,14 @@ async def open_cached_client(
 | 
				
			||||||
    If one has not been setup do it and cache it.
 | 
					    If one has not been setup do it and cache it.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    brokermod = get_brokermod(brokername)
 | 
					    brokermod: ModuleType = get_brokermod(brokername)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # TODO: make abstract or `typing.Protocol`
 | 
				
			||||||
 | 
					    # client: Client
 | 
				
			||||||
    async with maybe_open_context(
 | 
					    async with maybe_open_context(
 | 
				
			||||||
        acm_func=brokermod.get_client,
 | 
					        acm_func=brokermod.get_client,
 | 
				
			||||||
        kwargs=kwargs,
 | 
					        kwargs=kwargs,
 | 
				
			||||||
 | 
					 | 
				
			||||||
    ) as (cache_hit, client):
 | 
					    ) as (cache_hit, client):
 | 
				
			||||||
 | 
					 | 
				
			||||||
        if cache_hit:
 | 
					        if cache_hit:
 | 
				
			||||||
            log.runtime(f'Reusing existing {client}')
 | 
					            log.runtime(f'Reusing existing {client}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -42,7 +42,6 @@ from trio_typing import TaskStatus
 | 
				
			||||||
from pendulum import (
 | 
					from pendulum import (
 | 
				
			||||||
    from_timestamp,
 | 
					    from_timestamp,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
from rapidfuzz import process as fuzzy
 | 
					 | 
				
			||||||
import numpy as np
 | 
					import numpy as np
 | 
				
			||||||
import tractor
 | 
					import tractor
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -111,6 +110,7 @@ class AggTrade(Struct, frozen=True):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def stream_messages(
 | 
					async def stream_messages(
 | 
				
			||||||
    ws: NoBsWs,
 | 
					    ws: NoBsWs,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> AsyncGenerator[NoBsWs, dict]:
 | 
					) -> AsyncGenerator[NoBsWs, dict]:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: match syntax here!
 | 
					    # TODO: match syntax here!
 | 
				
			||||||
| 
						 | 
					@ -221,6 +221,8 @@ def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# TODO, why aren't frame resp `log.info()`s showing in upstream
 | 
				
			||||||
 | 
					# code?!
 | 
				
			||||||
@acm
 | 
					@acm
 | 
				
			||||||
async def open_history_client(
 | 
					async def open_history_client(
 | 
				
			||||||
    mkt: MktPair,
 | 
					    mkt: MktPair,
 | 
				
			||||||
| 
						 | 
					@ -463,6 +465,8 @@ async def stream_quotes(
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
        init_msgs: list[FeedInit] = []
 | 
					        init_msgs: list[FeedInit] = []
 | 
				
			||||||
        for sym in symbols:
 | 
					        for sym in symbols:
 | 
				
			||||||
 | 
					            mkt: MktPair
 | 
				
			||||||
 | 
					            pair: Pair
 | 
				
			||||||
            mkt, pair = await get_mkt_info(sym)
 | 
					            mkt, pair = await get_mkt_info(sym)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # build out init msgs according to latest spec
 | 
					            # build out init msgs according to latest spec
 | 
				
			||||||
| 
						 | 
					@ -511,7 +515,6 @@ async def stream_quotes(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # start streaming
 | 
					            # start streaming
 | 
				
			||||||
            async for typ, quote in msg_gen:
 | 
					            async for typ, quote in msg_gen:
 | 
				
			||||||
 | 
					 | 
				
			||||||
                # period = time.time() - last
 | 
					                # period = time.time() - last
 | 
				
			||||||
                # hz = 1/period if period else float('inf')
 | 
					                # hz = 1/period if period else float('inf')
 | 
				
			||||||
                # if hz > 60:
 | 
					                # if hz > 60:
 | 
				
			||||||
| 
						 | 
					@ -547,7 +550,7 @@ async def open_symbol_search(
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # repack in fqme-keyed table
 | 
					                # repack in fqme-keyed table
 | 
				
			||||||
                byfqme: dict[start, Pair] = {}
 | 
					                byfqme: dict[str, Pair] = {}
 | 
				
			||||||
                for pair in pairs.values():
 | 
					                for pair in pairs.values():
 | 
				
			||||||
                    byfqme[pair.bs_fqme] = pair
 | 
					                    byfqme[pair.bs_fqme] = pair
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -471,11 +471,15 @@ def search(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    # global opts
 | 
					    # global opts
 | 
				
			||||||
    brokermods = list(config['brokermods'].values())
 | 
					    brokermods: list[ModuleType] = list(config['brokermods'].values())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # TODO: this is coming from the `search --pdb` NOT from
 | 
				
			||||||
 | 
					    # the `piker --pdb` XD ..
 | 
				
			||||||
 | 
					    # -[ ] pull from the parent click ctx's values..dumdum
 | 
				
			||||||
 | 
					    # assert pdb
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # define tractor entrypoint
 | 
					    # define tractor entrypoint
 | 
				
			||||||
    async def main(func):
 | 
					    async def main(func):
 | 
				
			||||||
 | 
					 | 
				
			||||||
        async with maybe_open_pikerd(
 | 
					        async with maybe_open_pikerd(
 | 
				
			||||||
            loglevel=config['loglevel'],
 | 
					            loglevel=config['loglevel'],
 | 
				
			||||||
            debug_mode=pdb,
 | 
					            debug_mode=pdb,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -22,7 +22,9 @@ routines should be primitive data types where possible.
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
import inspect
 | 
					import inspect
 | 
				
			||||||
from types import ModuleType
 | 
					from types import ModuleType
 | 
				
			||||||
from typing import List, Dict, Any, Optional
 | 
					from typing import (
 | 
				
			||||||
 | 
					    Any,
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import trio
 | 
					import trio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -34,8 +36,10 @@ from ..accounting import MktPair
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def api(brokername: str, methname: str, **kwargs) -> dict:
 | 
					async def api(brokername: str, methname: str, **kwargs) -> dict:
 | 
				
			||||||
    """Make (proxy through) a broker API call by name and return its result.
 | 
					    '''
 | 
				
			||||||
    """
 | 
					    Make (proxy through) a broker API call by name and return its result.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
    brokermod = get_brokermod(brokername)
 | 
					    brokermod = get_brokermod(brokername)
 | 
				
			||||||
    async with brokermod.get_client() as client:
 | 
					    async with brokermod.get_client() as client:
 | 
				
			||||||
        meth = getattr(client, methname, None)
 | 
					        meth = getattr(client, methname, None)
 | 
				
			||||||
| 
						 | 
					@ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def stocks_quote(
 | 
					async def stocks_quote(
 | 
				
			||||||
    brokermod: ModuleType,
 | 
					    brokermod: ModuleType,
 | 
				
			||||||
    tickers: List[str]
 | 
					    tickers: list[str]
 | 
				
			||||||
) -> Dict[str, Dict[str, Any]]:
 | 
					
 | 
				
			||||||
    """Return quotes dict for ``tickers``.
 | 
					) -> dict[str, dict[str, Any]]:
 | 
				
			||||||
    """
 | 
					    '''
 | 
				
			||||||
 | 
					    Return a `dict` of snapshot quotes for the provided input
 | 
				
			||||||
 | 
					    `tickers`: a `list` of fqmes.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
    async with brokermod.get_client() as client:
 | 
					    async with brokermod.get_client() as client:
 | 
				
			||||||
        return await client.quote(tickers)
 | 
					        return await client.quote(tickers)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -74,13 +82,15 @@ async def stocks_quote(
 | 
				
			||||||
async def option_chain(
 | 
					async def option_chain(
 | 
				
			||||||
    brokermod: ModuleType,
 | 
					    brokermod: ModuleType,
 | 
				
			||||||
    symbol: str,
 | 
					    symbol: str,
 | 
				
			||||||
    date: Optional[str] = None,
 | 
					    date: str|None = None,
 | 
				
			||||||
) -> Dict[str, Dict[str, Dict[str, Any]]]:
 | 
					) -> dict[str, dict[str, dict[str, Any]]]:
 | 
				
			||||||
    """Return option chain for ``symbol`` for ``date``.
 | 
					    '''
 | 
				
			||||||
 | 
					    Return option chain for ``symbol`` for ``date``.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    By default all expiries are returned. If ``date`` is provided
 | 
					    By default all expiries are returned. If ``date`` is provided
 | 
				
			||||||
    then contract quotes for that single expiry are returned.
 | 
					    then contract quotes for that single expiry are returned.
 | 
				
			||||||
    """
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
    async with brokermod.get_client() as client:
 | 
					    async with brokermod.get_client() as client:
 | 
				
			||||||
        if date:
 | 
					        if date:
 | 
				
			||||||
            id = int((await client.tickers2ids([symbol]))[symbol])
 | 
					            id = int((await client.tickers2ids([symbol]))[symbol])
 | 
				
			||||||
| 
						 | 
					@ -98,7 +108,7 @@ async def option_chain(
 | 
				
			||||||
# async def contracts(
 | 
					# async def contracts(
 | 
				
			||||||
#     brokermod: ModuleType,
 | 
					#     brokermod: ModuleType,
 | 
				
			||||||
#     symbol: str,
 | 
					#     symbol: str,
 | 
				
			||||||
# ) -> Dict[str, Dict[str, Dict[str, Any]]]:
 | 
					# ) -> dict[str, dict[str, dict[str, Any]]]:
 | 
				
			||||||
#     """Return option contracts (all expiries) for ``symbol``.
 | 
					#     """Return option contracts (all expiries) for ``symbol``.
 | 
				
			||||||
#     """
 | 
					#     """
 | 
				
			||||||
#     async with brokermod.get_client() as client:
 | 
					#     async with brokermod.get_client() as client:
 | 
				
			||||||
| 
						 | 
					@ -110,15 +120,24 @@ async def bars(
 | 
				
			||||||
    brokermod: ModuleType,
 | 
					    brokermod: ModuleType,
 | 
				
			||||||
    symbol: str,
 | 
					    symbol: str,
 | 
				
			||||||
    **kwargs,
 | 
					    **kwargs,
 | 
				
			||||||
) -> Dict[str, Dict[str, Dict[str, Any]]]:
 | 
					) -> dict[str, dict[str, dict[str, Any]]]:
 | 
				
			||||||
    """Return option contracts (all expiries) for ``symbol``.
 | 
					    '''
 | 
				
			||||||
    """
 | 
					    Return option contracts (all expiries) for ``symbol``.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
    async with brokermod.get_client() as client:
 | 
					    async with brokermod.get_client() as client:
 | 
				
			||||||
        return await client.bars(symbol, **kwargs)
 | 
					        return await client.bars(symbol, **kwargs)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def search_w_brokerd(name: str, pattern: str) -> dict:
 | 
					async def search_w_brokerd(
 | 
				
			||||||
 | 
					    name: str,
 | 
				
			||||||
 | 
					    pattern: str,
 | 
				
			||||||
 | 
					) -> dict:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # TODO: WHY NOT WORK!?!
 | 
				
			||||||
 | 
					    # when we `step` through the next block?
 | 
				
			||||||
 | 
					    # import tractor
 | 
				
			||||||
 | 
					    # await tractor.pause()
 | 
				
			||||||
    async with open_cached_client(name) as client:
 | 
					    async with open_cached_client(name) as client:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # TODO: support multiple asset type concurrent searches.
 | 
					        # TODO: support multiple asset type concurrent searches.
 | 
				
			||||||
| 
						 | 
					@ -130,12 +149,12 @@ async def symbol_search(
 | 
				
			||||||
    pattern: str,
 | 
					    pattern: str,
 | 
				
			||||||
    **kwargs,
 | 
					    **kwargs,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> Dict[str, Dict[str, Dict[str, Any]]]:
 | 
					) -> dict[str, dict[str, dict[str, Any]]]:
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Return symbol info from broker.
 | 
					    Return symbol info from broker.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    results = []
 | 
					    results: list[str] = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def search_backend(
 | 
					    async def search_backend(
 | 
				
			||||||
        brokermod: ModuleType
 | 
					        brokermod: ModuleType
 | 
				
			||||||
| 
						 | 
					@ -143,6 +162,13 @@ async def symbol_search(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        brokername: str = mod.name
 | 
					        brokername: str = mod.name
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # TODO: figure this the FUCK OUT
 | 
				
			||||||
 | 
					        # -> ok so obvi in the root actor any async task that's
 | 
				
			||||||
 | 
					        # spawned outside the main tractor-root-actor task needs to
 | 
				
			||||||
 | 
					        # call this..
 | 
				
			||||||
 | 
					        # await tractor.devx._debug.maybe_init_greenback()
 | 
				
			||||||
 | 
					        # tractor.pause_from_sync()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        async with maybe_spawn_brokerd(
 | 
					        async with maybe_spawn_brokerd(
 | 
				
			||||||
            mod.name,
 | 
					            mod.name,
 | 
				
			||||||
            infect_asyncio=getattr(
 | 
					            infect_asyncio=getattr(
 | 
				
			||||||
| 
						 | 
					@ -162,7 +188,6 @@ async def symbol_search(
 | 
				
			||||||
            ))
 | 
					            ))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async with trio.open_nursery() as n:
 | 
					    async with trio.open_nursery() as n:
 | 
				
			||||||
 | 
					 | 
				
			||||||
        for mod in brokermods:
 | 
					        for mod in brokermods:
 | 
				
			||||||
            n.start_soon(search_backend, mod.name)
 | 
					            n.start_soon(search_backend, mod.name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -172,11 +197,13 @@ async def symbol_search(
 | 
				
			||||||
async def mkt_info(
 | 
					async def mkt_info(
 | 
				
			||||||
    brokermod: ModuleType,
 | 
					    brokermod: ModuleType,
 | 
				
			||||||
    fqme: str,
 | 
					    fqme: str,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    **kwargs,
 | 
					    **kwargs,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> MktPair:
 | 
					) -> MktPair:
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Return MktPair info from broker including src and dst assets.
 | 
					    Return the `piker.accounting.MktPair` info struct from a given
 | 
				
			||||||
 | 
					    backend broker tradable src/dst asset pair.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    async with open_cached_client(brokermod.name) as client:
 | 
					    async with open_cached_client(brokermod.name) as client:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -168,7 +168,6 @@ class OrderClient(Struct):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def relay_orders_from_sync_code(
 | 
					async def relay_orders_from_sync_code(
 | 
				
			||||||
 | 
					 | 
				
			||||||
    client: OrderClient,
 | 
					    client: OrderClient,
 | 
				
			||||||
    symbol_key: str,
 | 
					    symbol_key: str,
 | 
				
			||||||
    to_ems_stream: tractor.MsgStream,
 | 
					    to_ems_stream: tractor.MsgStream,
 | 
				
			||||||
| 
						 | 
					@ -242,6 +241,11 @@ async def open_ems(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async with maybe_open_emsd(
 | 
					    async with maybe_open_emsd(
 | 
				
			||||||
        broker,
 | 
					        broker,
 | 
				
			||||||
 | 
					        # XXX NOTE, LOL so this determines the daemon `emsd` loglevel
 | 
				
			||||||
 | 
					        # then FYI.. that's kinda wrong no?
 | 
				
			||||||
 | 
					        # -[ ] shouldn't it be set by `pikerd -l` or no?
 | 
				
			||||||
 | 
					        # -[ ] would make a lot more sense to have a subsys ctl for
 | 
				
			||||||
 | 
					        #     levels.. like `-l emsd.info` or something?
 | 
				
			||||||
        loglevel=loglevel,
 | 
					        loglevel=loglevel,
 | 
				
			||||||
    ) as portal:
 | 
					    ) as portal:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -653,7 +653,11 @@ class Router(Struct):
 | 
				
			||||||
            flume = feed.flumes[fqme]
 | 
					            flume = feed.flumes[fqme]
 | 
				
			||||||
            first_quote: dict = flume.first_quote
 | 
					            first_quote: dict = flume.first_quote
 | 
				
			||||||
            book: DarkBook = self.get_dark_book(broker)
 | 
					            book: DarkBook = self.get_dark_book(broker)
 | 
				
			||||||
            book.lasts[fqme]: float = float(first_quote['last'])
 | 
					
 | 
				
			||||||
 | 
					            if not (last := first_quote.get('last')):
 | 
				
			||||||
 | 
					                last: float = flume.rt_shm.array[-1]['close']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            book.lasts[fqme]: float = float(last)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            async with self.maybe_open_brokerd_dialog(
 | 
					            async with self.maybe_open_brokerd_dialog(
 | 
				
			||||||
                brokermod=brokermod,
 | 
					                brokermod=brokermod,
 | 
				
			||||||
| 
						 | 
					@ -716,7 +720,7 @@ class Router(Struct):
 | 
				
			||||||
            subs = self.subscribers[sub_key]
 | 
					            subs = self.subscribers[sub_key]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        sent_some: bool = False
 | 
					        sent_some: bool = False
 | 
				
			||||||
        for client_stream in subs:
 | 
					        for client_stream in subs.copy():
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                await client_stream.send(msg)
 | 
					                await client_stream.send(msg)
 | 
				
			||||||
                sent_some = True
 | 
					                sent_some = True
 | 
				
			||||||
| 
						 | 
					@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events(
 | 
				
			||||||
                status_msg.brokerd_msg = msg
 | 
					                status_msg.brokerd_msg = msg
 | 
				
			||||||
                status_msg.src = msg.broker_details['name']
 | 
					                status_msg.src = msg.broker_details['name']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                await router.client_broadcast(
 | 
					                if not status_msg.req:
 | 
				
			||||||
                    status_msg.req.symbol,
 | 
					                    # likely some order change state?
 | 
				
			||||||
                    status_msg,
 | 
					                    await tractor.pause()
 | 
				
			||||||
                )
 | 
					                else:
 | 
				
			||||||
 | 
					                    await router.client_broadcast(
 | 
				
			||||||
 | 
					                        status_msg.req.symbol,
 | 
				
			||||||
 | 
					                        status_msg,
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if status == 'closed':
 | 
					                if status == 'closed':
 | 
				
			||||||
                    log.info(f'Execution for {oid} is complete!')
 | 
					                    log.info(f'Execution for {oid} is complete!')
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -297,6 +297,8 @@ class PaperBoi(Struct):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # transmit pp msg to ems
 | 
					        # transmit pp msg to ems
 | 
				
			||||||
        pp: Position = self.acnt.pps[bs_mktid]
 | 
					        pp: Position = self.acnt.pps[bs_mktid]
 | 
				
			||||||
 | 
					        # TODO, this will break if `require_only=True` was passed to
 | 
				
			||||||
 | 
					        # `.update_from_ledger()`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        pp_msg = BrokerdPosition(
 | 
					        pp_msg = BrokerdPosition(
 | 
				
			||||||
            broker=self.broker,
 | 
					            broker=self.broker,
 | 
				
			||||||
| 
						 | 
					@ -653,6 +655,7 @@ async def open_trade_dialog(
 | 
				
			||||||
                # in) use manually constructed table from calling
 | 
					                # in) use manually constructed table from calling
 | 
				
			||||||
                # the `.get_mkt_info()` provider EP above.
 | 
					                # the `.get_mkt_info()` provider EP above.
 | 
				
			||||||
                _mktmap_table=mkt_by_fqme,
 | 
					                _mktmap_table=mkt_by_fqme,
 | 
				
			||||||
 | 
					                only_require=list(mkt_by_fqme),
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            pp_msgs: list[BrokerdPosition] = []
 | 
					            pp_msgs: list[BrokerdPosition] = []
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -30,6 +30,7 @@ subsys: str = 'piker.clearing'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
log = get_logger(subsys)
 | 
					log = get_logger(subsys)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# TODO, oof doesn't this ignore the `loglevel` then???
 | 
				
			||||||
get_console_log = partial(
 | 
					get_console_log = partial(
 | 
				
			||||||
    get_console_log,
 | 
					    get_console_log,
 | 
				
			||||||
    name=subsys,
 | 
					    name=subsys,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -140,11 +140,10 @@ def pikerd(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if pdb:
 | 
					        if pdb:
 | 
				
			||||||
            log.warning((
 | 
					            log.warning((
 | 
				
			||||||
                "\n"
 | 
					                '\n\n'
 | 
				
			||||||
                "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
 | 
					                '!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n'
 | 
				
			||||||
                "When a `piker` daemon crashes it will block the "
 | 
					                'When a `piker` daemon crashes it will block the '
 | 
				
			||||||
                "task-thread until resumed from console!\n"
 | 
					                'task-thread until resumed from console!\n'
 | 
				
			||||||
                "\n"
 | 
					 | 
				
			||||||
            ))
 | 
					            ))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # service-actor registry endpoint socket-address set
 | 
					        # service-actor registry endpoint socket-address set
 | 
				
			||||||
| 
						 | 
					@ -177,7 +176,7 @@ def pikerd(
 | 
				
			||||||
        from .. import service
 | 
					        from .. import service
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        async def main():
 | 
					        async def main():
 | 
				
			||||||
            service_mngr: service.Services
 | 
					            service_mngr: service.ServiceMngr
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            async with (
 | 
					            async with (
 | 
				
			||||||
                service.open_pikerd(
 | 
					                service.open_pikerd(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -104,14 +104,15 @@ def get_app_dir(
 | 
				
			||||||
    # `tractor`) with the testing dir and check for it whenever we
 | 
					    # `tractor`) with the testing dir and check for it whenever we
 | 
				
			||||||
    # detect `pytest` is being used (which it isn't under normal
 | 
					    # detect `pytest` is being used (which it isn't under normal
 | 
				
			||||||
    # operation).
 | 
					    # operation).
 | 
				
			||||||
    if "pytest" in sys.modules:
 | 
					    # if "pytest" in sys.modules:
 | 
				
			||||||
        import tractor
 | 
					    #     import tractor
 | 
				
			||||||
        actor = tractor.current_actor(err_on_no_runtime=False)
 | 
					    #     actor = tractor.current_actor(err_on_no_runtime=False)
 | 
				
			||||||
        if actor:  # runtime is up
 | 
					    #     if actor:  # runtime is up
 | 
				
			||||||
            rvs = tractor._state._runtime_vars
 | 
					    #         rvs = tractor._state._runtime_vars
 | 
				
			||||||
            testdirpath = Path(rvs['piker_vars']['piker_test_dir'])
 | 
					    #         import pdbp; pdbp.set_trace()
 | 
				
			||||||
            assert testdirpath.exists(), 'piker test harness might be borked!?'
 | 
					    #         testdirpath = Path(rvs['piker_vars']['piker_test_dir'])
 | 
				
			||||||
            app_name = str(testdirpath)
 | 
					    #         assert testdirpath.exists(), 'piker test harness might be borked!?'
 | 
				
			||||||
 | 
					    #         app_name = str(testdirpath)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if platform.system() == 'Windows':
 | 
					    if platform.system() == 'Windows':
 | 
				
			||||||
        key = "APPDATA" if roaming else "LOCALAPPDATA"
 | 
					        key = "APPDATA" if roaming else "LOCALAPPDATA"
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -95,6 +95,12 @@ class Sampler:
 | 
				
			||||||
    # history loading.
 | 
					    # history loading.
 | 
				
			||||||
    incr_task_cs: trio.CancelScope | None = None
 | 
					    incr_task_cs: trio.CancelScope | None = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    bcast_errors: tuple[Exception] = (
 | 
				
			||||||
 | 
					        trio.BrokenResourceError,
 | 
				
			||||||
 | 
					        trio.ClosedResourceError,
 | 
				
			||||||
 | 
					        trio.EndOfChannel,
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # holds all the ``tractor.Context`` remote subscriptions for
 | 
					    # holds all the ``tractor.Context`` remote subscriptions for
 | 
				
			||||||
    # a particular sample period increment event: all subscribers are
 | 
					    # a particular sample period increment event: all subscribers are
 | 
				
			||||||
    # notified on a step.
 | 
					    # notified on a step.
 | 
				
			||||||
| 
						 | 
					@ -258,14 +264,15 @@ class Sampler:
 | 
				
			||||||
        subs: set
 | 
					        subs: set
 | 
				
			||||||
        last_ts, subs = pair
 | 
					        last_ts, subs = pair
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        task = trio.lowlevel.current_task()
 | 
					        # NOTE, for debugging pub-sub issues
 | 
				
			||||||
        log.debug(
 | 
					        # task = trio.lowlevel.current_task()
 | 
				
			||||||
            f'SUBS {self.subscribers}\n'
 | 
					        # log.debug(
 | 
				
			||||||
            f'PAIR {pair}\n'
 | 
					        #     f'AlL-SUBS@{period_s!r}: {self.subscribers}\n'
 | 
				
			||||||
            f'TASK: {task}: {id(task)}\n'
 | 
					        #     f'PAIR: {pair}\n'
 | 
				
			||||||
            f'broadcasting {period_s} -> {last_ts}\n'
 | 
					        #     f'TASK: {task}: {id(task)}\n'
 | 
				
			||||||
            # f'consumers: {subs}'
 | 
					        #     f'broadcasting {period_s} -> {last_ts}\n'
 | 
				
			||||||
        )
 | 
					        #     f'consumers: {subs}'
 | 
				
			||||||
 | 
					        # )
 | 
				
			||||||
        borked: set[MsgStream] = set()
 | 
					        borked: set[MsgStream] = set()
 | 
				
			||||||
        sent: set[MsgStream] = set()
 | 
					        sent: set[MsgStream] = set()
 | 
				
			||||||
        while True:
 | 
					        while True:
 | 
				
			||||||
| 
						 | 
					@ -282,12 +289,11 @@ class Sampler:
 | 
				
			||||||
                        await stream.send(msg)
 | 
					                        await stream.send(msg)
 | 
				
			||||||
                        sent.add(stream)
 | 
					                        sent.add(stream)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    except (
 | 
					                    except self.bcast_errors as err:
 | 
				
			||||||
                        trio.BrokenResourceError,
 | 
					 | 
				
			||||||
                        trio.ClosedResourceError
 | 
					 | 
				
			||||||
                    ):
 | 
					 | 
				
			||||||
                        log.error(
 | 
					                        log.error(
 | 
				
			||||||
                            f'{stream._ctx.chan.uid} dropped connection'
 | 
					                            f'Connection dropped for IPC ctx\n'
 | 
				
			||||||
 | 
					                            f'{stream._ctx}\n\n'
 | 
				
			||||||
 | 
					                            f'Due to {type(err)}'
 | 
				
			||||||
                        )
 | 
					                        )
 | 
				
			||||||
                        borked.add(stream)
 | 
					                        borked.add(stream)
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
| 
						 | 
					@ -394,7 +400,8 @@ async def register_with_sampler(
 | 
				
			||||||
                finally:
 | 
					                finally:
 | 
				
			||||||
                    if (
 | 
					                    if (
 | 
				
			||||||
                        sub_for_broadcasts
 | 
					                        sub_for_broadcasts
 | 
				
			||||||
                        and subs
 | 
					                        and
 | 
				
			||||||
 | 
					                        subs
 | 
				
			||||||
                    ):
 | 
					                    ):
 | 
				
			||||||
                        try:
 | 
					                        try:
 | 
				
			||||||
                            subs.remove(stream)
 | 
					                            subs.remove(stream)
 | 
				
			||||||
| 
						 | 
					@ -561,8 +568,7 @@ async def open_sample_stream(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def sample_and_broadcast(
 | 
					async def sample_and_broadcast(
 | 
				
			||||||
 | 
					    bus: _FeedsBus,
 | 
				
			||||||
    bus: _FeedsBus,  # noqa
 | 
					 | 
				
			||||||
    rt_shm: ShmArray,
 | 
					    rt_shm: ShmArray,
 | 
				
			||||||
    hist_shm: ShmArray,
 | 
					    hist_shm: ShmArray,
 | 
				
			||||||
    quote_stream: trio.abc.ReceiveChannel,
 | 
					    quote_stream: trio.abc.ReceiveChannel,
 | 
				
			||||||
| 
						 | 
					@ -582,11 +588,33 @@ async def sample_and_broadcast(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    overruns = Counter()
 | 
					    overruns = Counter()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # NOTE, only used for debugging live-data-feed issues, though
 | 
				
			||||||
 | 
					    # this should be resolved more correctly in the future using the
 | 
				
			||||||
 | 
					    # new typed-msgspec feats of `tractor`!
 | 
				
			||||||
 | 
					    #
 | 
				
			||||||
 | 
					    # XXX, a multiline nested `dict` formatter (since rn quote-msgs
 | 
				
			||||||
 | 
					    # are just that).
 | 
				
			||||||
 | 
					    # pfmt: Callable[[str], str] = mk_repr()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # iterate stream delivered by broker
 | 
					    # iterate stream delivered by broker
 | 
				
			||||||
    async for quotes in quote_stream:
 | 
					    async for quotes in quote_stream:
 | 
				
			||||||
        # print(quotes)
 | 
					        # print(quotes)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # TODO: ``numba`` this!
 | 
					        # XXX WARNING XXX only enable for debugging bc ow can cost
 | 
				
			||||||
 | 
					        # ALOT of perf with HF-feedz!!!
 | 
				
			||||||
 | 
					        #
 | 
				
			||||||
 | 
					        # log.info(
 | 
				
			||||||
 | 
					        #     'Rx live quotes:\n'
 | 
				
			||||||
 | 
					        #     f'{pfmt(quotes)}'
 | 
				
			||||||
 | 
					        # )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # TODO,
 | 
				
			||||||
 | 
					        # -[ ] `numba` or `cython`-nize this loop possibly?
 | 
				
			||||||
 | 
					        #  |_alternatively could we do it in rust somehow by upacking
 | 
				
			||||||
 | 
					        #    arrow msgs instead of using `msgspec`?
 | 
				
			||||||
 | 
					        # -[ ] use `msgspec.Struct` support in new typed-msging from
 | 
				
			||||||
 | 
					        #     `tractor` to ensure only allowed msgs are transmitted?
 | 
				
			||||||
 | 
					        #
 | 
				
			||||||
        for broker_symbol, quote in quotes.items():
 | 
					        for broker_symbol, quote in quotes.items():
 | 
				
			||||||
            # TODO: in theory you can send the IPC msg *before* writing
 | 
					            # TODO: in theory you can send the IPC msg *before* writing
 | 
				
			||||||
            # to the sharedmem array to decrease latency, however, that
 | 
					            # to the sharedmem array to decrease latency, however, that
 | 
				
			||||||
| 
						 | 
					@ -659,6 +687,21 @@ async def sample_and_broadcast(
 | 
				
			||||||
            sub_key: str = broker_symbol.lower()
 | 
					            sub_key: str = broker_symbol.lower()
 | 
				
			||||||
            subs: set[Sub] = bus.get_subs(sub_key)
 | 
					            subs: set[Sub] = bus.get_subs(sub_key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # TODO, figure out how to make this useful whilst
 | 
				
			||||||
 | 
					            # incoporating feed "pausing" ..
 | 
				
			||||||
 | 
					            #
 | 
				
			||||||
 | 
					            # if not subs:
 | 
				
			||||||
 | 
					            #     all_bs_fqmes: list[str] = list(
 | 
				
			||||||
 | 
					            #         bus._subscribers.keys()
 | 
				
			||||||
 | 
					            #     )
 | 
				
			||||||
 | 
					            #     log.warning(
 | 
				
			||||||
 | 
					            #         f'No subscribers for {brokername!r} live-quote ??\n'
 | 
				
			||||||
 | 
					            #         f'broker_symbol: {broker_symbol}\n\n'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            #         f'Maybe the backend-sys symbol does not match one of,\n'
 | 
				
			||||||
 | 
					            #         f'{pfmt(all_bs_fqmes)}\n'
 | 
				
			||||||
 | 
					            #     )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # NOTE: by default the broker backend doesn't append
 | 
					            # NOTE: by default the broker backend doesn't append
 | 
				
			||||||
            # it's own "name" into the fqme schema (but maybe it
 | 
					            # it's own "name" into the fqme schema (but maybe it
 | 
				
			||||||
            # should?) so we have to manually generate the correct
 | 
					            # should?) so we have to manually generate the correct
 | 
				
			||||||
| 
						 | 
					@ -728,18 +771,14 @@ async def sample_and_broadcast(
 | 
				
			||||||
                        if lags > 10:
 | 
					                        if lags > 10:
 | 
				
			||||||
                            await tractor.pause()
 | 
					                            await tractor.pause()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                except (
 | 
					                except Sampler.bcast_errors as ipc_err:
 | 
				
			||||||
                    trio.BrokenResourceError,
 | 
					 | 
				
			||||||
                    trio.ClosedResourceError,
 | 
					 | 
				
			||||||
                    trio.EndOfChannel,
 | 
					 | 
				
			||||||
                ):
 | 
					 | 
				
			||||||
                    ctx: Context = ipc._ctx
 | 
					                    ctx: Context = ipc._ctx
 | 
				
			||||||
                    chan: Channel = ctx.chan
 | 
					                    chan: Channel = ctx.chan
 | 
				
			||||||
                    if ctx:
 | 
					                    if ctx:
 | 
				
			||||||
                        log.warning(
 | 
					                        log.warning(
 | 
				
			||||||
                            'Dropped `brokerd`-quotes-feed connection:\n'
 | 
					                            f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n'
 | 
				
			||||||
                            f'{broker_symbol}:'
 | 
					                            f'x>) {ctx.cid}@{chan.uid}'
 | 
				
			||||||
                            f'{ctx.cid}@{chan.uid}'
 | 
					                            f'|_{ipc_err!r}\n\n'
 | 
				
			||||||
                        )
 | 
					                        )
 | 
				
			||||||
                    if sub.throttle_rate:
 | 
					                    if sub.throttle_rate:
 | 
				
			||||||
                        assert ipc._closed
 | 
					                        assert ipc._closed
 | 
				
			||||||
| 
						 | 
					@ -756,12 +795,11 @@ async def sample_and_broadcast(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def uniform_rate_send(
 | 
					async def uniform_rate_send(
 | 
				
			||||||
 | 
					 | 
				
			||||||
    rate: float,
 | 
					    rate: float,
 | 
				
			||||||
    quote_stream: trio.abc.ReceiveChannel,
 | 
					    quote_stream: trio.abc.ReceiveChannel,
 | 
				
			||||||
    stream: MsgStream,
 | 
					    stream: MsgStream,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
 | 
					    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> None:
 | 
					) -> None:
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
| 
						 | 
					@ -779,13 +817,16 @@ async def uniform_rate_send(
 | 
				
			||||||
    https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
 | 
					    https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    # TODO: compute the approx overhead latency per cycle
 | 
					    # ?TODO? dynamically compute the **actual** approx overhead latency per cycle
 | 
				
			||||||
    left_to_sleep = throttle_period = 1/rate - 0.000616
 | 
					    # instead of this magic # bidinezz?
 | 
				
			||||||
 | 
					    throttle_period: float = 1/rate - 0.000616
 | 
				
			||||||
 | 
					    left_to_sleep: float = throttle_period
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # send cycle state
 | 
					    # send cycle state
 | 
				
			||||||
 | 
					    first_quote: dict|None
 | 
				
			||||||
    first_quote = last_quote = None
 | 
					    first_quote = last_quote = None
 | 
				
			||||||
    last_send = time.time()
 | 
					    last_send: float = time.time()
 | 
				
			||||||
    diff = 0
 | 
					    diff: float = 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    task_status.started()
 | 
					    task_status.started()
 | 
				
			||||||
    ticks_by_type: dict[
 | 
					    ticks_by_type: dict[
 | 
				
			||||||
| 
						 | 
					@ -796,22 +837,28 @@ async def uniform_rate_send(
 | 
				
			||||||
    clear_types = _tick_groups['clears']
 | 
					    clear_types = _tick_groups['clears']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    while True:
 | 
					    while True:
 | 
				
			||||||
 | 
					 | 
				
			||||||
        # compute the remaining time to sleep for this throttled cycle
 | 
					        # compute the remaining time to sleep for this throttled cycle
 | 
				
			||||||
        left_to_sleep = throttle_period - diff
 | 
					        left_to_sleep: float = throttle_period - diff
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if left_to_sleep > 0:
 | 
					        if left_to_sleep > 0:
 | 
				
			||||||
 | 
					            cs: trio.CancelScope
 | 
				
			||||||
            with trio.move_on_after(left_to_sleep) as cs:
 | 
					            with trio.move_on_after(left_to_sleep) as cs:
 | 
				
			||||||
 | 
					                sym: str
 | 
				
			||||||
 | 
					                last_quote: dict
 | 
				
			||||||
                try:
 | 
					                try:
 | 
				
			||||||
                    sym, last_quote = await quote_stream.receive()
 | 
					                    sym, last_quote = await quote_stream.receive()
 | 
				
			||||||
                except trio.EndOfChannel:
 | 
					                except trio.EndOfChannel:
 | 
				
			||||||
                    log.exception(f"feed for {stream} ended?")
 | 
					                    log.exception(
 | 
				
			||||||
 | 
					                        f'Live stream for feed for ended?\n'
 | 
				
			||||||
 | 
					                        f'<=c\n'
 | 
				
			||||||
 | 
					                        f'  |_[{stream!r}\n'
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
                    break
 | 
					                    break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                diff = time.time() - last_send
 | 
					                diff: float = time.time() - last_send
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if not first_quote:
 | 
					                if not first_quote:
 | 
				
			||||||
                    first_quote = last_quote
 | 
					                    first_quote: float = last_quote
 | 
				
			||||||
                    # first_quote['tbt'] = ticks_by_type
 | 
					                    # first_quote['tbt'] = ticks_by_type
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if (throttle_period - diff) > 0:
 | 
					                if (throttle_period - diff) > 0:
 | 
				
			||||||
| 
						 | 
					@ -872,7 +919,9 @@ async def uniform_rate_send(
 | 
				
			||||||
        # TODO: now if only we could sync this to the display
 | 
					        # TODO: now if only we could sync this to the display
 | 
				
			||||||
        # rate timing exactly lul
 | 
					        # rate timing exactly lul
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            await stream.send({sym: first_quote})
 | 
					            await stream.send({
 | 
				
			||||||
 | 
					                sym: first_quote
 | 
				
			||||||
 | 
					            })
 | 
				
			||||||
        except tractor.RemoteActorError as rme:
 | 
					        except tractor.RemoteActorError as rme:
 | 
				
			||||||
            if rme.type is not tractor._exceptions.StreamOverrun:
 | 
					            if rme.type is not tractor._exceptions.StreamOverrun:
 | 
				
			||||||
                raise
 | 
					                raise
 | 
				
			||||||
| 
						 | 
					@ -883,19 +932,28 @@ async def uniform_rate_send(
 | 
				
			||||||
                f'{sym}:{ctx.cid}@{chan.uid}'
 | 
					                f'{sym}:{ctx.cid}@{chan.uid}'
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # NOTE: any of these can be raised by `tractor`'s IPC
 | 
				
			||||||
 | 
					        # transport-layer and we want to be highly resilient
 | 
				
			||||||
 | 
					        # to consumers which crash or lose network connection.
 | 
				
			||||||
 | 
					        # I.e. we **DO NOT** want to crash and propagate up to
 | 
				
			||||||
 | 
					        # ``pikerd`` these kinds of errors!
 | 
				
			||||||
        except (
 | 
					        except (
 | 
				
			||||||
            # NOTE: any of these can be raised by ``tractor``'s IPC
 | 
					 | 
				
			||||||
            # transport-layer and we want to be highly resilient
 | 
					 | 
				
			||||||
            # to consumers which crash or lose network connection.
 | 
					 | 
				
			||||||
            # I.e. we **DO NOT** want to crash and propagate up to
 | 
					 | 
				
			||||||
            # ``pikerd`` these kinds of errors!
 | 
					 | 
				
			||||||
            trio.ClosedResourceError,
 | 
					 | 
				
			||||||
            trio.BrokenResourceError,
 | 
					 | 
				
			||||||
            ConnectionResetError,
 | 
					            ConnectionResetError,
 | 
				
			||||||
        ):
 | 
					        ) + Sampler.bcast_errors as ipc_err:
 | 
				
			||||||
            # if the feed consumer goes down then drop
 | 
					            match ipc_err:
 | 
				
			||||||
            # out of this rate limiter
 | 
					                case trio.EndOfChannel():
 | 
				
			||||||
            log.warning(f'{stream} closed')
 | 
					                    log.info(
 | 
				
			||||||
 | 
					                        f'{stream} terminated by peer,\n'
 | 
				
			||||||
 | 
					                        f'{ipc_err!r}'
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					                case _:
 | 
				
			||||||
 | 
					                    # if the feed consumer goes down then drop
 | 
				
			||||||
 | 
					                    # out of this rate limiter
 | 
				
			||||||
 | 
					                    log.warning(
 | 
				
			||||||
 | 
					                        f'{stream} closed due to,\n'
 | 
				
			||||||
 | 
					                        f'{ipc_err!r}'
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            await stream.aclose()
 | 
					            await stream.aclose()
 | 
				
			||||||
            return
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -31,6 +31,7 @@ from pathlib import Path
 | 
				
			||||||
from pprint import pformat
 | 
					from pprint import pformat
 | 
				
			||||||
from typing import (
 | 
					from typing import (
 | 
				
			||||||
    Any,
 | 
					    Any,
 | 
				
			||||||
 | 
					    Callable,
 | 
				
			||||||
    Sequence,
 | 
					    Sequence,
 | 
				
			||||||
    Hashable,
 | 
					    Hashable,
 | 
				
			||||||
    TYPE_CHECKING,
 | 
					    TYPE_CHECKING,
 | 
				
			||||||
| 
						 | 
					@ -56,7 +57,7 @@ from piker.brokers import (
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if TYPE_CHECKING:
 | 
					if TYPE_CHECKING:
 | 
				
			||||||
    from ..accounting import (
 | 
					    from piker.accounting import (
 | 
				
			||||||
        Asset,
 | 
					        Asset,
 | 
				
			||||||
        MktPair,
 | 
					        MktPair,
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
| 
						 | 
					@ -149,57 +150,68 @@ class SymbologyCache(Struct):
 | 
				
			||||||
                    'Implement `Client.get_assets()`!'
 | 
					                    'Implement `Client.get_assets()`!'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if get_mkt_pairs := getattr(client, 'get_mkt_pairs', None):
 | 
					            get_mkt_pairs: Callable|None = getattr(
 | 
				
			||||||
 | 
					                client,
 | 
				
			||||||
                pairs: dict[str, Struct] = await get_mkt_pairs()
 | 
					                'get_mkt_pairs',
 | 
				
			||||||
                for bs_fqme, pair in pairs.items():
 | 
					                None,
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
                    # NOTE: every backend defined pair should
 | 
					            if not get_mkt_pairs:
 | 
				
			||||||
                    # declare it's ns path for roundtrip
 | 
					 | 
				
			||||||
                    # serialization lookup.
 | 
					 | 
				
			||||||
                    if not getattr(pair, 'ns_path', None):
 | 
					 | 
				
			||||||
                        raise TypeError(
 | 
					 | 
				
			||||||
                            f'Pair-struct for {self.mod.name} MUST define a '
 | 
					 | 
				
			||||||
                            '`.ns_path: str`!\n'
 | 
					 | 
				
			||||||
                            f'{pair}'
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    entry = await self.mod.get_mkt_info(pair.bs_fqme)
 | 
					 | 
				
			||||||
                    if not entry:
 | 
					 | 
				
			||||||
                        continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    mkt: MktPair
 | 
					 | 
				
			||||||
                    pair: Struct
 | 
					 | 
				
			||||||
                    mkt, _pair = entry
 | 
					 | 
				
			||||||
                    assert _pair is pair, (
 | 
					 | 
				
			||||||
                        f'`{self.mod.name}` backend probably has a '
 | 
					 | 
				
			||||||
                        'keying-symmetry problem between the pair-`Struct` '
 | 
					 | 
				
			||||||
                        'returned from `Client.get_mkt_pairs()`and the '
 | 
					 | 
				
			||||||
                        'module level endpoint: `.get_mkt_info()`\n\n'
 | 
					 | 
				
			||||||
                        "Here's the struct diff:\n"
 | 
					 | 
				
			||||||
                        f'{_pair - pair}'
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    # NOTE XXX: this means backends MUST implement
 | 
					 | 
				
			||||||
                    # a `Struct.bs_mktid: str` field to provide
 | 
					 | 
				
			||||||
                    # a native-keyed map to their own symbol
 | 
					 | 
				
			||||||
                    # set(s).
 | 
					 | 
				
			||||||
                    self.pairs[pair.bs_mktid] = pair
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    # NOTE: `MktPair`s are keyed here using piker's
 | 
					 | 
				
			||||||
                    # internal FQME schema so that search,
 | 
					 | 
				
			||||||
                    # accounting and feed init can be accomplished
 | 
					 | 
				
			||||||
                    # a sane, uniform, normalized basis.
 | 
					 | 
				
			||||||
                    self.mktmaps[mkt.fqme] = mkt
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                self.pair_ns_path: str = tractor.msg.NamespacePath.from_ref(
 | 
					 | 
				
			||||||
                    pair,
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                log.warning(
 | 
					                log.warning(
 | 
				
			||||||
                    'No symbology cache `Pair` support for `{provider}`..\n'
 | 
					                    'No symbology cache `Pair` support for `{provider}`..\n'
 | 
				
			||||||
                    'Implement `Client.get_mkt_pairs()`!'
 | 
					                    'Implement `Client.get_mkt_pairs()`!'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					                return self
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            pairs: dict[str, Struct] = await get_mkt_pairs()
 | 
				
			||||||
 | 
					            if not pairs:
 | 
				
			||||||
 | 
					                log.warning(
 | 
				
			||||||
 | 
					                    'No pairs from intial {provider!r} sym-cache request?\n\n'
 | 
				
			||||||
 | 
					                    '`Client.get_mkt_pairs()` -> {pairs!r} ?'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                return self
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            for bs_fqme, pair in pairs.items():
 | 
				
			||||||
 | 
					                if not getattr(pair, 'ns_path', None):
 | 
				
			||||||
 | 
					                    # XXX: every backend defined pair must declare
 | 
				
			||||||
 | 
					                    # a `.ns_path: tractor.NamespacePath` to enable
 | 
				
			||||||
 | 
					                    # roundtrip serialization lookup from a local
 | 
				
			||||||
 | 
					                    # cache file.
 | 
				
			||||||
 | 
					                    raise TypeError(
 | 
				
			||||||
 | 
					                        f'Pair-struct for {self.mod.name} MUST define a '
 | 
				
			||||||
 | 
					                        '`.ns_path: str`!\n\n'
 | 
				
			||||||
 | 
					                        f'{pair!r}'
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                entry = await self.mod.get_mkt_info(pair.bs_fqme)
 | 
				
			||||||
 | 
					                if not entry:
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                mkt: MktPair
 | 
				
			||||||
 | 
					                pair: Struct
 | 
				
			||||||
 | 
					                mkt, _pair = entry
 | 
				
			||||||
 | 
					                assert _pair is pair, (
 | 
				
			||||||
 | 
					                    f'`{self.mod.name}` backend probably has a '
 | 
				
			||||||
 | 
					                    'keying-symmetry problem between the pair-`Struct` '
 | 
				
			||||||
 | 
					                    'returned from `Client.get_mkt_pairs()`and the '
 | 
				
			||||||
 | 
					                    'module level endpoint: `.get_mkt_info()`\n\n'
 | 
				
			||||||
 | 
					                    "Here's the struct diff:\n"
 | 
				
			||||||
 | 
					                    f'{_pair - pair}'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                # NOTE XXX: this means backends MUST implement
 | 
				
			||||||
 | 
					                # a `Struct.bs_mktid: str` field to provide
 | 
				
			||||||
 | 
					                # a native-keyed map to their own symbol
 | 
				
			||||||
 | 
					                # set(s).
 | 
				
			||||||
 | 
					                self.pairs[pair.bs_mktid] = pair
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # NOTE: `MktPair`s are keyed here using piker's
 | 
				
			||||||
 | 
					                # internal FQME schema so that search,
 | 
				
			||||||
 | 
					                # accounting and feed init can be accomplished
 | 
				
			||||||
 | 
					                # a sane, uniform, normalized basis.
 | 
				
			||||||
 | 
					                self.mktmaps[mkt.fqme] = mkt
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            self.pair_ns_path: str = tractor.msg.NamespacePath.from_ref(
 | 
				
			||||||
 | 
					                pair,
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        return self
 | 
					        return self
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -273,7 +273,7 @@ async def _reconnect_forever(
 | 
				
			||||||
                nobsws._connected.set()
 | 
					                nobsws._connected.set()
 | 
				
			||||||
                await trio.sleep_forever()
 | 
					                await trio.sleep_forever()
 | 
				
			||||||
        except HandshakeError:
 | 
					        except HandshakeError:
 | 
				
			||||||
            log.exception(f'Retrying connection')
 | 
					            log.exception('Retrying connection')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # ws & nursery block ends
 | 
					        # ws & nursery block ends
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -359,8 +359,8 @@ async def open_autorecon_ws(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
'''
 | 
					'''
 | 
				
			||||||
JSONRPC response-request style machinery for transparent multiplexing of msgs
 | 
					JSONRPC response-request style machinery for transparent multiplexing
 | 
				
			||||||
over a NoBsWs.
 | 
					of msgs over a NoBsWs.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
'''
 | 
					'''
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -377,16 +377,25 @@ async def open_jsonrpc_session(
 | 
				
			||||||
    url: str,
 | 
					    url: str,
 | 
				
			||||||
    start_id: int = 0,
 | 
					    start_id: int = 0,
 | 
				
			||||||
    response_type: type = JSONRPCResult,
 | 
					    response_type: type = JSONRPCResult,
 | 
				
			||||||
    request_type: Optional[type] = None,
 | 
					 | 
				
			||||||
    request_hook: Optional[Callable] = None,
 | 
					 | 
				
			||||||
    error_hook: Optional[Callable] = None,
 | 
					 | 
				
			||||||
) -> Callable[[str, dict], dict]:
 | 
					) -> Callable[[str, dict], dict]:
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    Init a json-RPC-over-websocket connection to the provided `url`.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    A `json_rpc: Callable[[str, dict], dict` is delivered to the
 | 
				
			||||||
 | 
					    caller for sending requests and a bg-`trio.Task` handles
 | 
				
			||||||
 | 
					    processing of response msgs including error reporting/raising in
 | 
				
			||||||
 | 
					    the parent/caller task.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    # NOTE, store all request msgs so we can raise errors on the
 | 
				
			||||||
 | 
					    # caller side!
 | 
				
			||||||
 | 
					    req_msgs: dict[int, dict] = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async with (
 | 
					    async with (
 | 
				
			||||||
        trio.open_nursery() as n,
 | 
					        trio.open_nursery() as tn,
 | 
				
			||||||
        open_autorecon_ws(url) as ws
 | 
					        open_autorecon_ws(url) as ws
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
        rpc_id: Iterable = count(start_id)
 | 
					        rpc_id: Iterable[int] = count(start_id)
 | 
				
			||||||
        rpc_results: dict[int, dict] = {}
 | 
					        rpc_results: dict[int, dict] = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        async def json_rpc(method: str, params: dict) -> dict:
 | 
					        async def json_rpc(method: str, params: dict) -> dict:
 | 
				
			||||||
| 
						 | 
					@ -394,26 +403,40 @@ async def open_jsonrpc_session(
 | 
				
			||||||
            perform a json rpc call and wait for the result, raise exception in
 | 
					            perform a json rpc call and wait for the result, raise exception in
 | 
				
			||||||
            case of error field present on response
 | 
					            case of error field present on response
 | 
				
			||||||
            '''
 | 
					            '''
 | 
				
			||||||
 | 
					            nonlocal req_msgs
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            req_id: int = next(rpc_id)
 | 
				
			||||||
            msg = {
 | 
					            msg = {
 | 
				
			||||||
                'jsonrpc': '2.0',
 | 
					                'jsonrpc': '2.0',
 | 
				
			||||||
                'id': next(rpc_id),
 | 
					                'id': req_id,
 | 
				
			||||||
                'method': method,
 | 
					                'method': method,
 | 
				
			||||||
                'params': params
 | 
					                'params': params
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            _id = msg['id']
 | 
					            _id = msg['id']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            rpc_results[_id] = {
 | 
					            result = rpc_results[_id] = {
 | 
				
			||||||
                'result': None,
 | 
					                'result': None,
 | 
				
			||||||
                'event': trio.Event()
 | 
					                'error': None,
 | 
				
			||||||
 | 
					                'event': trio.Event(),  # signal caller resp arrived
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					            req_msgs[_id] = msg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            await ws.send_msg(msg)
 | 
					            await ws.send_msg(msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # wait for reponse before unblocking requester code
 | 
				
			||||||
            await rpc_results[_id]['event'].wait()
 | 
					            await rpc_results[_id]['event'].wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            ret = rpc_results[_id]['result']
 | 
					            if (maybe_result := result['result']):
 | 
				
			||||||
 | 
					                ret = maybe_result
 | 
				
			||||||
 | 
					                del rpc_results[_id]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            del rpc_results[_id]
 | 
					            else:
 | 
				
			||||||
 | 
					                err = result['error']
 | 
				
			||||||
 | 
					                raise Exception(
 | 
				
			||||||
 | 
					                    f'JSONRPC request failed\n'
 | 
				
			||||||
 | 
					                    f'req: {msg}\n'
 | 
				
			||||||
 | 
					                    f'resp: {err}\n'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if ret.error is not None:
 | 
					            if ret.error is not None:
 | 
				
			||||||
                raise Exception(json.dumps(ret.error, indent=4))
 | 
					                raise Exception(json.dumps(ret.error, indent=4))
 | 
				
			||||||
| 
						 | 
					@ -428,6 +451,7 @@ async def open_jsonrpc_session(
 | 
				
			||||||
            the server side.
 | 
					            the server side.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            '''
 | 
					            '''
 | 
				
			||||||
 | 
					            nonlocal req_msgs
 | 
				
			||||||
            async for msg in ws:
 | 
					            async for msg in ws:
 | 
				
			||||||
                match msg:
 | 
					                match msg:
 | 
				
			||||||
                    case {
 | 
					                    case {
 | 
				
			||||||
| 
						 | 
					@ -451,19 +475,28 @@ async def open_jsonrpc_session(
 | 
				
			||||||
                        'params': _,
 | 
					                        'params': _,
 | 
				
			||||||
                    }:
 | 
					                    }:
 | 
				
			||||||
                        log.debug(f'Recieved\n{msg}')
 | 
					                        log.debug(f'Recieved\n{msg}')
 | 
				
			||||||
                        if request_hook:
 | 
					 | 
				
			||||||
                            await request_hook(request_type(**msg))
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    case {
 | 
					                    case {
 | 
				
			||||||
                        'error': error
 | 
					                        'error': error
 | 
				
			||||||
                    }:
 | 
					                    }:
 | 
				
			||||||
                        log.warning(f'Recieved\n{error}')
 | 
					                        # retreive orig request msg, set error
 | 
				
			||||||
                        if error_hook:
 | 
					                        # response in original "result" msg,
 | 
				
			||||||
                            await error_hook(response_type(**msg))
 | 
					                        # THEN FINALLY set the event to signal caller
 | 
				
			||||||
 | 
					                        # to raise the error in the parent task.
 | 
				
			||||||
 | 
					                        req_id: int = error['id']
 | 
				
			||||||
 | 
					                        req_msg: dict = req_msgs[req_id]
 | 
				
			||||||
 | 
					                        result: dict = rpc_results[req_id]
 | 
				
			||||||
 | 
					                        result['error'] = error
 | 
				
			||||||
 | 
					                        result['event'].set()
 | 
				
			||||||
 | 
					                        log.error(
 | 
				
			||||||
 | 
					                            f'JSONRPC request failed\n'
 | 
				
			||||||
 | 
					                            f'req: {req_msg}\n'
 | 
				
			||||||
 | 
					                            f'resp: {error}\n'
 | 
				
			||||||
 | 
					                        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    case _:
 | 
					                    case _:
 | 
				
			||||||
                        log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
 | 
					                        log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        n.start_soon(recv_task)
 | 
					        tn.start_soon(recv_task)
 | 
				
			||||||
        yield json_rpc
 | 
					        yield json_rpc
 | 
				
			||||||
        n.cancel_scope.cancel()
 | 
					        tn.cancel_scope.cancel()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -786,7 +786,6 @@ async def install_brokerd_search(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@acm
 | 
					@acm
 | 
				
			||||||
async def maybe_open_feed(
 | 
					async def maybe_open_feed(
 | 
				
			||||||
 | 
					 | 
				
			||||||
    fqmes: list[str],
 | 
					    fqmes: list[str],
 | 
				
			||||||
    loglevel: str | None = None,
 | 
					    loglevel: str | None = None,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -840,13 +839,12 @@ async def maybe_open_feed(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@acm
 | 
					@acm
 | 
				
			||||||
async def open_feed(
 | 
					async def open_feed(
 | 
				
			||||||
 | 
					 | 
				
			||||||
    fqmes: list[str],
 | 
					    fqmes: list[str],
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    loglevel: str | None = None,
 | 
					    loglevel: str|None = None,
 | 
				
			||||||
    allow_overruns: bool = True,
 | 
					    allow_overruns: bool = True,
 | 
				
			||||||
    start_stream: bool = True,
 | 
					    start_stream: bool = True,
 | 
				
			||||||
    tick_throttle: float | None = None,  # Hz
 | 
					    tick_throttle: float|None = None,  # Hz
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    allow_remote_ctl_ui: bool = False,
 | 
					    allow_remote_ctl_ui: bool = False,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -36,10 +36,10 @@ from ._sharedmem import (
 | 
				
			||||||
    ShmArray,
 | 
					    ShmArray,
 | 
				
			||||||
    _Token,
 | 
					    _Token,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					from piker.accounting import MktPair
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if TYPE_CHECKING:
 | 
					if TYPE_CHECKING:
 | 
				
			||||||
    from ..accounting import MktPair
 | 
					    from piker.data.feed import Feed
 | 
				
			||||||
    from .feed import Feed
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Flume(Struct):
 | 
					class Flume(Struct):
 | 
				
			||||||
| 
						 | 
					@ -82,7 +82,7 @@ class Flume(Struct):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: do we need this really if we can pull the `Portal` from
 | 
					    # TODO: do we need this really if we can pull the `Portal` from
 | 
				
			||||||
    # ``tractor``'s internals?
 | 
					    # ``tractor``'s internals?
 | 
				
			||||||
    feed: Feed | None = None
 | 
					    feed: Feed|None = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @property
 | 
					    @property
 | 
				
			||||||
    def rt_shm(self) -> ShmArray:
 | 
					    def rt_shm(self) -> ShmArray:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -113,9 +113,9 @@ def validate_backend(
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            if ep is None:
 | 
					            if ep is None:
 | 
				
			||||||
                log.warning(
 | 
					                log.warning(
 | 
				
			||||||
                    f'Provider backend {mod.name} is missing '
 | 
					                    f'Provider backend {mod.name!r} is missing '
 | 
				
			||||||
                    f'{daemon_name} support :(\n'
 | 
					                    f'{daemon_name!r} support?\n'
 | 
				
			||||||
                    f'The following endpoint is missing: {name}'
 | 
					                    f'|_module endpoint-func missing: {name!r}\n'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    inits: list[
 | 
					    inits: list[
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										30
									
								
								piker/log.py
								
								
								
								
							
							
						
						
									
										30
									
								
								piker/log.py
								
								
								
								
							| 
						 | 
					@ -19,6 +19,10 @@ Log like a forester!
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
import logging
 | 
					import logging
 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
 | 
					import reprlib
 | 
				
			||||||
 | 
					from typing import (
 | 
				
			||||||
 | 
					    Callable,
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import tractor
 | 
					import tractor
 | 
				
			||||||
from pygments import (
 | 
					from pygments import (
 | 
				
			||||||
| 
						 | 
					@ -84,3 +88,29 @@ def colorize_json(
 | 
				
			||||||
        # likeable styles: algol_nu, tango, monokai
 | 
					        # likeable styles: algol_nu, tango, monokai
 | 
				
			||||||
        formatters.TerminalTrueColorFormatter(style=style)
 | 
					        formatters.TerminalTrueColorFormatter(style=style)
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# TODO, eventually defer to the version in `modden` once
 | 
				
			||||||
 | 
					# it becomes a dep!
 | 
				
			||||||
 | 
					def mk_repr(
 | 
				
			||||||
 | 
					    **repr_kws,
 | 
				
			||||||
 | 
					) -> Callable[[str], str]:
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    Allocate and deliver a `repr.Repr` instance with provided input
 | 
				
			||||||
 | 
					    settings using the std-lib's `reprlib` mod,
 | 
				
			||||||
 | 
					     * https://docs.python.org/3/library/reprlib.html
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ------ Ex. ------
 | 
				
			||||||
 | 
					    An up to 6-layer-nested `dict` as multi-line:
 | 
				
			||||||
 | 
					    - https://stackoverflow.com/a/79102479
 | 
				
			||||||
 | 
					    - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    def_kws: dict[str, int] = dict(
 | 
				
			||||||
 | 
					        indent=2,
 | 
				
			||||||
 | 
					        maxlevel=6,  # recursion levels
 | 
				
			||||||
 | 
					        maxstring=66,  # match editor line-len limit
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    def_kws |= repr_kws
 | 
				
			||||||
 | 
					    reprr = reprlib.Repr(**def_kws)
 | 
				
			||||||
 | 
					    return reprr.repr
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -119,6 +119,10 @@ async def open_piker_runtime(
 | 
				
			||||||
                # spawn other specialized daemons I think?
 | 
					                # spawn other specialized daemons I think?
 | 
				
			||||||
                enable_modules=enable_modules,
 | 
					                enable_modules=enable_modules,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # TODO: how to configure this?
 | 
				
			||||||
 | 
					                # keep it on by default if debug mode is set?
 | 
				
			||||||
 | 
					                maybe_enable_greenback=False,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                **tractor_kwargs,
 | 
					                **tractor_kwargs,
 | 
				
			||||||
            ) as actor,
 | 
					            ) as actor,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -386,6 +386,8 @@ def ldshm(
 | 
				
			||||||
            open_annot_ctl() as actl,
 | 
					            open_annot_ctl() as actl,
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
            shm_df: pl.DataFrame | None = None
 | 
					            shm_df: pl.DataFrame | None = None
 | 
				
			||||||
 | 
					            tf2aids: dict[float, dict] = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            for (
 | 
					            for (
 | 
				
			||||||
                shmfile,
 | 
					                shmfile,
 | 
				
			||||||
                shm,
 | 
					                shm,
 | 
				
			||||||
| 
						 | 
					@ -526,16 +528,17 @@ def ldshm(
 | 
				
			||||||
                            new_df,
 | 
					                            new_df,
 | 
				
			||||||
                            step_gaps,
 | 
					                            step_gaps,
 | 
				
			||||||
                        )
 | 
					                        )
 | 
				
			||||||
 | 
					 | 
				
			||||||
                        # last chance manual overwrites in REPL
 | 
					                        # last chance manual overwrites in REPL
 | 
				
			||||||
                        await tractor.pause()
 | 
					                        # await tractor.pause()
 | 
				
			||||||
                        assert aids
 | 
					                        assert aids
 | 
				
			||||||
 | 
					                        tf2aids[period_s] = aids
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                else:
 | 
					                else:
 | 
				
			||||||
                    # allow interaction even when no ts problems.
 | 
					                    # allow interaction even when no ts problems.
 | 
				
			||||||
                    await tractor.pause()
 | 
					                    assert not diff
 | 
				
			||||||
                    # assert not diff
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            await tractor.pause()
 | 
				
			||||||
 | 
					            log.info('Exiting TSP shm anal-izer!')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if shm_df is None:
 | 
					            if shm_df is None:
 | 
				
			||||||
                log.error(
 | 
					                log.error(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -161,7 +161,13 @@ class NativeStorageClient:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def index_files(self):
 | 
					    def index_files(self):
 | 
				
			||||||
        for path in self._datadir.iterdir():
 | 
					        for path in self._datadir.iterdir():
 | 
				
			||||||
            if path.name in {'borked', 'expired',}:
 | 
					            if (
 | 
				
			||||||
 | 
					                path.is_dir()
 | 
				
			||||||
 | 
					                or
 | 
				
			||||||
 | 
					                '.parquet' not in str(path)
 | 
				
			||||||
 | 
					                # or
 | 
				
			||||||
 | 
					                # path.name in {'borked', 'expired',}
 | 
				
			||||||
 | 
					            ):
 | 
				
			||||||
                continue
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            key: str = path.name.rstrip('.parquet')
 | 
					            key: str = path.name.rstrip('.parquet')
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -458,13 +458,15 @@ async def start_backfill(
 | 
				
			||||||
                    'bf_until <- last_start_dt:\n'
 | 
					                    'bf_until <- last_start_dt:\n'
 | 
				
			||||||
                    f'{backfill_until_dt} <- {last_start_dt}\n'
 | 
					                    f'{backfill_until_dt} <- {last_start_dt}\n'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					                # UGH: what's a better way?
 | 
				
			||||||
                # ugh, what's a better way?
 | 
					                # TODO: backends are responsible for being correct on
 | 
				
			||||||
                # TODO: fwiw, we probably want a way to signal a throttle
 | 
					                # this right!?
 | 
				
			||||||
                # condition (eg. with ib) so that we can halt the
 | 
					                # -[ ] in the `ib` case we could maybe offer some way
 | 
				
			||||||
                # request loop until the condition is resolved?
 | 
					                #     to halt the request loop until the condition is
 | 
				
			||||||
                if timeframe > 1:
 | 
					                #     resolved or should the backend be entirely in
 | 
				
			||||||
                    await tractor.pause()
 | 
					                #     charge of solving such faults? yes, right?
 | 
				
			||||||
 | 
					                # if timeframe > 1:
 | 
				
			||||||
 | 
					                #     await tractor.pause()
 | 
				
			||||||
                return
 | 
					                return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            assert (
 | 
					            assert (
 | 
				
			||||||
| 
						 | 
					@ -572,15 +574,19 @@ async def start_backfill(
 | 
				
			||||||
                    f'{next_start_dt} -> {last_start_dt}'
 | 
					                    f'{next_start_dt} -> {last_start_dt}'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # always drop the src asset token for
 | 
					                # NOTE, always drop the src asset token for
 | 
				
			||||||
                # non-currency-pair like market types (for now)
 | 
					                # non-currency-pair like market types (for now)
 | 
				
			||||||
 | 
					                #
 | 
				
			||||||
 | 
					                # THAT IS, for now our table key schema is NOT
 | 
				
			||||||
 | 
					                # including the dst[/src] source asset token. SO,
 | 
				
			||||||
 | 
					                # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
 | 
				
			||||||
 | 
					                # historical reasons ONLY.
 | 
				
			||||||
                if mkt.dst.atype not in {
 | 
					                if mkt.dst.atype not in {
 | 
				
			||||||
                    'crypto',
 | 
					                    'crypto',
 | 
				
			||||||
                    'crypto_currency',
 | 
					                    'crypto_currency',
 | 
				
			||||||
                    'fiat',  # a "forex pair"
 | 
					                    'fiat',  # a "forex pair"
 | 
				
			||||||
 | 
					                    'perpetual_future',  # stupid "perps" from cex land
 | 
				
			||||||
                }:
 | 
					                }:
 | 
				
			||||||
                    # for now, our table key schema is not including
 | 
					 | 
				
			||||||
                    # the dst[/src] source asset token.
 | 
					 | 
				
			||||||
                    col_sym_key: str = mkt.get_fqme(
 | 
					                    col_sym_key: str = mkt.get_fqme(
 | 
				
			||||||
                        delim_char='',
 | 
					                        delim_char='',
 | 
				
			||||||
                        without_src=True,
 | 
					                        without_src=True,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -616,6 +616,18 @@ def detect_price_gaps(
 | 
				
			||||||
    # ])
 | 
					    # ])
 | 
				
			||||||
    ...
 | 
					    ...
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# TODO: probably just use the null_segs impl above?
 | 
				
			||||||
 | 
					def detect_vlm_gaps(
 | 
				
			||||||
 | 
					    df: pl.DataFrame,
 | 
				
			||||||
 | 
					    col: str = 'volume',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					) -> pl.DataFrame:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    vnull: pl.DataFrame = w_dts.filter(
 | 
				
			||||||
 | 
					        pl.col(col) == 0
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    return vnull
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def dedupe(
 | 
					def dedupe(
 | 
				
			||||||
    src_df: pl.DataFrame,
 | 
					    src_df: pl.DataFrame,
 | 
				
			||||||
| 
						 | 
					@ -626,7 +638,6 @@ def dedupe(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> tuple[
 | 
					) -> tuple[
 | 
				
			||||||
    pl.DataFrame,  # with dts
 | 
					    pl.DataFrame,  # with dts
 | 
				
			||||||
    pl.DataFrame,  # gaps
 | 
					 | 
				
			||||||
    pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
 | 
					    pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
 | 
				
			||||||
    int,  # len diff between input and deduped
 | 
					    int,  # len diff between input and deduped
 | 
				
			||||||
]:
 | 
					]:
 | 
				
			||||||
| 
						 | 
					@ -639,19 +650,22 @@ def dedupe(
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    wdts: pl.DataFrame = with_dts(src_df)
 | 
					    wdts: pl.DataFrame = with_dts(src_df)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # maybe sort on any time field
 | 
					    deduped = wdts
 | 
				
			||||||
    if sort:
 | 
					 | 
				
			||||||
        wdts = wdts.sort(by='time')
 | 
					 | 
				
			||||||
        # TODO: detect out-of-order segments which were corrected!
 | 
					 | 
				
			||||||
        # -[ ] report in log msg
 | 
					 | 
				
			||||||
        # -[ ] possibly return segment sections which were moved?
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # remove duplicated datetime samples/sections
 | 
					    # remove duplicated datetime samples/sections
 | 
				
			||||||
    deduped: pl.DataFrame = wdts.unique(
 | 
					    deduped: pl.DataFrame = wdts.unique(
 | 
				
			||||||
        subset=['dt'],
 | 
					        # subset=['dt'],
 | 
				
			||||||
 | 
					        subset=['time'],
 | 
				
			||||||
        maintain_order=True,
 | 
					        maintain_order=True,
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # maybe sort on any time field
 | 
				
			||||||
 | 
					    if sort:
 | 
				
			||||||
 | 
					        deduped = deduped.sort(by='time')
 | 
				
			||||||
 | 
					        # TODO: detect out-of-order segments which were corrected!
 | 
				
			||||||
 | 
					        # -[ ] report in log msg
 | 
				
			||||||
 | 
					        # -[ ] possibly return segment sections which were moved?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    diff: int = (
 | 
					    diff: int = (
 | 
				
			||||||
        wdts.height
 | 
					        wdts.height
 | 
				
			||||||
        -
 | 
					        -
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										228
									
								
								piker/types.py
								
								
								
								
							
							
						
						
									
										228
									
								
								piker/types.py
								
								
								
								
							| 
						 | 
					@ -21,230 +21,4 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
 | 
				
			||||||
types.
 | 
					types.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
'''
 | 
					'''
 | 
				
			||||||
from __future__ import annotations
 | 
					from tractor.msg import Struct as Struct
 | 
				
			||||||
from collections import UserList
 | 
					 | 
				
			||||||
from pprint import (
 | 
					 | 
				
			||||||
    saferepr,
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
from typing import Any
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
from msgspec import (
 | 
					 | 
				
			||||||
    msgpack,
 | 
					 | 
				
			||||||
    Struct as _Struct,
 | 
					 | 
				
			||||||
    structs,
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class DiffDump(UserList):
 | 
					 | 
				
			||||||
    '''
 | 
					 | 
				
			||||||
    Very simple list delegator that repr() dumps (presumed) tuple
 | 
					 | 
				
			||||||
    elements of the form `tuple[str, Any, Any]` in a nice
 | 
					 | 
				
			||||||
    multi-line readable form for analyzing `Struct` diffs.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    '''
 | 
					 | 
				
			||||||
    def __repr__(self) -> str:
 | 
					 | 
				
			||||||
        if not len(self):
 | 
					 | 
				
			||||||
            return super().__repr__()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # format by displaying item pair's ``repr()`` on multiple,
 | 
					 | 
				
			||||||
        # indented lines such that they are more easily visually
 | 
					 | 
				
			||||||
        # comparable when printed to console when printed to
 | 
					 | 
				
			||||||
        # console.
 | 
					 | 
				
			||||||
        repstr: str = '[\n'
 | 
					 | 
				
			||||||
        for k, left, right in self:
 | 
					 | 
				
			||||||
            repstr += (
 | 
					 | 
				
			||||||
                f'({k},\n'
 | 
					 | 
				
			||||||
                f'\t{repr(left)},\n'
 | 
					 | 
				
			||||||
                f'\t{repr(right)},\n'
 | 
					 | 
				
			||||||
                ')\n'
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
        repstr += ']\n'
 | 
					 | 
				
			||||||
        return repstr
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class Struct(
 | 
					 | 
				
			||||||
    _Struct,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # https://jcristharif.com/msgspec/structs.html#tagged-unions
 | 
					 | 
				
			||||||
    # tag='pikerstruct',
 | 
					 | 
				
			||||||
    # tag=True,
 | 
					 | 
				
			||||||
):
 | 
					 | 
				
			||||||
    '''
 | 
					 | 
				
			||||||
    A "human friendlier" (aka repl buddy) struct subtype.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    '''
 | 
					 | 
				
			||||||
    def _sin_props(self) -> Iterator[
 | 
					 | 
				
			||||||
        tuple[
 | 
					 | 
				
			||||||
            structs.FieldIinfo,
 | 
					 | 
				
			||||||
            str,
 | 
					 | 
				
			||||||
            Any,
 | 
					 | 
				
			||||||
        ]
 | 
					 | 
				
			||||||
    ]:
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        Iterate over all non-@property fields of this struct.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        fi: structs.FieldInfo
 | 
					 | 
				
			||||||
        for fi in structs.fields(self):
 | 
					 | 
				
			||||||
            key: str = fi.name
 | 
					 | 
				
			||||||
            val: Any = getattr(self, key)
 | 
					 | 
				
			||||||
            yield fi, key, val
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def to_dict(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
        include_non_members: bool = True,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    ) -> dict:
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        Like it sounds.. direct delegation to:
 | 
					 | 
				
			||||||
        https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        BUT, by default we pop all non-member (aka not defined as
 | 
					 | 
				
			||||||
        struct fields) fields by default.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        asdict: dict = structs.asdict(self)
 | 
					 | 
				
			||||||
        if include_non_members:
 | 
					 | 
				
			||||||
            return asdict
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # only return a dict of the struct members
 | 
					 | 
				
			||||||
        # which were provided as input, NOT anything
 | 
					 | 
				
			||||||
        # added as type-defined `@property` methods!
 | 
					 | 
				
			||||||
        sin_props: dict = {}
 | 
					 | 
				
			||||||
        fi: structs.FieldInfo
 | 
					 | 
				
			||||||
        for fi, k, v in self._sin_props():
 | 
					 | 
				
			||||||
            sin_props[k] = asdict[k]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        return sin_props
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def pformat(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
        field_indent: int = 2,
 | 
					 | 
				
			||||||
        indent: int = 0,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    ) -> str:
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        Recursion-safe `pprint.pformat()` style formatting of
 | 
					 | 
				
			||||||
        a `msgspec.Struct` for sane reading by a human using a REPL.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        # global whitespace indent
 | 
					 | 
				
			||||||
        ws: str = ' '*indent
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # field whitespace indent
 | 
					 | 
				
			||||||
        field_ws: str = ' '*(field_indent + indent)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # qtn: str = ws + self.__class__.__qualname__
 | 
					 | 
				
			||||||
        qtn: str = self.__class__.__qualname__
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        obj_str: str = ''  # accumulator
 | 
					 | 
				
			||||||
        fi: structs.FieldInfo
 | 
					 | 
				
			||||||
        k: str
 | 
					 | 
				
			||||||
        v: Any
 | 
					 | 
				
			||||||
        for fi, k, v in self._sin_props():
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # TODO: how can we prefer `Literal['option1',  'option2,
 | 
					 | 
				
			||||||
            # ..]` over .__name__ == `Literal` but still get only the
 | 
					 | 
				
			||||||
            # latter for simple types like `str | int | None` etc..?
 | 
					 | 
				
			||||||
            ft: type = fi.type
 | 
					 | 
				
			||||||
            typ_name: str = getattr(ft, '__name__', str(ft))
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # recurse to get sub-struct's `.pformat()` output Bo
 | 
					 | 
				
			||||||
            if isinstance(v, Struct):
 | 
					 | 
				
			||||||
                val_str: str =  v.pformat(
 | 
					 | 
				
			||||||
                    indent=field_indent + indent,
 | 
					 | 
				
			||||||
                    field_indent=indent + field_indent,
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            else:  # the `pprint` recursion-safe format:
 | 
					 | 
				
			||||||
                # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
 | 
					 | 
				
			||||||
                val_str: str = saferepr(v)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        return (
 | 
					 | 
				
			||||||
            f'{qtn}(\n'
 | 
					 | 
				
			||||||
            f'{obj_str}'
 | 
					 | 
				
			||||||
            f'{ws})'
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # TODO: use a pprint.PrettyPrinter instance around ONLY rendering
 | 
					 | 
				
			||||||
    # inside a known tty?
 | 
					 | 
				
			||||||
    # def __repr__(self) -> str:
 | 
					 | 
				
			||||||
    #     ...
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # __str__ = __repr__ = pformat
 | 
					 | 
				
			||||||
    __repr__ = pformat
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def copy(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
        update: dict | None = None,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    ) -> Struct:
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        Validate-typecast all self defined fields, return a copy of
 | 
					 | 
				
			||||||
        us with all such fields.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        NOTE: This is kinda like the default behaviour in
 | 
					 | 
				
			||||||
        `pydantic.BaseModel` except a copy of the object is
 | 
					 | 
				
			||||||
        returned making it compat with `frozen=True`.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        if update:
 | 
					 | 
				
			||||||
            for k, v in update.items():
 | 
					 | 
				
			||||||
                setattr(self, k, v)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # NOTE: roundtrip serialize to validate
 | 
					 | 
				
			||||||
        # - enode to msgpack binary format,
 | 
					 | 
				
			||||||
        # - decode that back to a struct.
 | 
					 | 
				
			||||||
        return msgpack.Decoder(type=type(self)).decode(
 | 
					 | 
				
			||||||
            msgpack.Encoder().encode(self)
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def typecast(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # TODO: allow only casting a named subset?
 | 
					 | 
				
			||||||
        # fields: set[str] | None = None,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    ) -> None:
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        Cast all fields using their declared type annotations
 | 
					 | 
				
			||||||
        (kinda like what `pydantic` does by default).
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        NOTE: this of course won't work on frozen types, use
 | 
					 | 
				
			||||||
        ``.copy()`` above in such cases.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        # https://jcristharif.com/msgspec/api.html#msgspec.structs.fields
 | 
					 | 
				
			||||||
        fi: structs.FieldInfo
 | 
					 | 
				
			||||||
        for fi in structs.fields(self):
 | 
					 | 
				
			||||||
            setattr(
 | 
					 | 
				
			||||||
                self,
 | 
					 | 
				
			||||||
                fi.name,
 | 
					 | 
				
			||||||
                fi.type(getattr(self, fi.name)),
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def __sub__(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
        other: Struct,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    ) -> DiffDump[tuple[str, Any, Any]]:
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        Compare fields/items key-wise and return a ``DiffDump``
 | 
					 | 
				
			||||||
        for easy visual REPL comparison B)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
 | 
					 | 
				
			||||||
        for fi in structs.fields(self):
 | 
					 | 
				
			||||||
            attr_name: str = fi.name
 | 
					 | 
				
			||||||
            ours: Any = getattr(self, attr_name)
 | 
					 | 
				
			||||||
            theirs: Any = getattr(other, attr_name)
 | 
					 | 
				
			||||||
            if ours != theirs:
 | 
					 | 
				
			||||||
                diffs.append((
 | 
					 | 
				
			||||||
                    attr_name,
 | 
					 | 
				
			||||||
                    ours,
 | 
					 | 
				
			||||||
                    theirs,
 | 
					 | 
				
			||||||
                ))
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        return diffs
 | 
					 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue