Compare commits

...

6 Commits

Author SHA1 Message Date
Tyler Goodlet c7c823046e `.accounting._ledger`: typing anda more multiline styling 2025-02-13 21:00:49 -05:00
Tyler Goodlet 41c35c07d3 Drop some bps and style logic to multiline 2025-02-13 21:00:49 -05:00
Tyler Goodlet 23eb089a2f `.accounting` add synopsis section to readme 2025-02-13 21:00:49 -05:00
Tyler Goodlet 1c072b3ccd `.questrade`: link in ws-API issue! 2025-02-13 20:20:21 -05:00
Tyler Goodlet 45731e1bea `.kraken.broker`: need to `await verify_balances()` .. 2025-02-13 20:20:21 -05:00
Tyler Goodlet aaf7796cf2 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout! 2025-02-13 20:20:21 -05:00
8 changed files with 110 additions and 30 deletions

View File

@ -1,8 +1,40 @@
.accounting piker.accounting
----------- ________________
A subsystem for transaction processing, storage and historical A subsystem for transaction processing, storage and historical
measurement. measurement.
synopsis
--------
The big question for any trader is this:
*what is the price that determines whether i take a loss or a gain on my
trade?*
In other words, at any given state of accounting your current assets,
what is the price between any 2 assets you've transacted that determines
at which price you can conduct **the next** transaction and know if you
are making or losing more (or less) of the *source* asset versus the
*destination* asset?
Let's do a very simple example:
> Joe wants to buy some tacos bc they're super hungo.
> Joe has a friend who also likes tacos but doesn't mind if they're fresh; he doesn't mind having day old tacos.
> Inflation is rampant and taco prices are trending up for no good reason besides everyone thinks prices are going up.
> Joe goes to the taco stand and buys 4 tacos at 25 mxn.
> This makes Joe's net cost `4 * 25 = 200` mxn.
> Joe eats 3 tacos and realizes that he can't finish the last, so he puts it in the fridge to save for the next day (since he owns a comal).
> The next day the price of tacos goes up to 30 mxn (for no good reason > besides the taco stand noticing Joe is a tourist and that > "inflation" is some thing that's used as an excuse for price changes).
> Joe's friend from before got lit up (like he does every morning) and msgs Joe to buy him 2 tacos for when he shows up in the late morning.
> Joe says "sure, but i also have a leftover if you want it, and I'm fasting today so you can have my sobras and i'll buy you a new one".
> The friend coughs a couple times, and says "yee no problem man, just make sure you get them"
>
Prior *suit* definitions:
- the canucks equiv of the IRS call this idea ["Adjusted cost base"](https://www.canada.ca/en/revenue-agency/services/tax/individuals/topics/about-your-tax-return/tax-return/completing-a-tax-return/personal-income/line-12700-capital-gains/definitions-capital-gains.html#Adjustedcostbase)
.pnl .pnl
---- ----

View File

@ -40,7 +40,7 @@ import tomli_w # for fast ledger writing
from piker.types import Struct from piker.types import Struct
from piker import config from piker import config
from ..log import get_logger from piker.log import get_logger
from .calc import ( from .calc import (
iter_by_dt, iter_by_dt,
) )
@ -239,7 +239,9 @@ class TransactionLedger(UserDict):
symcache: SymbologyCache = self._symcache symcache: SymbologyCache = self._symcache
towrite: dict[str, Any] = {} towrite: dict[str, Any] = {}
for tid, txdict in self.tx_sort(self.data.copy()): for tid, txdict in self.tx_sort(
self.data.copy()
):
# write blank-str expiry for non-expiring assets # write blank-str expiry for non-expiring assets
if ( if (
'expiry' in txdict 'expiry' in txdict
@ -377,7 +379,7 @@ def open_trade_ledger(
account, account,
dirpath=_fp, dirpath=_fp,
) )
cpy = ledger_dict.copy() cpy: dict = ledger_dict.copy()
# XXX NOTE: if not provided presume we are being called from # XXX NOTE: if not provided presume we are being called from
# sync code and need to maybe run `trio` to generate.. # sync code and need to maybe run `trio` to generate..
@ -406,7 +408,13 @@ def open_trade_ledger(
account=account, account=account,
mod=mod, mod=mod,
symcache=symcache, symcache=symcache,
tx_sort=getattr(mod, 'tx_sort', tx_sort),
# NOTE: allow backends to provide custom ledger sorting
tx_sort=getattr(
mod,
'tx_sort',
tx_sort,
),
) )
try: try:
yield ledger yield ledger

View File

@ -305,8 +305,8 @@ class MktPair(Struct, frozen=True):
# config right? # config right?
# src_type: AssetTypeName # src_type: AssetTypeName
# for derivs, info describing contract, egs. # for derivs, info describing contract, egs. strike price, call
# strike price, call or put, swap type, exercise model, etc. # or put, swap type, exercise model, etc.
contract_info: list[str] | None = None contract_info: list[str] | None = None
# TODO: rename to sectype since all of these can # TODO: rename to sectype since all of these can

View File

@ -251,10 +251,16 @@ def iter_by_dt(
for k in parsers: for k in parsers:
if ( if (
isdict and k in tx isdict and k in tx
or getattr(tx, k, None) or
getattr(tx, k, None)
): ):
v = tx[k] if isdict else tx.dt v = (
assert v is not None, f'No valid value for `{k}`!?' tx[k] if isdict
else tx.dt
)
assert v is not None, (
f'No valid value for `{k}`!?'
)
# only call parser on the value if not None from # only call parser on the value if not None from
# the `parsers` table above (when NOT using # the `parsers` table above (when NOT using
@ -269,8 +275,21 @@ def iter_by_dt(
return v return v
else: else:
# XXX: should never get here.. # TODO: move to top?
breakpoint() from piker.log import get_logger
log = get_logger(__name__)
# XXX: we should really never get here..
# only if a ledger record has no expected sort(able)
# field will we likely hit this.. like with ze IB.
# if no sortable field just deliver epoch?
log.warning(
'No (time) sortable field for TXN:\n'
f'{tx}\n'
)
return from_timestamp(0)
# breakpoint()
entry: tuple[str, dict] | Transaction entry: tuple[str, dict] | Transaction
for entry in sorted( for entry in sorted(

View File

@ -300,7 +300,8 @@ def disect(
assert not df.is_empty() assert not df.is_empty()
# muck around in pdbp REPL # muck around in pdbp REPL
breakpoint() # tractor.devx.mk_pdb().set_trace()
# breakpoint()
# TODO: we REALLY need a better console REPL for this # TODO: we REALLY need a better console REPL for this
# kinda thing.. # kinda thing..

View File

@ -587,7 +587,7 @@ async def get_bars(
data_cs.cancel() data_cs.cancel()
# spawn new data reset task # spawn new data reset task
data_cs, reset_done = await nurse.start( data_cs, reset_done = await tn.start(
partial( partial(
wait_on_data_reset, wait_on_data_reset,
proxy, proxy,
@ -607,11 +607,11 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn # such that simultaneous symbol queries don't try data resettingn
# too fast.. # too fast..
unset_resetter: bool = False unset_resetter: bool = False
async with trio.open_nursery() as nurse: async with trio.open_nursery() as tn:
# start history request that we allow # start history request that we allow
# to run indefinitely until a result is acquired # to run indefinitely until a result is acquired
nurse.start_soon(query) tn.start_soon(query)
# start history reset loop which waits up to the timeout # start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset. # for a result before triggering a data feed reset.
@ -631,7 +631,7 @@ async def get_bars(
unset_resetter: bool = True unset_resetter: bool = True
# spawn new data reset task # spawn new data reset task
data_cs, reset_done = await nurse.start( data_cs, reset_done = await tn.start(
partial( partial(
wait_on_data_reset, wait_on_data_reset,
proxy, proxy,
@ -705,7 +705,9 @@ async def _setup_quote_stream(
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown(): def teardown():
ticker.updateEvent.disconnect(push) ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`") log.error(
f'Disconnected stream for `{symbol}`'
)
client.ib.cancelMktData(contract) client.ib.cancelMktData(contract)
# decouple broadcast mem chan # decouple broadcast mem chan
@ -761,7 +763,10 @@ async def open_aio_quote_stream(
symbol: str, symbol: str,
contract: Contract | None = None, contract: Contract | None = None,
) -> trio.abc.ReceiveStream: ) -> (
trio.abc.Channel| # iface
tractor.to_asyncio.LinkedTaskChannel # actually
):
from tractor.trionics import broadcast_receiver from tractor.trionics import broadcast_receiver
global _quote_streams global _quote_streams
@ -778,6 +783,7 @@ async def open_aio_quote_stream(
yield from_aio yield from_aio
return return
from_aio: tractor.to_asyncio.LinkedTaskChannel
async with tractor.to_asyncio.open_channel_from( async with tractor.to_asyncio.open_channel_from(
_setup_quote_stream, _setup_quote_stream,
symbol=symbol, symbol=symbol,
@ -983,17 +989,18 @@ async def stream_quotes(
) )
cs: trio.CancelScope | None = None cs: trio.CancelScope | None = None
startup: bool = True startup: bool = True
iter_quotes: trio.abc.Channel
while ( while (
startup startup
or cs.cancel_called or cs.cancel_called
): ):
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
async with ( async with (
trio.open_nursery() as nurse, trio.open_nursery() as tn,
open_aio_quote_stream( open_aio_quote_stream(
symbol=sym, symbol=sym,
contract=con, contract=con,
) as stream, ) as iter_quotes,
): ):
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
@ -1021,9 +1028,9 @@ async def stream_quotes(
await rt_ev.wait() await rt_ev.wait()
cs.cancel() # cancel called should now be set cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed) tn.start_soon(reset_on_feed)
async with aclosing(stream): async with aclosing(iter_quotes):
# if syminfo.get('no_vlm', False): # if syminfo.get('no_vlm', False):
if not init_msg.shm_write_opts['has_vlm']: if not init_msg.shm_write_opts['has_vlm']:
@ -1038,19 +1045,21 @@ async def stream_quotes(
# wait for real volume on feed (trading might be # wait for real volume on feed (trading might be
# closed) # closed)
while True: while True:
ticker = await stream.receive() ticker = await iter_quotes.receive()
# for a real volume contract we rait for # for a real volume contract we rait for
# the first "real" trade to take place # the first "real" trade to take place
if ( if (
# not calc_price # not calc_price
# and not ticker.rtTime # and not ticker.rtTime
not ticker.rtTime False
# not ticker.rtTime
): ):
# spin consuming tickers until we # spin consuming tickers until we
# get a real market datum # get a real market datum
log.debug(f"New unsent ticker: {ticker}") log.debug(f"New unsent ticker: {ticker}")
continue continue
else: else:
log.debug("Received first volume tick") log.debug("Received first volume tick")
# ugh, clear ticks since we've # ugh, clear ticks since we've
@ -1066,13 +1075,18 @@ async def stream_quotes(
log.debug(f"First ticker received {quote}") log.debug(f"First ticker received {quote}")
# tell data-layer spawner-caller that live # tell data-layer spawner-caller that live
# quotes are now streaming. # quotes are now active desptie not having
# necessarily received a first vlm/clearing
# tick.
ticker = await iter_quotes.receive()
feed_is_live.set() feed_is_live.set()
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})
# last = time.time() # last = time.time()
async for ticker in stream: async for ticker in iter_quotes:
quote = normalize(ticker) quote = normalize(ticker)
fqme = quote['fqme'] fqme: str = quote['fqme']
await send_chan.send({fqme: quote}) await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them

View File

@ -544,7 +544,7 @@ async def open_trade_dialog(
# to be reloaded. # to be reloaded.
balances: dict[str, float] = await client.get_balances() balances: dict[str, float] = await client.get_balances()
verify_balances( await verify_balances(
acnt, acnt,
src_fiat, src_fiat,
balances, balances,

View File

@ -37,6 +37,12 @@ import tractor
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
import numpy as np import numpy as np
import wrapt import wrapt
# TODO, port to `httpx`/`trio-websocket` whenver i get back to
# writing a proper ws-api streamer for this backend (since the data
# feeds are free now) as per GH feat-req:
# https://github.com/pikers/piker/issues/509
#
import asks import asks
from ..calc import humanize, percent_change from ..calc import humanize, percent_change