Use built-in type generics

pause_feeds_on_sym_switch
Tyler Goodlet 2021-09-02 12:55:10 -04:00
parent eb5762d912
commit d3838c2a8b
1 changed files with 13 additions and 13 deletions

View File

@ -26,7 +26,7 @@ from dataclasses import asdict
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
from typing import ( from typing import (
List, Dict, Any, Tuple, Optional, Any, Optional,
AsyncIterator, Awaitable, AsyncIterator, Awaitable,
) )
import asyncio import asyncio
@ -226,8 +226,8 @@ class Client:
self.ib.RaiseRequestErrors = True self.ib.RaiseRequestErrors = True
# contract cache # contract cache
self._contracts: Dict[str, Contract] = {} self._contracts: dict[str, Contract] = {}
self._feeds: Dict[str, trio.abc.SendChannel] = {} self._feeds: dict[str, trio.abc.SendChannel] = {}
# NOTE: the ib.client here is "throttled" to 45 rps by default # NOTE: the ib.client here is "throttled" to 45 rps by default
@ -242,7 +242,7 @@ class Client:
period_count: int = int(2e3), # <- max per 1s sample query period_count: int = int(2e3), # <- max per 1s sample query
is_paid_feed: bool = False, # placeholder is_paid_feed: bool = False, # placeholder
) -> List[Dict[str, Any]]: ) -> list[dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present. """Retreive OHLCV bars for a symbol over a range to the present.
""" """
bars_kwargs = {'whatToShow': 'TRADES'} bars_kwargs = {'whatToShow': 'TRADES'}
@ -369,7 +369,7 @@ class Client:
upto: int = 3, upto: int = 3,
asdicts: bool = True, asdicts: bool = True,
) -> Dict[str, ContractDetails]: ) -> dict[str, ContractDetails]:
# TODO add search though our adhoc-locally defined symbol set # TODO add search though our adhoc-locally defined symbol set
# for futes/cmdtys/ # for futes/cmdtys/
@ -399,7 +399,7 @@ class Client:
# ``wrapper.starTicker()`` currently cashes ticker instances # ``wrapper.starTicker()`` currently cashes ticker instances
# which means getting a singel quote will potentially look up # which means getting a singel quote will potentially look up
# a quote for a ticker that it already streaming and thus run # a quote for a ticker that it already streaming and thus run
# into state clobbering (eg. List: Ticker.ticks). It probably # into state clobbering (eg. list: Ticker.ticks). It probably
# makes sense to try this once we get the pub-sub working on # makes sense to try this once we get the pub-sub working on
# individual symbols... # individual symbols...
@ -660,7 +660,7 @@ class Client:
async def positions( async def positions(
self, self,
account: str = '', account: str = '',
) -> List[Position]: ) -> list[Position]:
""" """
Retrieve position info for ``account``. Retrieve position info for ``account``.
""" """
@ -1067,12 +1067,12 @@ asset_type_map = {
} }
_quote_streams: Dict[str, trio.abc.ReceiveStream] = {} _quote_streams: dict[str, trio.abc.ReceiveStream] = {}
async def _setup_quote_stream( async def _setup_quote_stream(
symbol: str, symbol: str,
opts: Tuple[int] = ('375', '233', '236'), opts: tuple[int] = ('375', '233', '236'),
contract: Optional[Contract] = None, contract: Optional[Contract] = None,
) -> None: ) -> None:
"""Stream a ticker using the std L1 api. """Stream a ticker using the std L1 api.
@ -1148,13 +1148,13 @@ async def start_aio_quote_stream(
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: List[str], symbols: list[str],
shm: ShmArray, shm: ShmArray,
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None, loglevel: str = None,
# startup sync # startup sync
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Stream symbol quotes. """Stream symbol quotes.
@ -1285,7 +1285,7 @@ async def stream_quotes(
# last = time.time() # last = time.time()
def pack_position(pos: Position) -> Dict[str, Any]: def pack_position(pos: Position) -> dict[str, Any]:
con = pos.contract con = pos.contract
if isinstance(con, Option): if isinstance(con, Option):
@ -1367,7 +1367,7 @@ async def trades_dialogue(
ctx: tractor.Context, ctx: tractor.Context,
loglevel: str = None, loglevel: str = None,
) -> AsyncIterator[Dict[str, Any]]: ) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)