Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

what is the right way to use Backends without Feedhandler to store Exchange rest data? #1045

Open
keiser1080 opened this issue Jun 27, 2024 · 1 comment
Labels

Comments

@keiser1080
Copy link
Contributor

keiser1080 commented Jun 27, 2024

hi
i am trying to fetch historical exchange data,
and to use backend callback to store the data.

My code do the job but i am looking a cleaner way to do that.

import json
from datetime import datetime
import asyncio
from datetime import timedelta, UTC
from cryptofeed import FeedHandler
from cryptofeed.backends.quest import CandlesQuest
from cryptofeed.defines import CANDLES
from cryptofeed.exchanges import BinanceFutures
from backtest.config import settings


bf = BinanceFutures()
cq = CandlesQuest(host=settings.quest_host, port=settings.quest_port)
# async def candle_callback(c, receipt_timestamp):
#     print(f"Candle received at {receipt_timestamp}: {c}")


async def get_history(loop, start, end):
    cq.start(loop)
    cq.running
    async for data in bf.candles(
        "BTC-USDT-PERP",
        start,
        end,
    ):
        print("=================")

        for row in data:
            d = row.to_dict()
            d["receipt_timestamp"] = d["timestamp"] + 1
            await cq(row, d["receipt_timestamp"])
    await cq.stop()
    return


def main():
    config = {"log": {"filename": "demo.log", "level": "DEBUG", "disabled": False}}
    today = datetime.now(UTC)
    yesterday = datetime.now(UTC) - timedelta(minutes=24)
    start = yesterday.strftime("%Y-%m-%d %H:%M:%S")
    end = today.strftime("%Y-%m-%d %H:%M:%S")
    f = FeedHandler(config=config)
    f.run(start_loop=False)
    f.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT-PERP"],
            channels=[CANDLES],
            callbacks={
                CANDLES: CandlesQuest(
                    host=settings.quest_host, port=settings.quest_port
                )
            },
        )
    )
    loop = asyncio.get_event_loop()
    # loop.create_task(get_history(loop, start, end))
    loop.run_until_complete(get_history(loop, start, end))
    loop.run_forever()


if __name__ == "__main__":

    main()

It work until but it raise an exception when i stop the script "CTRL-C"

2024-06-27 17:06:56,495 : INFO : BINANCE_FUTURES: starting backend task CandlesQuest with multiprocessing=False
2024-06-27 17:06:56,497 : DEBUG : BINANCE_FUTURES.http.0: create HTTP session
2024-06-27 17:06:56,497 : DEBUG : BINANCE_FUTURES.http.0: requesting data from https://fapi.binance.com/fapi/v1/klines?symbol=BTCUSDT&interval=1m&limit=1000&startTime=1719520976000&endTime=1719522416000
2024-06-27 17:06:56,515 : DEBUG : BINANCE_FUTURES.ws.2: connecting to wss://fstream.binance.com/stream?streams=btcusdt@kline_1m
=================
^CTask was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<QuestCallback.writer() running at /home/me/project/backtest/.venv/lib/python3.12/site-packages/cryptofeed/backends/quest.py:27> wait_for=<Future pending cb=[Task.task_wakeup()]>>
backtest-py3.12```
@keiser1080 keiser1080 changed the title Wat is the rieght way to use Backends without Feedhandler to store Exchange rest data? Wat is the right way to use Backends without Feedhandler to store Exchange rest data? Jun 28, 2024
@keiser1080 keiser1080 changed the title Wat is the right way to use Backends without Feedhandler to store Exchange rest data? what is the right way to use Backends without Feedhandler to store Exchange rest data? Jun 28, 2024
@ljluestc
Copy link

import json
from datetime import datetime
import asyncio
from datetime import timedelta, UTC
from cryptofeed import FeedHandler
from cryptofeed.backends.quest import CandlesQuest
from cryptofeed.defines import CANDLES
from cryptofeed.exchanges import BinanceFutures
from backtest.config import settings

bf = BinanceFutures()
cq = CandlesQuest(host=settings.quest_host, port=settings.quest_port)

# Callback for handling the data and passing it to the backend
async def candle_callback(c, receipt_timestamp):
    print(f"Candle received at {receipt_timestamp}: {c}")
    # Insert your custom logic to store or process the candle data here

async def get_history(loop, start, end):
    try:
        # Start the CandlesQuest backend
        cq.start(loop)
        async for data in bf.candles("BTC-USDT-PERP", start, end):
            print("=================")
            for row in data:
                d = row.to_dict()
                d["receipt_timestamp"] = d["timestamp"] + 1
                await candle_callback(row, d["receipt_timestamp"])
    except asyncio.CancelledError:
        print("The task was cancelled")
    finally:
        await cq.stop()
    return

async def main():
    config = {"log": {"filename": "demo.log", "level": "DEBUG", "disabled": False}}
    today = datetime.now(UTC)
    yesterday = datetime.now(UTC) - timedelta(minutes=24)
    start = yesterday.strftime("%Y-%m-%d %H:%M:%S")
    end = today.strftime("%Y-%m-%d %H:%M:%S")

    # Initialize FeedHandler
    f = FeedHandler(config=config)
    f.run(start_loop=False)

    # Add the Binance feed with callbacks to CandlesQuest
    f.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT-PERP"],
            channels=[CANDLES],
            callbacks={CANDLES: candle_callback},
        )
    )

    # Create an asyncio event loop and run get_history asynchronously
    loop = asyncio.get_event_loop()
    try:
        # Start the get_history function
        await get_history(loop, start, end)
    except KeyboardInterrupt:
        print("Process interrupted, cancelling tasks...")
        # Gracefully shutdown by cancelling pending tasks
        for task in asyncio.all_tasks(loop):
            task.cancel()

if __name__ == "__main__":
    asyncio.run(main())

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants