-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
StreamFetcher POC #6459
base: develop
Are you sure you want to change the base?
StreamFetcher POC #6459
Changes from 6 commits
aeeae61
1286f07
459bb70
38f4316
58e8ada
8d2c4ee
6159c6e
795e91f
cb9b445
846f63e
23ef701
7d9f34a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,5 +1,7 @@ | ||||||||||||
"""Crypto Router.""" | ||||||||||||
|
||||||||||||
# import asyncio | ||||||||||||
|
||||||||||||
from openbb_core.app.model.command_context import CommandContext | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
from openbb_core.app.model.example import APIEx | ||||||||||||
from openbb_core.app.model.obbject import OBBject | ||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
# pylint: disable=W0613:unused-argument | ||
"""Crypto Price Router.""" | ||
|
||
from fastapi.responses import StreamingResponse | ||
from openbb_core.app.model.command_context import CommandContext | ||
from openbb_core.app.model.example import APIEx | ||
from openbb_core.app.model.obbject import OBBject | ||
|
@@ -11,6 +12,10 @@ | |
) | ||
from openbb_core.app.query import Query | ||
from openbb_core.app.router import Router | ||
from providers.binance.openbb_binance.models.crypto_historical import ( | ||
BinanceCryptoHistoricalData, | ||
BinanceCryptoHistoricalFetcher, | ||
) | ||
|
||
router = Router(prefix="/price") | ||
|
||
|
@@ -56,3 +61,15 @@ async def historical( | |
) -> OBBject: | ||
"""Get historical price data for cryptocurrency pair(s) within a provider.""" | ||
return await OBBject.from_query(Query(**locals())) | ||
|
||
|
||
@router.command(methods=["GET"]) | ||
async def live( | ||
symbol: str = "ethbtc", lifetime: int = 10 | ||
) -> BinanceCryptoHistoricalData: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How can we make this a Provider Interface method? There could be any number of providers that have a WS connection, like here - https://site.financialmodelingprep.com/developer/docs#crypto-websocket |
||
"""Connect to Binance WebSocket Crypto Price data feed.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This endpoint will not generate any documentation or descriptions because it is not using the ProviderInterface. |
||
generator = BinanceCryptoHistoricalFetcher().stream_data( | ||
params={"symbol": symbol, "lifetime": lifetime}, | ||
credentials=None, | ||
) | ||
return StreamingResponse(generator, media_type="application/x-ndjson") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# OpenBB Biztoc Provider | ||
|
||
This extension integrates the Biztoc data provider | ||
into the OpenBB Platform. | ||
|
||
## Installation | ||
|
||
To install the extension, run the following command in this folder: | ||
|
||
```bash | ||
pip install openbb-biztoc | ||
``` | ||
|
||
Documentation available [here](https://docs.openbb.co/platform/development/contributing). |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Biztoc provider.""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
"""Biztoc provider module.""" | ||
|
||
# from openbb_binance.models.crypto_historical import ( | ||
# BinanceCryptoHistoricalFetcher, | ||
# ) | ||
from openbb_core.provider.abstract.provider import Provider | ||
|
||
binance_provider = Provider( | ||
name="binance", | ||
website="https://api.binance.com", | ||
description="""BizToc uses Rapid API for its REST API. | ||
You may sign up for your free account at https://rapidapi.com/thma/api/binance. | ||
|
||
The Base URL for all requests is: | ||
|
||
https://binance.p.rapidapi.com/ | ||
|
||
If you're not a developer but would still like to use Biztoc outside of the main website, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This description is half BizToc and half maybe Binance. |
||
we've partnered with OpenBB, allowing you to pull in BizToc's news stream in their Terminal.""", | ||
# credentials=["api_key"], | ||
fetcher_dict={ | ||
# "bcrypto_historical": BinanceCryptoHistoricalFetcher, | ||
}, | ||
repr_name="Binance", | ||
instructions="The BizToc API is hosted on RapidAPI. To set up, go to: https://rapidapi.com/thma/api/binance.\n\n![binance0](https://github.com/marban/OpenBBTerminal/assets/18151143/04cdd423-f65e-4ad8-ad5a-4a59b0f5ddda)\n\nIn the top right, select 'Sign Up'. After answering some questions, you will be prompted to select one of their plans.\n\n![binance1](https://github.com/marban/OpenBBTerminal/assets/18151143/9f3b72ea-ded7-48c5-aa33-bec5c0de8422)\n\nAfter signing up, navigate back to https://rapidapi.com/thma/api/binance. If you are logged in, you will see a header called X-RapidAPI-Key.\n\n![binance2](https://github.com/marban/OpenBBTerminal/assets/18151143/0f3b6c91-07e0-447a-90cd-a9e23522929f)", # noqa: E501 pylint: disable=line-too-long | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Biztoc Provider models.""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
"""Binance Crypto Historical WS Data.""" | ||
|
||
import json | ||
import logging | ||
from datetime import datetime, timedelta | ||
from typing import Any, AsyncGenerator, AsyncIterator, Dict, Optional | ||
|
||
import websockets | ||
from openbb_core.provider.standard_models.crypto_historical import ( | ||
CryptoHistoricalData, | ||
CryptoHistoricalQueryParams, | ||
) | ||
from pydantic import Field | ||
|
||
from openbb_platform.core.openbb_core.provider.abstract.fetcher import Fetcher | ||
|
||
# pylint: disable=unused-argument, arguments-differ | ||
|
||
|
||
class BinanceCryptoHistoricalQueryParams(CryptoHistoricalQueryParams): | ||
"""Binance Crypto Historical Query Params.""" | ||
|
||
lifetime: Optional[int] = Field( | ||
default=60, description="Lifetime of WebSocket in seconds." | ||
) | ||
|
||
|
||
class BinanceCryptoHistoricalData(CryptoHistoricalData): | ||
"""Binance Crypto Historical Data.""" | ||
|
||
__alias_dict__ = { | ||
"symbol": "s", | ||
"close": "c", | ||
"open": "o", | ||
"high": "h", | ||
"low": "l", | ||
"volume": "v", | ||
} | ||
event_type: Optional[str] = Field( | ||
default=None, | ||
description="Event type", | ||
alias="e", | ||
) | ||
quote_asset_volume: Optional[str] = Field( | ||
default=None, | ||
description="Total traded quote asset volume", | ||
alias="q", | ||
) | ||
|
||
|
||
class BinanceCryptoHistoricalFetcher(Fetcher): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this model name and Fetcher class is deceiving. It is not at all like the other provider/standard models with this name. This is not historical data, and the pattern itself is significantly different. It needs to be differentiated. |
||
"""Define Binance Crypto Historical Fetcher.""" | ||
|
||
@staticmethod | ||
def transform_query(params: Dict[str, Any]) -> BinanceCryptoHistoricalQueryParams: | ||
"""Transform the query params.""" | ||
return BinanceCryptoHistoricalQueryParams(**params) | ||
|
||
@staticmethod | ||
async def aextract_data( | ||
query: BinanceCryptoHistoricalQueryParams, | ||
credentials: Optional[Dict[str, str]] = None, | ||
**kwargs: Any, | ||
) -> AsyncGenerator[dict, None]: | ||
"""Return the raw data from the Binance endpoint.""" | ||
async with websockets.connect( | ||
f"wss://stream.binance.com:9443/ws/{query.symbol.lower()}@miniTicker" | ||
) as websocket: | ||
logging.info("Connected to WebSocket server.") | ||
end_time = datetime.now() + timedelta(seconds=query.lifetime) | ||
try: | ||
while datetime.now() < end_time: | ||
chunk = await websocket.recv() | ||
yield json.loads(chunk) | ||
except websockets.exceptions.ConnectionClosed as e: | ||
logging.error("WebSocket connection closed.") | ||
raise e | ||
finally: | ||
logging.info("WebSocket connection closed.") | ||
|
||
@staticmethod | ||
async def atransform_data( | ||
query: BinanceCryptoHistoricalQueryParams, | ||
data: Dict[str, Any], | ||
) -> AsyncIterator[str]: | ||
"""Return the transformed data.""" | ||
async for chunk in data: | ||
chunk["date"] = ( | ||
datetime.now().isoformat() if "date" not in chunk else chunk["date"] | ||
) | ||
result = BinanceCryptoHistoricalData(**chunk) | ||
yield result.model_dump_json() + "\n" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Biztoc utils.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be a blocker: