diff --git a/lbry/error/README.md b/lbry/error/README.md index cc5ab7a3b5..2c747408ed 100644 --- a/lbry/error/README.md +++ b/lbry/error/README.md @@ -81,8 +81,8 @@ Code | Name | Message 511 | CorruptBlob | Blobs is corrupted. 520 | BlobFailedEncryption | Failed to encrypt blob. 531 | DownloadCancelled | Download was canceled. -532 | DownloadSDTimeout | Failed to download sd blob {download} within timeout. -533 | DownloadDataTimeout | Failed to download data blobs for sd hash {download} within timeout. +532 | DownloadMetadataTimeout | Failed to download metadata for {download} within timeout. +533 | DownloadDataTimeout | Failed to download data blobs for {download} within timeout. 534 | InvalidStreamDescriptor | {message} 535 | InvalidData | {message} 536 | InvalidBlobHash | {message} diff --git a/lbry/error/__init__.py b/lbry/error/__init__.py index 7e18f5bf98..88886487eb 100644 --- a/lbry/error/__init__.py +++ b/lbry/error/__init__.py @@ -411,18 +411,18 @@ def __init__(self): super().__init__("Download was canceled.") -class DownloadSDTimeoutError(BlobError): +class DownloadMetadataTimeoutError(BlobError): def __init__(self, download): self.download = download - super().__init__(f"Failed to download sd blob {download} within timeout.") + super().__init__(f"Failed to download metadata for {download} within timeout.") class DownloadDataTimeoutError(BlobError): def __init__(self, download): self.download = download - super().__init__(f"Failed to download data blobs for sd hash {download} within timeout.") + super().__init__(f"Failed to download data blobs for {download} within timeout.") class InvalidStreamDescriptorError(BlobError): diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 5d16812bff..f6f725b327 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -36,7 +36,7 @@ from lbry.blob_exchange.downloader import download_blob from lbry.dht.peer import make_kademlia_peer from lbry.error import ( - DownloadSDTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError, + DownloadMetadataTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError, CommandDoesNotExistError, BaseError, WalletNotFoundError, WalletAlreadyLoadedError, WalletAlreadyExistsError, ConflictingInputValueError, AlreadyPurchasedError, PrivateKeyNotFoundError, InputStringIsBlankError, InputValueError @@ -639,7 +639,7 @@ async def handle_stream_get_request(self, request: web.Request): stream = await self.jsonrpc_get(uri) if isinstance(stream, dict): raise web.HTTPServerError(text=stream['error']) - raise web.HTTPFound(f"/stream/{stream.sd_hash}") + raise web.HTTPFound(f"/stream/{stream.identifier}") async def handle_stream_range_request(self, request: web.Request): try: @@ -658,12 +658,13 @@ async def handle_stream_range_request(self, request: web.Request): log.debug("finished handling /stream range request") async def _handle_stream_range_request(self, request: web.Request): - sd_hash = request.path.split("/stream/")[1] + identifier = request.path.split("/stream/")[1] if not self.file_manager.started.is_set(): await self.file_manager.started.wait() - if sd_hash not in self.file_manager.streams: + stream = self.file_manager.get_filtered(identifier=identifier) + if not stream: return web.HTTPNotFound() - return await self.file_manager.stream_partial_content(request, sd_hash) + return await self.file_manager.stream_partial_content(request, identifier) async def _process_rpc_call(self, data): args = data.get('params', {}) @@ -1139,7 +1140,7 @@ async def jsonrpc_get( save_file=save_file, wallet=wallet ) if not stream: - raise DownloadSDTimeoutError(uri) + raise DownloadMetadataTimeoutError(uri) except Exception as e: # TODO: use error from lbry.error log.warning("Error downloading %s: %s", uri, str(e)) diff --git a/lbry/extras/daemon/json_response_encoder.py b/lbry/extras/daemon/json_response_encoder.py index 75eacdb9ad..7b5f60840b 100644 --- a/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/extras/daemon/json_response_encoder.py @@ -285,7 +285,7 @@ def encode_file(self, managed_stream): else: total_bytes_lower_bound = total_bytes = managed_stream.torrent_length result = { - 'streaming_url': None, + 'streaming_url': managed_stream.stream_url, 'completed': managed_stream.completed, 'file_name': None, 'download_directory': None, @@ -293,10 +293,10 @@ def encode_file(self, managed_stream): 'points_paid': 0.0, 'stopped': not managed_stream.running, 'stream_hash': None, - 'stream_name': None, - 'suggested_file_name': None, + 'stream_name': managed_stream.stream_name, + 'suggested_file_name': managed_stream.suggested_file_name, 'sd_hash': None, - 'mime_type': None, + 'mime_type': managed_stream.mime_type, 'key': None, 'total_bytes_lower_bound': total_bytes_lower_bound, 'total_bytes': total_bytes, @@ -326,12 +326,8 @@ def encode_file(self, managed_stream): } if is_stream: result.update({ - 'streaming_url': managed_stream.stream_url, 'stream_hash': managed_stream.stream_hash, - 'stream_name': managed_stream.stream_name, - 'suggested_file_name': managed_stream.suggested_file_name, 'sd_hash': managed_stream.descriptor.sd_hash, - 'mime_type': managed_stream.mime_type, 'key': managed_stream.descriptor.key, 'blobs_completed': managed_stream.blobs_completed, 'blobs_in_stream': managed_stream.blobs_in_stream, @@ -340,10 +336,6 @@ def encode_file(self, managed_stream): 'reflector_progress': managed_stream.reflector_progress, 'uploading_to_reflector': managed_stream.uploading_to_reflector }) - else: - result.update({ - 'streaming_url': f'file://{managed_stream.full_path}', - }) if output_exists: result.update({ 'file_name': managed_stream.file_name, diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 81b4263dc3..65e7cfcd19 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -5,6 +5,7 @@ import asyncio import binascii import time +from operator import itemgetter from typing import Optional from lbry.wallet import SQLiteMixin from lbry.conf import Config @@ -211,7 +212,7 @@ def delete_torrent(transaction: sqlite3.Connection, bt_infohash: str): transaction.execute("delete from torrent where bt_infohash=?", (bt_infohash,)).fetchall() -def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str], +def store_file(transaction: sqlite3.Connection, identifier_value: str, file_name: typing.Optional[str], download_directory: typing.Optional[str], data_payment_rate: float, status: str, content_fee: typing.Optional[Transaction], added_on: typing.Optional[int] = None) -> int: if not file_name and not download_directory: @@ -219,15 +220,18 @@ def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typ else: encoded_file_name = binascii.hexlify(file_name.encode()).decode() encoded_download_dir = binascii.hexlify(download_directory.encode()).decode() + is_torrent = len(identifier_value) == 40 time_added = added_on or int(time.time()) transaction.execute( - "insert or replace into file values (?, NULL, ?, ?, ?, ?, ?, ?, ?)", - (stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status, + f"insert or replace into file values ({'NULL, ?' if is_torrent else '?, NULL'}, ?, ?, ?, ?, ?, ?, ?)", + (identifier_value, encoded_file_name, encoded_download_dir, data_payment_rate, status, 1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0, None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added) ).fetchall() - return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0] + return transaction.execute( + f"select rowid from file where {'bt_infohash' if is_torrent else 'stream_hash'}=?", + (identifier_value, )).fetchone()[0] class SQLiteStorage(SQLiteMixin): @@ -632,6 +636,13 @@ def update_db_removed(transaction: sqlite3.Connection, removed): def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]: return self.db.run(get_all_lbry_files) + async def get_all_torrent_files(self) -> typing.List[typing.Dict]: + def _get_all_torrent_files(transaction): + cursor = transaction.execute( + "select file.ROWID as rowid, * from file join torrent on file.bt_infohash=torrent.bt_infohash") + return map(lambda row: dict(zip(list(map(itemgetter(0), cursor.description)), row)), cursor.fetchall()) + return list(await self.db.run(_get_all_torrent_files)) + def change_file_status(self, stream_hash: str, new_status: str): log.debug("update file status %s -> %s", stream_hash, new_status) return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash)) @@ -872,15 +883,20 @@ async def save_content_claim(self, stream_hash, claim_outpoint): if stream_hash in self.content_claim_callbacks: await self.content_claim_callbacks[stream_hash]() - async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name): - def _save_torrent(transaction): + async def add_torrent(self, bt_infohash, length, name): + def _save_torrent(transaction, bt_infohash, length, name): transaction.execute( "insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name) ).fetchall() + return await self.db.run(_save_torrent, bt_infohash, length, name) + + async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name): + def _save_torrent_claim(transaction): transaction.execute( "insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint) ).fetchall() - await self.db.run(_save_torrent) + await self.add_torrent(bt_infohash, length, name) + await self.db.run(_save_torrent_claim) # update corresponding ManagedEncryptedFileDownloader object if bt_infohash in self.content_claim_callbacks: await self.content_claim_callbacks[bt_infohash]() @@ -898,7 +914,7 @@ async def get_content_claim(self, stream_hash: str, include_supports: typing.Opt async def get_content_claim_for_torrent(self, bt_infohash): claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash]) - return claims[bt_infohash].as_dict() if claims else None + return claims[bt_infohash] if claims else None # # # # # # # # # reflector functions # # # # # # # # # diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 6ab9439070..d2f7dc399a 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -3,7 +3,7 @@ import typing from typing import Optional from aiohttp.web import Request -from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError +from lbry.error import ResolveError, DownloadMetadataTimeoutError, InsufficientFundsError from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError from lbry.error import InvalidStreamURLError from lbry.stream.managed_stream import ManagedStream @@ -139,7 +139,7 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name ) claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier) - existing[0].set_claim(claim_info, claim) + existing[0].set_claim(claim_info.as_dict() if claim_info else None, claim) else: await self.storage.save_content_claim( existing[0].stream_hash, outpoint @@ -242,15 +242,15 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag stream.identifier, outpoint, stream.torrent_length, stream.torrent_name ) claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier) - stream.set_claim(claim_info, claim) + stream.set_claim(claim_info.as_dict() if claim_info else None, claim) if save_file: await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download)) return stream except asyncio.TimeoutError: - error = DownloadDataTimeoutError(stream.sd_hash) + error = DownloadDataTimeoutError(stream.identifier) raise error except Exception as err: # forgive data timeout, don't delete stream - expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, + expected = (DownloadMetadataTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError) if isinstance(err, expected): log.warning("Failed to download %s: %s", uri, str(err)) @@ -290,19 +290,24 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag ) ) - async def stream_partial_content(self, request: Request, sd_hash: str): - return await self.source_managers['stream'].stream_partial_content(request, sd_hash) + async def stream_partial_content(self, request: Request, identifier: str): + for source_manager in self.source_managers.values(): + if source_manager.get_filtered(identifier=identifier): + return await source_manager.stream_partial_content(request, identifier) def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]: """ - Get a list of filtered and sorted ManagedStream objects - - :param sort_by: field to sort by - :param reverse: reverse sorting - :param comparison: comparison operator used for filtering - :param search_by: fields and values to filter by + Get a list of filtered and sorted ManagedDownloadSource objects from all available source managers """ - return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), []) + result = last_error = None + for manager in self.source_managers.values(): + try: + result = (result or []) + manager.get_filtered(*args, **kwargs) + except ValueError as error: + last_error = error + if result is not None: + return result + raise last_error async def delete(self, source: ManagedDownloadSource, delete_file=False): for manager in self.source_managers.values(): diff --git a/lbry/file/source.py b/lbry/file/source.py index ba5bb311f4..63ddc6a40f 100644 --- a/lbry/file/source.py +++ b/lbry/file/source.py @@ -1,5 +1,6 @@ import os import asyncio +import time import typing import logging import binascii @@ -43,7 +44,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: ' self.rowid = rowid self.content_fee = content_fee self.purchase_receipt = None - self._added_on = added_on + self._added_on = added_on or int(time.time()) self.analytics_manager = analytics_manager self.downloader = None @@ -91,6 +92,14 @@ def file_name(self) -> Optional[str]: def added_on(self) -> Optional[int]: return self._added_on + @property + def suggested_file_name(self): + return self._file_name + + @property + def stream_name(self): + return self.suggested_file_name + @property def status(self) -> str: return self._status @@ -99,9 +108,9 @@ def status(self) -> str: def completed(self): raise NotImplementedError() - # @property - # def stream_url(self): - # return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash} + @property + def stream_url(self): + return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.identifier}" @property def finished(self) -> bool: diff --git a/lbry/file/source_manager.py b/lbry/file/source_manager.py index 72c1709dd8..980f582d2b 100644 --- a/lbry/file/source_manager.py +++ b/lbry/file/source_manager.py @@ -23,6 +23,7 @@ class SourceManager: filter_fields = { + 'identifier', 'rowid', 'status', 'file_name', @@ -83,6 +84,7 @@ async def create(self, file_path: str, key: Optional[bytes] = None, raise NotImplementedError() async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): + await self.storage.delete_torrent(source.identifier) self.remove(source) if delete_file and source.output_file_exists: os.remove(source.full_path) diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py index 39e24b37e9..cf79239121 100644 --- a/lbry/stream/downloader.py +++ b/lbry/stream/downloader.py @@ -4,7 +4,7 @@ import binascii from lbry.dht.node import get_kademlia_peers_from_hosts -from lbry.error import DownloadSDTimeoutError +from lbry.error import DownloadMetadataTimeoutError from lbry.utils import lru_cache_concurrent from lbry.stream.descriptor import StreamDescriptor from lbry.blob_exchange.downloader import BlobDownloader @@ -77,7 +77,7 @@ async def load_descriptor(self, connection_id: int = 0): log.info("downloaded sd blob %s", self.sd_hash) self.time_to_descriptor = self.loop.time() - now except asyncio.TimeoutError: - raise DownloadSDTimeoutError(self.sd_hash) + raise DownloadMetadataTimeoutError(self.sd_hash) # parse the descriptor self.descriptor = await StreamDescriptor.from_stream_descriptor_blob( diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index a6be77ce48..3c57cefd58 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -5,7 +5,7 @@ import logging from typing import Optional from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable -from lbry.error import DownloadSDTimeoutError +from lbry.error import DownloadMetadataTimeoutError from lbry.schema.mime_types import guess_media_type from lbry.stream.downloader import StreamDownloader from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name @@ -104,10 +104,6 @@ def written_bytes(self) -> int: def completed(self): return self.written_bytes >= self.descriptor.lower_bound_decrypted_length() - @property - def stream_url(self): - return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}" - async def update_status(self, status: str): assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED] self._status = status @@ -164,7 +160,7 @@ async def start(self, timeout: Optional[float] = None, await asyncio.wait_for(self.downloader.start(), timeout) except asyncio.TimeoutError: self._running.clear() - raise DownloadSDTimeoutError(self.sd_hash) + raise DownloadMetadataTimeoutError(self.identifier) if self.delayed_stop_task and not self.delayed_stop_task.done(): self.delayed_stop_task.cancel() diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 7ecf7e442d..1fda2e4631 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -32,7 +32,7 @@ def path_or_none(encoded_path) -> Optional[str]: class StreamManager(SourceManager): _sources: typing.Dict[str, ManagedStream] - filter_fields = SourceManager.filter_fields + filter_fields = set(SourceManager.filter_fields) filter_fields.update({ 'sd_hash', 'stream_hash', diff --git a/lbry/testcase.py b/lbry/testcase.py index 3ddaec4592..15a2d1806e 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -394,6 +394,7 @@ async def asyncSetUp(self): logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY) logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY) logging.getLogger('lbry.stream').setLevel(self.VERBOSITY) + logging.getLogger('lbry.torrent').setLevel(self.VERBOSITY) logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY) await super().asyncSetUp() diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 713d820392..abdbff27b5 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -3,9 +3,8 @@ import os import logging import random -from hashlib import sha1 from tempfile import mkdtemp -from typing import Optional +from typing import Optional, Tuple, Dict import libtorrent @@ -14,6 +13,8 @@ DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted? libtorrent.add_torrent_params_flags_t.flag_auto_managed | libtorrent.add_torrent_params_flags_t.flag_update_subscribe + | libtorrent.add_torrent_params_flags_t.flag_sequential_download + | libtorrent.add_torrent_params_flags_t.flag_paused ) @@ -22,66 +23,102 @@ def __init__(self, loop, executor, handle): self._loop = loop self._executor = executor self._handle: libtorrent.torrent_handle = handle - self.started = asyncio.Event(loop=loop) self.finished = asyncio.Event(loop=loop) self.metadata_completed = asyncio.Event(loop=loop) - self.size = 0 + self.size = handle.status().total_wanted self.total_wanted_done = 0 self.name = '' self.tasks = [] - self.torrent_file: Optional[libtorrent.file_storage] = None + self._torrent_info: libtorrent.torrent_info = handle.torrent_file() self._base_path = None - self._handle.set_sequential_download(1) @property - def largest_file(self) -> Optional[str]: - if not self.torrent_file: + def torrent_file(self) -> Optional[libtorrent.file_storage]: + return self._torrent_info.files() + + def full_path_at(self, file_num) -> Optional[str]: + if self.torrent_file is None: return None - index = self.largest_file_index - return os.path.join(self._base_path, self.torrent_file.at(index).path) + return os.path.join(self.save_path, self.torrent_file.file_path(file_num)) + + def size_at(self, file_num) -> Optional[int]: + if self.torrent_file is not None: + return self.torrent_file.file_size(file_num) @property - def largest_file_index(self): - largest_size, index = 0, 0 + def save_path(self) -> Optional[str]: + if not self._base_path: + self._base_path = self._handle.status().save_path + return self._base_path + + def index_from_name(self, file_name): for file_num in range(self.torrent_file.num_files()): - if self.torrent_file.file_size(file_num) > largest_size: - largest_size = self.torrent_file.file_size(file_num) - index = file_num - return index + if '.pad' in self.torrent_file.file_path(file_num): + continue # ignore padding files + if file_name == os.path.basename(self.full_path_at(file_num)): + return file_num def stop_tasks(self): + self._handle.save_resume_data() while self.tasks: self.tasks.pop().cancel() + def byte_range_to_piece_range( + self, file_index, start_offset, end_offset) -> Tuple[libtorrent.peer_request, libtorrent.peer_request]: + start_piece = self._torrent_info.map_file(file_index, start_offset, 0) + end_piece = self._torrent_info.map_file(file_index, end_offset, 0) + return start_piece, end_piece + + async def stream_range_as_completed(self, file_name, start, end): + file_index = self.index_from_name(file_name) + if file_index is None: + raise ValueError(f"Attempt to stream from invalid file. Expected name: {file_name}") + first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end) + start_piece_offset = first_piece.start + piece_size = self._torrent_info.piece_length() + log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s", + first_piece.piece, final_piece.piece, start, end, piece_size, self.name) + self.prioritize(file_index, start, end) + for piece_index in range(first_piece.piece, final_piece.piece + 1): + while not self._handle.have_piece(piece_index): + log.info("Waiting for piece %d: %s", piece_index, self.name) + self._handle.set_piece_deadline(piece_index, 0) + await asyncio.sleep(0.2) + log.info("Streaming piece offset %d / %d for torrent %s", piece_index, final_piece.piece, self.name) + yield piece_size - start_piece_offset + def _show_status(self): # fixme: cleanup if not self._handle.is_valid(): return status = self._handle.status() + self._base_path = status.save_path if status.has_metadata: self.size = status.total_wanted self.total_wanted_done = status.total_wanted_done self.name = status.name if not self.metadata_completed.is_set(): self.metadata_completed.set() + self._torrent_info = self._handle.torrent_file() log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) - self.torrent_file = self._handle.get_torrent_info().files() - self._base_path = status.save_path - first_piece = self.torrent_file.at(self.largest_file_index).offset - if not self.started.is_set(): - if self._handle.have_piece(first_piece): - self.started.set() - else: - # prioritize it - self._handle.set_piece_deadline(first_piece, 100) - if not status.is_seeding: - log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', - status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, - status.num_peers, status.num_seeds, status.state, status.save_path) - elif not self.finished.is_set(): + log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', + status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, + status.num_peers, status.num_seeds, status.state, status.save_path) + if (status.is_finished or status.is_seeding) and not self.finished.is_set(): self.finished.set() log.info("Torrent finished: %s", self.name) + def prioritize(self, file_index, start, end, cleanup=False): + first_piece, last_piece = self.byte_range_to_piece_range(file_index, start, end) + priorities = self._handle.get_piece_priorities() + priorities = [0 if cleanup else 1 for _ in priorities] + self._handle.clear_piece_deadlines() + for idx, piece_number in enumerate(range(first_piece.piece, last_piece.piece)): + priorities[piece_number] = 7 - idx if 0 <= idx <= 6 else 1 + self._handle.set_piece_deadline(piece_number, idx) + log.debug("Prioritizing pieces for %s: %s", self.name, priorities) + self._handle.prioritize_pieces(priorities) + async def status_loop(self): while True: self._show_status() @@ -105,19 +142,21 @@ def __init__(self, loop, executor): self._loop = loop self._executor = executor self._session: Optional[libtorrent.session] = None - self._handles = {} + self._handles: Dict[str, TorrentHandle] = {} self.tasks = [] - self.wait_start = True - async def add_fake_torrent(self): + def add_peer(self, btih, addr, port): + self._handles[btih]._handle.connect_peer((addr, port)) + + async def add_fake_torrent(self, file_count=3): tmpdir = mkdtemp() - info, btih = _create_fake_torrent(tmpdir) + info = _create_fake_torrent(tmpdir, file_count=file_count) flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode handle = self._session.add_torrent({ 'ti': info, 'save_path': tmpdir, 'flags': flags }) - self._handles[btih] = TorrentHandle(self._loop, self._executor, handle) - return btih + self._handles[str(info.info_hash())] = TorrentHandle(self._loop, self._executor, handle) + return str(info.info_hash()) async def bind(self, interface: str = '0.0.0.0', port: int = 10889): settings = { @@ -131,15 +170,14 @@ async def bind(self, interface: str = '0.0.0.0', port: int = 10889): self.tasks.append(self._loop.create_task(self.process_alerts())) def stop(self): + while self._handles: + self._handles.popitem()[1].stop_tasks() while self.tasks: self.tasks.pop().cancel() - self._session.save_state() - self._session.pause() - self._session.stop_dht() - self._session.stop_lsd() - self._session.stop_natpmp() - self._session.stop_upnp() - self._session = None + if self._session: + self._session.save_state() + self._session.pause() + self._session = None def _pop_alerts(self): for alert in self._session.pop_alerts(): @@ -173,18 +211,23 @@ def _add_torrent(self, btih: str, download_directory: Optional[str]): handle.force_dht_announce() self._handles[btih] = TorrentHandle(self._loop, self._executor, handle) - def full_path(self, btih): - return self._handles[btih].largest_file + def full_path(self, btih, file_num) -> Optional[str]: + return self._handles[btih].full_path_at(file_num) + + def save_path(self, btih): + return self._handles[btih].save_path + + def has_torrent(self, btih): + return btih in self._handles async def add_torrent(self, btih, download_path): + if btih in self._handles: + return await self._handles[btih].metadata_completed.wait() await self._loop.run_in_executor( self._executor, self._add_torrent, btih, download_path ) self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop())) await self._handles[btih].metadata_completed.wait() - if self.wait_start: - # fixme: temporary until we add streaming support, otherwise playback fails! - await self._handles[btih].started.wait() def remove_torrent(self, btih, remove_files=False): if btih in self._handles: @@ -197,9 +240,17 @@ async def save_file(self, btih, download_directory): handle = self._handles[btih] await handle.resume() - def get_size(self, btih): + def get_total_size(self, btih): return self._handles[btih].size + def get_index_from_name(self, btih, file_name): + return self._handles[btih].index_from_name(file_name) + + def get_size(self, btih, file_name) -> Optional[int]: + for (path, size) in self.get_files(btih).items(): + if os.path.basename(path) == file_name: + return size + def get_name(self, btih): return self._handles[btih].name @@ -209,23 +260,38 @@ def get_downloaded(self, btih): def is_completed(self, btih): return self._handles[btih].finished.is_set() + def stream_file(self, btih, file_name, start, end): + handle = self._handles[btih] + return handle.stream_range_as_completed(file_name, start, end) + + def get_files(self, btih) -> Dict: + handle = self._handles[btih] + return { + self.full_path(btih, file_num): handle.torrent_file.file_size(file_num) + for file_num in range(handle.torrent_file.num_files()) + if '.pad' not in handle.torrent_file.file_path(file_num) + } + def get_magnet_uri(btih): return f"magnet:?xt=urn:btih:{btih}" -def _create_fake_torrent(tmpdir): - # beware, that's just for testing - path = os.path.join(tmpdir, 'tmp') - with open(path, 'wb') as myfile: - size = myfile.write(bytes([random.randint(0, 255) for _ in range(40)]) * 1024) +def _create_fake_torrent(tmpdir, file_count=3, largest_index=1): + # layout: subdir/tmp{0..file_count-1} files. v1+v2. automatic piece size. + # largest_index: which file index {0 ... file_count} will be the largest file file_storage = libtorrent.file_storage() - file_storage.add_file('tmp', size) - t = libtorrent.create_torrent(file_storage, 0, 4 * 1024 * 1024) + subfolder = os.path.join(tmpdir, "subdir") + os.mkdir(subfolder) + for file_number in range(file_count): + file_name = f"tmp{file_number}" + with open(os.path.join(subfolder, file_name), 'wb') as myfile: + size = myfile.write( + bytes([random.randint(0, 255) for _ in range(10 - abs(file_number - largest_index))]) * 1024) + file_storage.add_file(os.path.join("subdir", file_name), size) + t = libtorrent.create_torrent(file_storage, 0, 0) libtorrent.set_piece_hashes(t, tmpdir) - info = libtorrent.torrent_info(t.generate()) - btih = sha1(info.metadata()).hexdigest() - return info, btih + return libtorrent.torrent_info(t.generate()) async def main(): @@ -238,17 +304,16 @@ async def main(): executor = None session = TorrentSession(asyncio.get_event_loop(), executor) - session2 = TorrentSession(asyncio.get_event_loop(), executor) - await session.bind('localhost', port=4040) - await session2.bind('localhost', port=4041) - btih = await session.add_fake_torrent() - session2._session.add_dht_node(('localhost', 4040)) - await session2.add_torrent(btih, "/tmp/down") + await session.bind() + await session.add_torrent(btih, os.path.expanduser("~/Downloads")) while True: - await asyncio.sleep(100) + session.full_path(btih, 0) + await asyncio.sleep(1) await session.pause() executor.shutdown() if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") + log = logging.getLogger(__name__) asyncio.run(main()) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index cf9106731b..8357c5033c 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -1,12 +1,14 @@ import asyncio -import binascii import logging import os import typing from typing import Optional -from aiohttp.web import Request +from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable + +from lbry.error import DownloadMetadataTimeoutError from lbry.file.source_manager import SourceManager from lbry.file.source import ManagedDownloadSource +from lbry.schema.mime_types import guess_media_type if typing.TYPE_CHECKING: from lbry.torrent.session import TorrentSession @@ -19,12 +21,6 @@ log = logging.getLogger(__name__) -def path_or_none(encoded_path) -> Optional[str]: - if not encoded_path: - return - return binascii.unhexlify(encoded_path).decode() - - class TorrentSource(ManagedDownloadSource): STATUS_STOPPED = "stopped" filter_fields = SourceManager.filter_fields @@ -42,15 +38,55 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: ' super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id, rowid, content_fee, analytics_manager, added_on) self.torrent_session = torrent_session + self._suggested_file_name = None + self._full_path = None @property def full_path(self) -> Optional[str]: - full_path = self.torrent_session.full_path(self.identifier) - self.download_directory = os.path.dirname(full_path) - return full_path + if not self._full_path: + self._full_path = self.select_path() + self._file_name = os.path.basename(self._full_path) + self.download_directory = self.torrent_session.save_path(self.identifier) + return self._full_path + + def select_path(self): + wanted_name = (self.stream_claim_info.claim.stream.source.name or '') if self.stream_claim_info else '' + wanted_index = self.torrent_session.get_index_from_name(self.identifier, wanted_name) + if wanted_index is None: + # maybe warn? + largest = (None, -1) + for (path, size) in self.torrent_session.get_files(self.identifier).items(): + largest = (path, size) if size > largest[1] else largest + return largest[0] + else: + return self.torrent_session.full_path(self.identifier, wanted_index or 0) + + @property + def suggested_file_name(self): + self._suggested_file_name = self._suggested_file_name or os.path.basename(self.select_path()) + return self._suggested_file_name + + @property + def mime_type(self) -> Optional[str]: + return guess_media_type(os.path.basename(self.full_path))[0] + + async def setup(self, timeout: Optional[float] = None): + try: + metadata_download = self.torrent_session.add_torrent(self.identifier, self.download_directory) + await asyncio.wait_for(metadata_download, timeout, loop=self.loop) + except asyncio.TimeoutError: + self.torrent_session.remove_torrent(btih=self.identifier) + raise DownloadMetadataTimeoutError(self.identifier) + self.download_directory = self.torrent_session.save_path(self.identifier) + self._file_name = os.path.basename(self.full_path) async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): - await self.torrent_session.add_torrent(self.identifier, self.download_directory) + await self.setup(timeout) + if not self.rowid: + await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name) + self.rowid = await self.storage.save_downloaded_file( + self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on + ) async def stop(self, finished: bool = False): await self.torrent_session.remove_torrent(self.identifier) @@ -60,7 +96,11 @@ async def save_file(self, file_name: Optional[str] = None, download_directory: O @property def torrent_length(self): - return self.torrent_session.get_size(self.identifier) + return self.torrent_session.get_total_size(self.identifier) + + @property + def stream_length(self): + return self.torrent_session.get_size(self.identifier, self.file_name) @property def written_bytes(self): @@ -81,6 +121,56 @@ def stop_tasks(self): def completed(self): return self.torrent_session.is_completed(self.identifier) + @property + def status(self): + return self.STATUS_FINISHED if self.completed else self.STATUS_RUNNING + + async def stream_file(self, request): + log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id, + self.identifier[:6]) + headers, start, end = self._prepare_range_response_headers( + request.headers.get('range', 'bytes=0-') + ) + target = self.suggested_file_name + await self.start() + response = StreamResponse( + status=206, + headers=headers + ) + await response.prepare(request) + while not os.path.exists(self.full_path): + async for _ in self.torrent_session.stream_file(self.identifier, target, start, end): + break + with open(self.full_path, 'rb') as infile: + infile.seek(start) + async for read_size in self.torrent_session.stream_file(self.identifier, target, start, end): + if infile.tell() + read_size < end: + await response.write(infile.read(read_size)) + else: + await response.write_eof(infile.read(end - infile.tell() + 1)) + return response + + def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]: + if '=' in get_range: + get_range = get_range.split('=')[1] + start, end = get_range.split('-') + size = self.stream_length + + start = int(start) + end = int(end) if end else size - 1 + + if end >= size or not 0 <= start < size: + raise HTTPRequestRangeNotSatisfiable() + + final_size = end - start + 1 + headers = { + 'Accept-Ranges': 'bytes', + 'Content-Range': f'bytes {start}-{end}/{size}', + 'Content-Length': str(final_size), + 'Content-Type': self.mime_type + } + return headers, start, end + class TorrentManager(SourceManager): _sources: typing.Dict[str, ManagedDownloadSource] @@ -103,7 +193,7 @@ async def recover_streams(self, file_infos: typing.List[typing.Dict]): async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str], download_directory: Optional[str], status: str, claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'], - added_on: Optional[int]): + added_on: Optional[int], **kwargs): stream = TorrentSource( self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name, download_directory=download_directory, status=status, claim=claim, rowid=rowid, @@ -111,9 +201,14 @@ async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[s torrent_session=self.torrent_session ) self.add(stream) + await stream.setup() async def initialize_from_database(self): - pass + for file in await self.storage.get_all_torrent_files(): + claim = await self.storage.get_content_claim_for_torrent(file['bt_infohash']) + file['download_directory'] = bytes.fromhex(file['download_directory'] or '').decode() or None + file['file_name'] = bytes.fromhex(file['file_name'] or '').decode() or None + await self._load_stream(claim=claim, **file) async def start(self): await super().start() @@ -132,9 +227,6 @@ async def create(self, file_path: str, key: Optional[bytes] = None, async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): raise NotImplementedError - # blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]] - # await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) - # await self.storage.delete_stream(source.descriptor) - async def stream_partial_content(self, request: Request, sd_hash: str): - raise NotImplementedError + async def stream_partial_content(self, request: Request, identifier: str): + return await self._sources[identifier].stream_file(request) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 95e92ce1e9..5b672e7c28 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -1,14 +1,18 @@ +import time import unittest from unittest import skipIf import asyncio import os from binascii import hexlify +import aiohttp.web + from lbry.schema import Claim from lbry.stream.background_downloader import BackgroundDownloader from lbry.stream.descriptor import StreamDescriptor from lbry.testcase import CommandTestCase from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT +from lbry.utils import aiohttp_request from lbry.wallet import Transaction from lbry.torrent.tracker import UDPTrackerServerProtocol @@ -17,55 +21,104 @@ class FileCommands(CommandTestCase): def __init__(self, *a, **kw): super().__init__(*a, **kw) self.skip_libtorrent = False + self.streaming_port = 60818 + self.seeder_session = None - async def add_forever(self): - while True: - for handle in self.client_session._handles.values(): - handle._handle.connect_peer(('127.0.0.1', 4040)) - await asyncio.sleep(.1) - - async def initialize_torrent(self, tx_to_update=None): - if not hasattr(self, 'seeder_session'): + async def initialize_torrent(self, tx_to_update=None, pick_a_file=True, name=None): + assert name is None or tx_to_update is None + if not self.seeder_session: self.seeder_session = TorrentSession(self.loop, None) self.addCleanup(self.seeder_session.stop) await self.seeder_session.bind('127.0.0.1', port=4040) - btih = await self.seeder_session.add_fake_torrent() + btih = await self.seeder_session.add_fake_torrent(file_count=3) + files = [(size, path) for (path, size) in self.seeder_session.get_files(btih).items()] + files.sort() + # picking a file will pick something in the middle, while automatic selection will pick largest + self.expected_size, self.expected_path = files[1] if pick_a_file else files[-1] + address = await self.account.receiving.get_or_create_usable_address() + claim = tx_to_update.outputs[0].claim if tx_to_update else Claim() + claim.stream.update(bt_infohash=btih) + if pick_a_file: + claim.stream.source.name = os.path.basename(self.expected_path) if not tx_to_update: - claim = Claim() - claim.stream.update(bt_infohash=btih) tx = await Transaction.claim_create( - 'torrent', claim, 1, address, [self.account], self.account + name or 'torrent', claim, 1, address, [self.account], self.account ) else: - claim = tx_to_update.outputs[0].claim - claim.stream.update(bt_infohash=btih) tx = await Transaction.claim_update( tx_to_update.outputs[0], claim, 1, address, [self.account], self.account ) await tx.sign([self.account]) await self.broadcast_and_confirm(tx) self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session - self.client_session.wait_start = False # fixme: this is super slow on tests - task = asyncio.create_task(self.add_forever()) - self.addCleanup(task.cancel) return tx, btih + async def assert_torrent_streaming_works(self, btih): + url = f'http://{self.daemon.conf.streaming_host}:{self.streaming_port}/stream/{btih}' + if self.daemon.streaming_runner.server is None: + await self.daemon.streaming_runner.setup() + site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host, + self.streaming_port) + await site.start() + async with aiohttp_request('get', url) as req: + self.assertEqual(req.status, 206) + self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream') + content_range = req.headers.get('Content-Range') + content_length = int(req.headers.get('Content-Length')) + streamed_bytes = await req.content.read() + expected_size = self.expected_size + self.assertEqual(expected_size, len(streamed_bytes)) + self.assertEqual(content_length, len(streamed_bytes)) + self.assertEqual(f"bytes 0-{expected_size - 1}/{expected_size}", content_range) + @skipIf(TorrentSession is None, "libtorrent not installed") async def test_download_torrent(self): - tx, btih = await self.initialize_torrent() + tx, btih = await self.initialize_torrent(pick_a_file=False) self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) # second call, see its there and move on self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) - self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih) self.assertIn(btih, self.client_session._handles) + + # stream over streaming API (full range of the largest file) + await self.assert_torrent_streaming_works(btih) + + # check json encoder fields for torrent sources + file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0] + self.assertEqual(btih, file['metadata']['source']['bt_infohash']) + self.assertAlmostEqual(time.time(), file['added_on'], delta=12) + self.assertEqual("application/octet-stream", file['mime_type']) + self.assertEqual(os.path.basename(self.expected_path), file['suggested_file_name']) + self.assertEqual(os.path.basename(self.expected_path), file['stream_name']) + while not file['completed']: # improve that + await asyncio.sleep(0.5) + file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0] + self.assertTrue(file['completed']) + self.assertGreater(file['total_bytes_lower_bound'], 0) + self.assertEqual(file['total_bytes_lower_bound'], file['total_bytes']) + self.assertEqual(file['total_bytes'], file['written_bytes']) + self.assertEqual('finished', file['status']) + + # filter by a field which is missing on torrent + self.assertItemCount(await self.daemon.jsonrpc_file_list(stream_hash="abc"), 0) + tx, new_btih = await self.initialize_torrent(tx) self.assertNotEqual(btih, new_btih) # claim now points to another torrent, update to it self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih) + self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) + + # restart and verify that only one updated stream was recovered + self.daemon.file_manager.stop() + await self.daemon.file_manager.start() + self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih) + self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) + # check it was saved properly, once + self.assertEqual(1, len(await self.daemon.storage.get_all_torrent_files())) + self.assertIn(new_btih, self.client_session._handles) self.assertNotIn(btih, self.client_session._handles) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) @@ -73,6 +126,11 @@ async def test_download_torrent(self): self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0) self.assertNotIn(new_btih, self.client_session._handles) + await self.initialize_torrent(name='torrent2') + self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent2'))) + file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0] + self.assertEqual(os.path.basename(self.expected_path), file['stream_name']) + async def create_streams_in_range(self, *args, **kwargs): self.stream_claim_ids = [] for i in range(*args, **kwargs): @@ -335,12 +393,12 @@ async def test_download_different_timeouts(self): await self.server.blob_manager.delete_blobs(all_except_sd) resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True) self.assertIn('error', resp) - self.assertEqual('Failed to download data blobs for sd hash %s within timeout.' % sd_hash, resp['error']) + self.assertEqual('Failed to download data blobs for %s within timeout.' % sd_hash, resp['error']) self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'), "data timeout didn't create a file") await self.server.blob_manager.delete_blobs([sd_hash]) resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True) self.assertIn('error', resp) - self.assertEqual('Failed to download sd blob %s within timeout.' % sd_hash, resp['error']) + self.assertEqual('Failed to download metadata for %s within timeout.' % sd_hash, resp['error']) async def wait_files_to_complete(self): while await self.file_list(status='running'): diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index ba6d8dbc82..48db7cf55f 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -11,7 +11,7 @@ from lbry.testcase import get_fake_exchange_rate_manager from lbry.utils import generate_id from lbry.error import InsufficientFundsError -from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadSDTimeoutError, DownloadDataTimeoutError +from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadMetadataTimeoutError, DownloadDataTimeoutError from lbry.wallet import WalletManager, Wallet, Ledger, Transaction, Input, Output, Database from lbry.wallet.constants import CENT, NULL_HASH32 from lbry.wallet.network import ClientSession @@ -229,10 +229,10 @@ def check_post(event): self.assertFalse(event['properties']['added_fixed_peers']) self.assertEqual(event['properties']['connection_failures_count'], 1) self.assertEqual( - event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.' + event['properties']['error_message'], f'Failed to download metadata for {self.sd_hash} within timeout.' ) - await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError, after_setup=after_setup) + await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError, after_setup=after_setup) async def test_override_fixed_peer_delay_dht_disabled(self): self.client_config.fixed_peers = [(self.server_from_client.address, self.server_from_client.tcp_port)] @@ -266,18 +266,18 @@ async def test_no_peers_timeout(self): def check_post(event): self.assertEqual(event['event'], 'Time To First Bytes') - self.assertEqual(event['properties']['error'], 'DownloadSDTimeoutError') + self.assertEqual(event['properties']['error'], 'DownloadMetadataTimeoutError') self.assertEqual(event['properties']['tried_peers_count'], 0) self.assertEqual(event['properties']['active_peer_count'], 0) self.assertFalse(event['properties']['use_fixed_peers']) self.assertFalse(event['properties']['added_fixed_peers']) self.assertIsNone(event['properties']['fixed_peer_delay']) self.assertEqual( - event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.' + event['properties']['error_message'], f'Failed to download metadata for {self.sd_hash} within timeout.' ) start = self.loop.time() - await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError) + await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError) duration = self.loop.time() - start self.assertLessEqual(duration, 5) self.assertGreaterEqual(duration, 3.0) @@ -387,7 +387,7 @@ async def test_download_sd_timeout(self): self.server.stop_server() await self.setup_stream_manager() await self._test_download_error_analytics_on_start( - DownloadSDTimeoutError, f'Failed to download sd blob {self.sd_hash} within timeout.', timeout=1 + DownloadMetadataTimeoutError, f'Failed to download metadata for {self.sd_hash} within timeout.', timeout=1 ) async def test_download_data_timeout(self): @@ -396,7 +396,7 @@ async def test_download_data_timeout(self): head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash'] self.server_blob_manager.delete_blob(head_blob_hash) await self._test_download_error_analytics_on_start( - DownloadDataTimeoutError, f'Failed to download data blobs for sd hash {self.sd_hash} within timeout.', timeout=1 + DownloadDataTimeoutError, f'Failed to download data blobs for {self.sd_hash} within timeout.', timeout=1 ) async def test_unexpected_error(self):