`kraken`: heh, use `trio_util` for trades streamz tooo XD

rekt_pps
Tyler Goodlet 2023-03-22 19:15:13 -04:00
parent c59ec77d9c
commit 59b095b2d5
2 changed files with 52 additions and 53 deletions

View File

@ -34,7 +34,6 @@ from typing import (
Union, Union,
) )
from async_generator import aclosing
from bidict import bidict from bidict import bidict
import pendulum import pendulum
import trio import trio
@ -672,11 +671,9 @@ async def trades_dialogue(
token=token, token=token,
), ),
) as ws, ) as ws,
aclosing(stream_messages(ws)) as stream, stream_messages(ws) as stream,
trio.open_nursery() as nurse, trio.open_nursery() as nurse,
): ):
stream = stream_messages(ws)
# task for processing inbound requests from ems # task for processing inbound requests from ems
nurse.start_soon( nurse.start_soon(
handle_order_requests, handle_order_requests,

View File

@ -78,6 +78,7 @@ class OHLC(Struct):
ticks: list[Any] = [] ticks: list[Any] = []
@trio_async_generator
async def stream_messages( async def stream_messages(
ws: NoBsWs, ws: NoBsWs,
): ):
@ -133,63 +134,64 @@ async def process_data_feed_msgs(
Parse and pack data feed messages. Parse and pack data feed messages.
''' '''
async for msg in stream_messages(ws): async with stream_messages(ws) as ws_stream:
match msg: async for msg in ws_stream:
case { match msg:
'errorMessage': errmsg case {
}: 'errorMessage': errmsg
raise BrokerError(errmsg) }:
raise BrokerError(errmsg)
case { case {
'event': 'subscriptionStatus', 'event': 'subscriptionStatus',
} as sub: } as sub:
log.info( log.info(
'WS subscription is active:\n' 'WS subscription is active:\n'
f'{sub}' f'{sub}'
)
continue
case [
chan_id,
*payload_array,
chan_name,
pair
]:
if 'ohlc' in chan_name:
ohlc = OHLC(
chan_id,
chan_name,
pair,
*payload_array[0]
) )
ohlc.typecast() continue
yield 'ohlc', ohlc
elif 'spread' in chan_name: case [
chan_id,
*payload_array,
chan_name,
pair
]:
if 'ohlc' in chan_name:
ohlc = OHLC(
chan_id,
chan_name,
pair,
*payload_array[0]
)
ohlc.typecast()
yield 'ohlc', ohlc
bid, ask, ts, bsize, asize = map( elif 'spread' in chan_name:
float, payload_array[0])
# TODO: really makes you think IB has a horrible API... bid, ask, ts, bsize, asize = map(
quote = { float, payload_array[0])
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize}, # TODO: really makes you think IB has a horrible API...
{'type': 'asize', 'price': ask, 'size': asize}, quote = {
], 'symbol': pair.replace('/', ''),
} 'ticks': [
yield 'l1', quote {'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
# elif 'book' in msg[-2]: {'type': 'ask', 'price': ask, 'size': asize},
# chan_id, *payload_array, chan_name, pair = msg {'type': 'asize', 'price': ask, 'size': asize},
# print(msg) ],
}
yield 'l1', quote
case _: # elif 'book' in msg[-2]:
print(f'UNHANDLED MSG: {msg}') # chan_id, *payload_array, chan_name, pair = msg
# yield msg # print(msg)
case _:
print(f'UNHANDLED MSG: {msg}')
# yield msg
def normalize( def normalize(