brokers_refinery: various updates/fixes/typing during options/max-pain feats dev #40
|
@ -98,13 +98,14 @@ async def open_cached_client(
|
||||||
If one has not been setup do it and cache it.
|
If one has not been setup do it and cache it.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
brokermod = get_brokermod(brokername)
|
brokermod: ModuleType = get_brokermod(brokername)
|
||||||
|
|
||||||
|
# TODO: make abstract or `typing.Protocol`
|
||||||
|
# client: Client
|
||||||
async with maybe_open_context(
|
async with maybe_open_context(
|
||||||
acm_func=brokermod.get_client,
|
acm_func=brokermod.get_client,
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
|
|
||||||
) as (cache_hit, client):
|
) as (cache_hit, client):
|
||||||
|
|
||||||
if cache_hit:
|
if cache_hit:
|
||||||
log.runtime(f'Reusing existing {client}')
|
log.runtime(f'Reusing existing {client}')
|
||||||
|
|
||||||
|
|
|
@ -471,11 +471,15 @@ def search(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# global opts
|
# global opts
|
||||||
brokermods = list(config['brokermods'].values())
|
brokermods: list[ModuleType] = list(config['brokermods'].values())
|
||||||
|
|
||||||
|
# TODO: this is coming from the `search --pdb` NOT from
|
||||||
|
# the `piker --pdb` XD ..
|
||||||
|
# -[ ] pull from the parent click ctx's values..dumdum
|
||||||
|
# assert pdb
|
||||||
|
|
||||||
# define tractor entrypoint
|
# define tractor entrypoint
|
||||||
async def main(func):
|
async def main(func):
|
||||||
|
|
||||||
async with maybe_open_pikerd(
|
async with maybe_open_pikerd(
|
||||||
loglevel=config['loglevel'],
|
loglevel=config['loglevel'],
|
||||||
debug_mode=pdb,
|
debug_mode=pdb,
|
||||||
|
|
|
@ -22,7 +22,9 @@ routines should be primitive data types where possible.
|
||||||
"""
|
"""
|
||||||
import inspect
|
import inspect
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from typing import List, Dict, Any, Optional
|
from typing import (
|
||||||
|
Any,
|
||||||
|
)
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
@ -34,8 +36,10 @@ from ..accounting import MktPair
|
||||||
|
|
||||||
|
|
||||||
async def api(brokername: str, methname: str, **kwargs) -> dict:
|
async def api(brokername: str, methname: str, **kwargs) -> dict:
|
||||||
"""Make (proxy through) a broker API call by name and return its result.
|
'''
|
||||||
"""
|
Make (proxy through) a broker API call by name and return its result.
|
||||||
|
|
||||||
|
'''
|
||||||
brokermod = get_brokermod(brokername)
|
brokermod = get_brokermod(brokername)
|
||||||
async with brokermod.get_client() as client:
|
async with brokermod.get_client() as client:
|
||||||
meth = getattr(client, methname, None)
|
meth = getattr(client, methname, None)
|
||||||
|
@ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
|
||||||
|
|
||||||
async def stocks_quote(
|
async def stocks_quote(
|
||||||
brokermod: ModuleType,
|
brokermod: ModuleType,
|
||||||
tickers: List[str]
|
tickers: list[str]
|
||||||
) -> Dict[str, Dict[str, Any]]:
|
|
||||||
"""Return quotes dict for ``tickers``.
|
) -> dict[str, dict[str, Any]]:
|
||||||
"""
|
'''
|
||||||
|
Return a `dict` of snapshot quotes for the provided input
|
||||||
|
`tickers`: a `list` of fqmes.
|
||||||
|
|
||||||
|
'''
|
||||||
async with brokermod.get_client() as client:
|
async with brokermod.get_client() as client:
|
||||||
return await client.quote(tickers)
|
return await client.quote(tickers)
|
||||||
|
|
||||||
|
@ -74,13 +82,15 @@ async def stocks_quote(
|
||||||
async def option_chain(
|
async def option_chain(
|
||||||
brokermod: ModuleType,
|
brokermod: ModuleType,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
date: Optional[str] = None,
|
date: str|None = None,
|
||||||
) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
) -> dict[str, dict[str, dict[str, Any]]]:
|
||||||
"""Return option chain for ``symbol`` for ``date``.
|
'''
|
||||||
|
Return option chain for ``symbol`` for ``date``.
|
||||||
|
|
||||||
By default all expiries are returned. If ``date`` is provided
|
By default all expiries are returned. If ``date`` is provided
|
||||||
then contract quotes for that single expiry are returned.
|
then contract quotes for that single expiry are returned.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
async with brokermod.get_client() as client:
|
async with brokermod.get_client() as client:
|
||||||
if date:
|
if date:
|
||||||
id = int((await client.tickers2ids([symbol]))[symbol])
|
id = int((await client.tickers2ids([symbol]))[symbol])
|
||||||
|
@ -98,7 +108,7 @@ async def option_chain(
|
||||||
# async def contracts(
|
# async def contracts(
|
||||||
# brokermod: ModuleType,
|
# brokermod: ModuleType,
|
||||||
# symbol: str,
|
# symbol: str,
|
||||||
# ) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
# ) -> dict[str, dict[str, dict[str, Any]]]:
|
||||||
# """Return option contracts (all expiries) for ``symbol``.
|
# """Return option contracts (all expiries) for ``symbol``.
|
||||||
# """
|
# """
|
||||||
# async with brokermod.get_client() as client:
|
# async with brokermod.get_client() as client:
|
||||||
|
@ -110,15 +120,24 @@ async def bars(
|
||||||
brokermod: ModuleType,
|
brokermod: ModuleType,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
) -> dict[str, dict[str, dict[str, Any]]]:
|
||||||
"""Return option contracts (all expiries) for ``symbol``.
|
'''
|
||||||
"""
|
Return option contracts (all expiries) for ``symbol``.
|
||||||
|
|
||||||
|
'''
|
||||||
async with brokermod.get_client() as client:
|
async with brokermod.get_client() as client:
|
||||||
return await client.bars(symbol, **kwargs)
|
return await client.bars(symbol, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
async def search_w_brokerd(name: str, pattern: str) -> dict:
|
async def search_w_brokerd(
|
||||||
|
name: str,
|
||||||
|
pattern: str,
|
||||||
|
) -> dict:
|
||||||
|
|
||||||
|
# TODO: WHY NOT WORK!?!
|
||||||
|
# when we `step` through the next block?
|
||||||
|
# import tractor
|
||||||
|
# await tractor.pause()
|
||||||
async with open_cached_client(name) as client:
|
async with open_cached_client(name) as client:
|
||||||
|
|
||||||
# TODO: support multiple asset type concurrent searches.
|
# TODO: support multiple asset type concurrent searches.
|
||||||
|
@ -130,12 +149,12 @@ async def symbol_search(
|
||||||
pattern: str,
|
pattern: str,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
) -> dict[str, dict[str, dict[str, Any]]]:
|
||||||
'''
|
'''
|
||||||
Return symbol info from broker.
|
Return symbol info from broker.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
results = []
|
results: list[str] = []
|
||||||
|
|
||||||
async def search_backend(
|
async def search_backend(
|
||||||
brokermod: ModuleType
|
brokermod: ModuleType
|
||||||
|
@ -143,6 +162,13 @@ async def symbol_search(
|
||||||
|
|
||||||
brokername: str = mod.name
|
brokername: str = mod.name
|
||||||
|
|
||||||
|
# TODO: figure this the FUCK OUT
|
||||||
|
# -> ok so obvi in the root actor any async task that's
|
||||||
|
# spawned outside the main tractor-root-actor task needs to
|
||||||
|
# call this..
|
||||||
|
# await tractor.devx._debug.maybe_init_greenback()
|
||||||
|
# tractor.pause_from_sync()
|
||||||
|
|
||||||
async with maybe_spawn_brokerd(
|
async with maybe_spawn_brokerd(
|
||||||
mod.name,
|
mod.name,
|
||||||
infect_asyncio=getattr(
|
infect_asyncio=getattr(
|
||||||
|
@ -162,7 +188,6 @@ async def symbol_search(
|
||||||
))
|
))
|
||||||
|
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
|
|
||||||
for mod in brokermods:
|
for mod in brokermods:
|
||||||
n.start_soon(search_backend, mod.name)
|
n.start_soon(search_backend, mod.name)
|
||||||
|
|
||||||
|
@ -172,11 +197,13 @@ async def symbol_search(
|
||||||
async def mkt_info(
|
async def mkt_info(
|
||||||
brokermod: ModuleType,
|
brokermod: ModuleType,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> MktPair:
|
) -> MktPair:
|
||||||
'''
|
'''
|
||||||
Return MktPair info from broker including src and dst assets.
|
Return the `piker.accounting.MktPair` info struct from a given
|
||||||
|
backend broker tradable src/dst asset pair.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with open_cached_client(brokermod.name) as client:
|
async with open_cached_client(brokermod.name) as client:
|
||||||
|
|
|
@ -587,7 +587,7 @@ async def get_bars(
|
||||||
data_cs.cancel()
|
data_cs.cancel()
|
||||||
|
|
||||||
# spawn new data reset task
|
# spawn new data reset task
|
||||||
data_cs, reset_done = await nurse.start(
|
data_cs, reset_done = await tn.start(
|
||||||
partial(
|
partial(
|
||||||
wait_on_data_reset,
|
wait_on_data_reset,
|
||||||
proxy,
|
proxy,
|
||||||
|
@ -607,11 +607,11 @@ async def get_bars(
|
||||||
# such that simultaneous symbol queries don't try data resettingn
|
# such that simultaneous symbol queries don't try data resettingn
|
||||||
# too fast..
|
# too fast..
|
||||||
unset_resetter: bool = False
|
unset_resetter: bool = False
|
||||||
async with trio.open_nursery() as nurse:
|
async with trio.open_nursery() as tn:
|
||||||
|
|
||||||
# start history request that we allow
|
# start history request that we allow
|
||||||
# to run indefinitely until a result is acquired
|
# to run indefinitely until a result is acquired
|
||||||
nurse.start_soon(query)
|
tn.start_soon(query)
|
||||||
|
|
||||||
# start history reset loop which waits up to the timeout
|
# start history reset loop which waits up to the timeout
|
||||||
# for a result before triggering a data feed reset.
|
# for a result before triggering a data feed reset.
|
||||||
|
@ -631,7 +631,7 @@ async def get_bars(
|
||||||
unset_resetter: bool = True
|
unset_resetter: bool = True
|
||||||
|
|
||||||
# spawn new data reset task
|
# spawn new data reset task
|
||||||
data_cs, reset_done = await nurse.start(
|
data_cs, reset_done = await tn.start(
|
||||||
partial(
|
partial(
|
||||||
wait_on_data_reset,
|
wait_on_data_reset,
|
||||||
proxy,
|
proxy,
|
||||||
|
@ -705,7 +705,9 @@ async def _setup_quote_stream(
|
||||||
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
|
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
|
||||||
def teardown():
|
def teardown():
|
||||||
ticker.updateEvent.disconnect(push)
|
ticker.updateEvent.disconnect(push)
|
||||||
log.error(f"Disconnected stream for `{symbol}`")
|
log.error(
|
||||||
|
f'Disconnected stream for `{symbol}`'
|
||||||
|
)
|
||||||
client.ib.cancelMktData(contract)
|
client.ib.cancelMktData(contract)
|
||||||
|
|
||||||
# decouple broadcast mem chan
|
# decouple broadcast mem chan
|
||||||
|
@ -761,7 +763,10 @@ async def open_aio_quote_stream(
|
||||||
symbol: str,
|
symbol: str,
|
||||||
contract: Contract | None = None,
|
contract: Contract | None = None,
|
||||||
|
|
||||||
) -> trio.abc.ReceiveStream:
|
) -> (
|
||||||
|
trio.abc.Channel| # iface
|
||||||
|
tractor.to_asyncio.LinkedTaskChannel # actually
|
||||||
|
):
|
||||||
|
|
||||||
from tractor.trionics import broadcast_receiver
|
from tractor.trionics import broadcast_receiver
|
||||||
global _quote_streams
|
global _quote_streams
|
||||||
|
@ -778,6 +783,7 @@ async def open_aio_quote_stream(
|
||||||
yield from_aio
|
yield from_aio
|
||||||
return
|
return
|
||||||
|
|
||||||
|
from_aio: tractor.to_asyncio.LinkedTaskChannel
|
||||||
async with tractor.to_asyncio.open_channel_from(
|
async with tractor.to_asyncio.open_channel_from(
|
||||||
_setup_quote_stream,
|
_setup_quote_stream,
|
||||||
symbol=symbol,
|
symbol=symbol,
|
||||||
|
@ -983,17 +989,18 @@ async def stream_quotes(
|
||||||
)
|
)
|
||||||
cs: trio.CancelScope | None = None
|
cs: trio.CancelScope | None = None
|
||||||
startup: bool = True
|
startup: bool = True
|
||||||
|
iter_quotes: trio.abc.Channel
|
||||||
while (
|
while (
|
||||||
startup
|
startup
|
||||||
or cs.cancel_called
|
or cs.cancel_called
|
||||||
):
|
):
|
||||||
with trio.CancelScope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as nurse,
|
trio.open_nursery() as tn,
|
||||||
open_aio_quote_stream(
|
open_aio_quote_stream(
|
||||||
symbol=sym,
|
symbol=sym,
|
||||||
contract=con,
|
contract=con,
|
||||||
) as stream,
|
) as iter_quotes,
|
||||||
):
|
):
|
||||||
# ugh, clear ticks since we've consumed them
|
# ugh, clear ticks since we've consumed them
|
||||||
# (ahem, ib_insync is stateful trash)
|
# (ahem, ib_insync is stateful trash)
|
||||||
|
@ -1021,9 +1028,9 @@ async def stream_quotes(
|
||||||
await rt_ev.wait()
|
await rt_ev.wait()
|
||||||
cs.cancel() # cancel called should now be set
|
cs.cancel() # cancel called should now be set
|
||||||
|
|
||||||
nurse.start_soon(reset_on_feed)
|
tn.start_soon(reset_on_feed)
|
||||||
|
|
||||||
async with aclosing(stream):
|
async with aclosing(iter_quotes):
|
||||||
# if syminfo.get('no_vlm', False):
|
# if syminfo.get('no_vlm', False):
|
||||||
if not init_msg.shm_write_opts['has_vlm']:
|
if not init_msg.shm_write_opts['has_vlm']:
|
||||||
|
|
||||||
|
@ -1038,19 +1045,21 @@ async def stream_quotes(
|
||||||
# wait for real volume on feed (trading might be
|
# wait for real volume on feed (trading might be
|
||||||
# closed)
|
# closed)
|
||||||
while True:
|
while True:
|
||||||
ticker = await stream.receive()
|
ticker = await iter_quotes.receive()
|
||||||
|
|
||||||
# for a real volume contract we rait for
|
# for a real volume contract we rait for
|
||||||
# the first "real" trade to take place
|
# the first "real" trade to take place
|
||||||
if (
|
if (
|
||||||
# not calc_price
|
# not calc_price
|
||||||
# and not ticker.rtTime
|
# and not ticker.rtTime
|
||||||
not ticker.rtTime
|
False
|
||||||
|
# not ticker.rtTime
|
||||||
):
|
):
|
||||||
# spin consuming tickers until we
|
# spin consuming tickers until we
|
||||||
# get a real market datum
|
# get a real market datum
|
||||||
log.debug(f"New unsent ticker: {ticker}")
|
log.debug(f"New unsent ticker: {ticker}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.debug("Received first volume tick")
|
log.debug("Received first volume tick")
|
||||||
# ugh, clear ticks since we've
|
# ugh, clear ticks since we've
|
||||||
|
@ -1066,13 +1075,18 @@ async def stream_quotes(
|
||||||
log.debug(f"First ticker received {quote}")
|
log.debug(f"First ticker received {quote}")
|
||||||
|
|
||||||
# tell data-layer spawner-caller that live
|
# tell data-layer spawner-caller that live
|
||||||
# quotes are now streaming.
|
# quotes are now active desptie not having
|
||||||
|
# necessarily received a first vlm/clearing
|
||||||
|
# tick.
|
||||||
|
ticker = await iter_quotes.receive()
|
||||||
feed_is_live.set()
|
feed_is_live.set()
|
||||||
|
fqme: str = quote['fqme']
|
||||||
|
await send_chan.send({fqme: quote})
|
||||||
|
|
||||||
# last = time.time()
|
# last = time.time()
|
||||||
async for ticker in stream:
|
async for ticker in iter_quotes:
|
||||||
quote = normalize(ticker)
|
quote = normalize(ticker)
|
||||||
fqme = quote['fqme']
|
fqme: str = quote['fqme']
|
||||||
await send_chan.send({fqme: quote})
|
await send_chan.send({fqme: quote})
|
||||||
|
|
||||||
# ugh, clear ticks since we've consumed them
|
# ugh, clear ticks since we've consumed them
|
||||||
|
|
|
@ -544,7 +544,7 @@ async def open_trade_dialog(
|
||||||
# to be reloaded.
|
# to be reloaded.
|
||||||
balances: dict[str, float] = await client.get_balances()
|
balances: dict[str, float] = await client.get_balances()
|
||||||
|
|
||||||
verify_balances(
|
await verify_balances(
|
||||||
acnt,
|
acnt,
|
||||||
src_fiat,
|
src_fiat,
|
||||||
balances,
|
balances,
|
||||||
|
|
|
@ -37,6 +37,12 @@ import tractor
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import wrapt
|
import wrapt
|
||||||
|
|
||||||
|
# TODO, port to `httpx`/`trio-websocket` whenver i get back to
|
||||||
|
# writing a proper ws-api streamer for this backend (since the data
|
||||||
|
# feeds are free now) as per GH feat-req:
|
||||||
|
# https://github.com/pikers/piker/issues/509
|
||||||
|
#
|
||||||
import asks
|
import asks
|
||||||
|
|
||||||
from ..calc import humanize, percent_change
|
from ..calc import humanize, percent_change
|
||||||
|
|
Loading…
Reference in New Issue