`kraken`: heh, use `trio_util` for trades streamz tooo XD
							parent
							
								
									116f7fd40f
								
							
						
					
					
						commit
						49958e68ea
					
				| 
						 | 
				
			
			@ -34,7 +34,6 @@ from typing import (
 | 
			
		|||
    Union,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from async_generator import aclosing
 | 
			
		||||
from bidict import bidict
 | 
			
		||||
import pendulum
 | 
			
		||||
import trio
 | 
			
		||||
| 
						 | 
				
			
			@ -672,11 +671,9 @@ async def trades_dialogue(
 | 
			
		|||
                        token=token,
 | 
			
		||||
                    ),
 | 
			
		||||
                ) as ws,
 | 
			
		||||
                aclosing(stream_messages(ws)) as stream,
 | 
			
		||||
                stream_messages(ws) as stream,
 | 
			
		||||
                trio.open_nursery() as nurse,
 | 
			
		||||
            ):
 | 
			
		||||
                stream = stream_messages(ws)
 | 
			
		||||
 | 
			
		||||
                # task for processing inbound requests from ems
 | 
			
		||||
                nurse.start_soon(
 | 
			
		||||
                    handle_order_requests,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -78,6 +78,7 @@ class OHLC(Struct):
 | 
			
		|||
    ticks: list[Any] = []
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@trio_async_generator
 | 
			
		||||
async def stream_messages(
 | 
			
		||||
    ws: NoBsWs,
 | 
			
		||||
):
 | 
			
		||||
| 
						 | 
				
			
			@ -133,63 +134,64 @@ async def process_data_feed_msgs(
 | 
			
		|||
    Parse and pack data feed messages.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    async for msg in stream_messages(ws):
 | 
			
		||||
        match msg:
 | 
			
		||||
            case {
 | 
			
		||||
                'errorMessage': errmsg
 | 
			
		||||
            }:
 | 
			
		||||
                raise BrokerError(errmsg)
 | 
			
		||||
    async with stream_messages(ws) as ws_stream:
 | 
			
		||||
        async for msg in ws_stream:
 | 
			
		||||
            match msg:
 | 
			
		||||
                case {
 | 
			
		||||
                    'errorMessage': errmsg
 | 
			
		||||
                }:
 | 
			
		||||
                    raise BrokerError(errmsg)
 | 
			
		||||
 | 
			
		||||
            case {
 | 
			
		||||
                'event': 'subscriptionStatus',
 | 
			
		||||
            } as sub:
 | 
			
		||||
                log.info(
 | 
			
		||||
                    'WS subscription is active:\n'
 | 
			
		||||
                    f'{sub}'
 | 
			
		||||
                )
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            case [
 | 
			
		||||
                chan_id,
 | 
			
		||||
                *payload_array,
 | 
			
		||||
                chan_name,
 | 
			
		||||
                pair
 | 
			
		||||
            ]:
 | 
			
		||||
                if 'ohlc' in chan_name:
 | 
			
		||||
                    ohlc = OHLC(
 | 
			
		||||
                        chan_id,
 | 
			
		||||
                        chan_name,
 | 
			
		||||
                        pair,
 | 
			
		||||
                        *payload_array[0]
 | 
			
		||||
                case {
 | 
			
		||||
                    'event': 'subscriptionStatus',
 | 
			
		||||
                } as sub:
 | 
			
		||||
                    log.info(
 | 
			
		||||
                        'WS subscription is active:\n'
 | 
			
		||||
                        f'{sub}'
 | 
			
		||||
                    )
 | 
			
		||||
                    ohlc.typecast()
 | 
			
		||||
                    yield 'ohlc', ohlc
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                elif 'spread' in chan_name:
 | 
			
		||||
                case [
 | 
			
		||||
                    chan_id,
 | 
			
		||||
                    *payload_array,
 | 
			
		||||
                    chan_name,
 | 
			
		||||
                    pair
 | 
			
		||||
                ]:
 | 
			
		||||
                    if 'ohlc' in chan_name:
 | 
			
		||||
                        ohlc = OHLC(
 | 
			
		||||
                            chan_id,
 | 
			
		||||
                            chan_name,
 | 
			
		||||
                            pair,
 | 
			
		||||
                            *payload_array[0]
 | 
			
		||||
                        )
 | 
			
		||||
                        ohlc.typecast()
 | 
			
		||||
                        yield 'ohlc', ohlc
 | 
			
		||||
 | 
			
		||||
                    bid, ask, ts, bsize, asize = map(
 | 
			
		||||
                        float, payload_array[0])
 | 
			
		||||
                    elif 'spread' in chan_name:
 | 
			
		||||
 | 
			
		||||
                    # TODO: really makes you think IB has a horrible API...
 | 
			
		||||
                    quote = {
 | 
			
		||||
                        'symbol': pair.replace('/', ''),
 | 
			
		||||
                        'ticks': [
 | 
			
		||||
                            {'type': 'bid', 'price': bid, 'size': bsize},
 | 
			
		||||
                            {'type': 'bsize', 'price': bid, 'size': bsize},
 | 
			
		||||
                        bid, ask, ts, bsize, asize = map(
 | 
			
		||||
                            float, payload_array[0])
 | 
			
		||||
 | 
			
		||||
                            {'type': 'ask', 'price': ask, 'size': asize},
 | 
			
		||||
                            {'type': 'asize', 'price': ask, 'size': asize},
 | 
			
		||||
                        ],
 | 
			
		||||
                    }
 | 
			
		||||
                    yield 'l1', quote
 | 
			
		||||
                        # TODO: really makes you think IB has a horrible API...
 | 
			
		||||
                        quote = {
 | 
			
		||||
                            'symbol': pair.replace('/', ''),
 | 
			
		||||
                            'ticks': [
 | 
			
		||||
                                {'type': 'bid', 'price': bid, 'size': bsize},
 | 
			
		||||
                                {'type': 'bsize', 'price': bid, 'size': bsize},
 | 
			
		||||
 | 
			
		||||
                # elif 'book' in msg[-2]:
 | 
			
		||||
                #     chan_id, *payload_array, chan_name, pair = msg
 | 
			
		||||
                #     print(msg)
 | 
			
		||||
                                {'type': 'ask', 'price': ask, 'size': asize},
 | 
			
		||||
                                {'type': 'asize', 'price': ask, 'size': asize},
 | 
			
		||||
                            ],
 | 
			
		||||
                        }
 | 
			
		||||
                        yield 'l1', quote
 | 
			
		||||
 | 
			
		||||
            case _:
 | 
			
		||||
                print(f'UNHANDLED MSG: {msg}')
 | 
			
		||||
                # yield msg
 | 
			
		||||
                    # elif 'book' in msg[-2]:
 | 
			
		||||
                    #     chan_id, *payload_array, chan_name, pair = msg
 | 
			
		||||
                    #     print(msg)
 | 
			
		||||
 | 
			
		||||
                case _:
 | 
			
		||||
                    print(f'UNHANDLED MSG: {msg}')
 | 
			
		||||
                    # yield msg
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def normalize(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue