From a39d1fed01c1295dc3b36667ab3a0b0bb2055cf2 Mon Sep 17 00:00:00 2001 From: Bryant Moscon Date: Wed, 20 Nov 2024 19:10:42 -0500 Subject: [PATCH] Update to use new websockets version --- cryptofeed/connection.py | 9 +++++---- cryptofeed/connection_handler.py | 17 ----------------- examples/demo.py | 2 +- 3 files changed, 6 insertions(+), 22 deletions(-) diff --git a/cryptofeed/connection.py b/cryptofeed/connection.py index 49f306099..b30570a07 100644 --- a/cryptofeed/connection.py +++ b/cryptofeed/connection.py @@ -16,7 +16,8 @@ from aiohttp.client_reqrep import ClientResponse import requests -import websockets +from websockets.asyncio.client import connect +from websockets.protocol import State import aiohttp from aiohttp.typedefs import StrOrURL from yapic import json as json_parser @@ -303,7 +304,7 @@ def __init__(self, address: str, conn_id: str, authentication=None, subscription @property def is_open(self) -> bool: - return self.conn and not self.conn.closed + return self.conn and not self.conn.state == State.CLOSED async def _open(self): if self.is_open: @@ -315,7 +316,7 @@ async def _open(self): if self.authentication: self.address, self.ws_kwargs = await self.authentication(self.address, self.ws_kwargs) - self.conn = await websockets.connect(self.address, **self.ws_kwargs) + self.conn = await connect(self.address, **self.ws_kwargs) self.sent = 0 self.received = 0 self.last_message = None @@ -357,7 +358,7 @@ class WebsocketEndpoint: authentication: bool = None def __post_init__(self): - defaults = {'ping_interval': 10, 'ping_timeout': None, 'max_size': 2**23, 'max_queue': None, 'read_limit': 2**18} + defaults = {'ping_interval': 10, 'ping_timeout': None, 'max_size': None, 'max_queue': None} if self.options: defaults.update(self.options) self.options = defaults diff --git a/cryptofeed/connection_handler.py b/cryptofeed/connection_handler.py index 2460b10aa..9f1965c80 100644 --- a/cryptofeed/connection_handler.py +++ b/cryptofeed/connection_handler.py @@ -13,7 +13,6 @@ import zlib from websockets import ConnectionClosed -from websockets.exceptions import InvalidStatusCode from cryptofeed.connection import AsyncConnection from cryptofeed.exceptions import ExhaustedRetries @@ -77,22 +76,6 @@ async def _create_connection(self): await asyncio.sleep(delay) retries += 1 delay *= 2 - except InvalidStatusCode as e: - if self.exceptions: - for ex in self.exceptions: - if isinstance(e, ex): - LOG.warning("%s: encountered exception %s, which is on the ignore list. Raising", self.conn.uuid, str(e)) - raise - if e.status_code == 429: - rand = random.uniform(1.0, 3.0) - LOG.warning("%s: Rate Limited - waiting %d seconds to reconnect", self.conn.uuid, (rate_limited * 60 * rand)) - await asyncio.sleep(rate_limited * 60 * rand) - rate_limited += 1 - else: - LOG.warning("%s: encountered connection issue %s - reconnecting in %.1f seconds...", self.conn.uuid, str(e), delay, exc_info=True) - await asyncio.sleep(delay) - retries += 1 - delay *= 2 except Exception as e: if self.exceptions: for ex in self.exceptions: diff --git a/examples/demo.py b/examples/demo.py index 0d3e4d7a7..659045e6d 100644 --- a/examples/demo.py +++ b/examples/demo.py @@ -106,7 +106,7 @@ def main(): f.add_feed(HitBTC(channels=[TRADES], symbols=['BTC-USDT'], callbacks={TRADES: trade})) f.add_feed(Huobi(symbols=['BTC-USDT'], channels=[CANDLES, TRADES, L2_BOOK], callbacks={TRADES: trade, L2_BOOK: book, CANDLES: candle_callback})) f.add_feed(HuobiDM(subscription={L2_BOOK: HuobiDM.symbols()[:2], TRADES: HuobiDM.symbols()[:10]}, callbacks={TRADES: trade, L2_BOOK: book})) - pairs = ['BTC-USD-PERP', 'ETH-USD-PERP', 'EOS-USD-PERP', 'BCH-USD-PERP', 'BSV-USD-PERP', 'LTC-USD-PERP'] + pairs = ['BTC-USD-PERP', 'ETH-USD-PERP', 'LTC-USD-PERP'] f.add_feed(HuobiSwap(symbols=pairs, channels=[TRADES, L2_BOOK, FUNDING], callbacks={FUNDING: funding, TRADES: trade, L2_BOOK: book})) f.add_feed(KrakenFutures(symbols=KrakenFutures.symbols(), channels=[L2_BOOK, TICKER, TRADES, OPEN_INTEREST, FUNDING], callbacks={L2_BOOK: book, FUNDING: funding, OPEN_INTEREST: oi, TRADES: trade, TICKER: ticker})) f.add_feed(Kraken(config='config.yaml', checksum_validation=True, subscription={L2_BOOK: ['BTC-USD'], TRADES: ['BTC-USD'], CANDLES: ['BTC-USD'], TICKER: ['ETH-USD']}, callbacks={L2_BOOK: book, CANDLES: candle_callback, TRADES: trade, TICKER: ticker}))