From 7a3cbd72f157e8c70b8ac7e2881157ae7fd65098 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 23 Sep 2022 10:58:38 -0300 Subject: [PATCH] add tests for streaming, fix bugs --- lbry/torrent/session.py | 8 +++---- lbry/torrent/torrent_manager.py | 5 ++-- .../datanetwork/test_file_commands.py | 24 +++++++++++++++++++ 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 1f72640bbc..050fbadc90 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -26,7 +26,7 @@ def __init__(self, loop, executor, handle): self.started = asyncio.Event(loop=loop) self.finished = asyncio.Event(loop=loop) self.metadata_completed = asyncio.Event(loop=loop) - self.size = 0 + self.size = handle.status().total_wanted self.total_wanted_done = 0 self.name = '' self.tasks = [] @@ -70,10 +70,10 @@ def byte_range_to_piece_range( async def stream_range_as_completed(self, file_index, start, end): first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end) - start_piece_offset = final_piece.start + start_piece_offset = first_piece.start piece_size = self._torrent_info.piece_length() - log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d): %s", - first_piece.piece, final_piece.piece, start, end, self.name) + log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s", + first_piece.piece, final_piece.piece, start, end, piece_size, self.name) self.prioritize(file_index, start, end) await self.resume() for piece_index in range(first_piece.piece, final_piece.piece + 1): diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 76e4bc32e8..fc29bb7946 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -113,10 +113,11 @@ async def stream_file(self, request): with open(self.full_path, 'rb') as infile: infile.seek(start) async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end): - if start + read_size < end: + if infile.tell() + read_size < end: await response.write(infile.read(read_size)) else: - await response.write_eof(infile.read(end - infile.tell())) + await response.write_eof(infile.read(end - infile.tell() + 1)) + return response def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]: if '=' in get_range: diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 35d7046ef4..1e60d62a77 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -4,11 +4,14 @@ import os from binascii import hexlify +import aiohttp.web + from lbry.schema import Claim from lbry.stream.background_downloader import BackgroundDownloader from lbry.stream.descriptor import StreamDescriptor from lbry.testcase import CommandTestCase from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT +from lbry.utils import aiohttp_request from lbry.wallet import Transaction from lbry.torrent.tracker import UDPTrackerServerProtocol @@ -51,6 +54,23 @@ async def initialize_torrent(self, tx_to_update=None): self.addCleanup(task.cancel) return tx, btih + async def assert_torrent_streaming_works(self, btih): + url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/torrent' + if self.daemon.streaming_runner.server is None: + await self.daemon.streaming_runner.setup() + site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host, + self.daemon.conf.streaming_port) + await site.start() + async with aiohttp_request('get', url) as req: + self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream') + content_range = req.headers.get('Content-Range') + content_length = int(req.headers.get('Content-Length')) + streamed_bytes = await req.content.read() + expected_size = self.seeder_session.get_size(btih) + self.assertEqual(expected_size, len(streamed_bytes)) + self.assertEqual(content_length, len(streamed_bytes)) + self.assertEqual(f"bytes 0-{expected_size - 1}/{expected_size}", content_range) + @skipIf(TorrentSession is None, "libtorrent not installed") async def test_download_torrent(self): tx, btih = await self.initialize_torrent() @@ -61,6 +81,10 @@ async def test_download_torrent(self): self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih) self.assertIn(btih, self.client_session._handles) + + # stream over streaming API (full range of the largest file) + await self.assert_torrent_streaming_works(btih) + tx, new_btih = await self.initialize_torrent(tx) self.assertNotEqual(btih, new_btih) # claim now points to another torrent, update to it