Implement bandwidth limiting for barman-cloud-backup
Limits the average bandwidth used by `barman-cloud-backup` when backing
up to either AWS S3 or Azure Blob Storage according to the value set by
a new CLI option `--max-bandwidth`.

This is implemented by throttling the rate at which chunks are enqueued
for upload. Rate limiting is applied by calculating the minimum time
before a new chunk can be uploaded based on the size of the last
uploaded chunk and the requested maximum bandwidth. The remaining time
to wait is then calculated from this value and the elapsed time since
the last upload. If the minimum time has already elapsed then no waiting
occurs.
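
In outline, the wait is computed as in the following sketch (simplified,
with illustrative names; the real logic is in the new
`CloudTarUploader._throttle_upload` method below):

    import datetime
    import time

    def wait_before_next_upload(size_of_last_upload, time_of_last_upload, max_bandwidth):
        # Minimum gap implied by the previous part's size and the bandwidth cap,
        # e.g. a 100MB part with a 50MB/s cap requires a gap of at least 2s.
        min_gap = size_of_last_upload / max_bandwidth
        # If that much time has already passed, no waiting occurs.
        elapsed = (datetime.datetime.now() - time_of_last_upload).total_seconds()
        if elapsed < min_gap:
            time.sleep(min_gap - elapsed)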

The throttling happens just before a new part is uploaded so that any
time spent compressing data or flushing the buffer to disk has already
happened before we calculate how long we must wait.

If `--max-bandwidth` is not set then no throttling is applied.
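
For example, to cap a backup at roughly 80MB per second (the destination
URL and server name below are placeholders):

    barman-cloud-backup --max-bandwidth 80MB <destination_url> <server_name>

The option accepts the same human-readable sizes as `--max-archive-size`
and `--min-chunk-size` (it is parsed with `check_size`), so the new tests
expect `80MB` to be interpreted as 80 * 2^20 bytes.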

Because we throttle the upload rate in the parent process rather than in
the workers, it is not guaranteed that we will always stay under
`--max-bandwidth`. For example, if a period of network saturation allows
queued chunks to accumulate on disk, then once that saturation clears the
workers will upload the backlog as fast as they can. The size of this
spike is bounded by the size of the queue and, because the queue is
currently limited to twice the number of workers, the practical impact of
such a spike will be low.

Note that because bandwidth limiting is implemented for multipart
uploads and backups to Google Cloud Storage do not currently use
multipart uploads, the `--max-bandwidth` option will have no effect when
used with `--cloud-provider=google-cloud-storage`.

Closes BAR-114.
mikewallace1979 committed Oct 10, 2023
1 parent b0b8789 commit 6e9a7f9
Showing 6 changed files with 356 additions and 22 deletions.
8 changes: 8 additions & 0 deletions barman/clients/cloud_backup.py
@@ -159,6 +159,7 @@ def main(args=None):
"compression": config.compression,
"max_archive_size": config.max_archive_size,
"min_chunk_size": config.min_chunk_size,
"max_bandwidth": config.max_bandwidth,
"cloud_interface": cloud_interface,
}
if __is_hook_script():
@@ -322,6 +323,13 @@ def parse_arguments(args=None):
"google-cloud-storage)",
default=None, # Defer to the cloud interface if nothing is specified
)
parser.add_argument(
"--max-bandwidth",
type=check_size,
help="the maximum amount of data to be uploaded per second when backing up to "
"either AWS S3 or Azure Blob Storage (default: no limit)",
default=None,
)
parser.add_argument(
"-d",
"--dbname",
57 changes: 56 additions & 1 deletion barman/cloud.py
@@ -28,6 +28,7 @@
import shutil
import signal
import tarfile
import time
from abc import ABCMeta, abstractmethod, abstractproperty
from functools import partial
from io import BytesIO, RawIOBase
@@ -171,18 +172,23 @@ class CloudTarUploader(object):
NamedTemporaryFile, delete=False, prefix="barman-upload-", suffix=".part"
)

def __init__(self, cloud_interface, key, chunk_size, compression=None):
def __init__(
self, cloud_interface, key, chunk_size, compression=None, max_bandwidth=None
):
"""
A tar archive that resides on cloud storage
:param CloudInterface cloud_interface: cloud interface instance
:param str key: path inside the bucket
:param str compression: required compression
:param int chunk_size: the upload chunk size
:param int max_bandwidth: the maximum amount of data per second that
should be uploaded by this tar uploader
"""
self.cloud_interface = cloud_interface
self.key = key
self.chunk_size = chunk_size
self.max_bandwidth = max_bandwidth
self.upload_metadata = None
self.buffer = None
self.counter = 0
@@ -201,6 +207,8 @@ def __init__(self, cloud_interface, key, chunk_size, compression=None):
)
self.size = 0
self.stats = None
self.time_of_last_upload = None
self.size_of_last_upload = None

def write(self, buf):
if self.buffer and self.buffer.tell() > self.chunk_size:
@@ -219,15 +227,47 @@ def write(self, buf):
self.buffer.write(buf)
self.size += len(buf)

def _throttle_upload(self, part_size):
"""
Throttles the upload according to the value of `self.max_bandwidth`.
Waits until enough time has passed since the last upload that a new part can
be uploaded without exceeding `self.max_bandwidth`. If sufficient time has
already passed then this function will return without waiting.
:param int part_size: Size in bytes of the part which is to be uploaded.
"""
if (self.time_of_last_upload and self.size_of_last_upload) is not None:
min_time_to_next_upload = self.size_of_last_upload / self.max_bandwidth
seconds_since_last_upload = (
datetime.datetime.now() - self.time_of_last_upload
).total_seconds()
if seconds_since_last_upload < min_time_to_next_upload:
logging.info(
f"Uploaded {self.size_of_last_upload} bytes "
f"{seconds_since_last_upload} seconds ago which exceeds "
f"limit of {self.max_bandwidth} bytes/s"
)
time_to_wait = min_time_to_next_upload - seconds_since_last_upload
logging.info(f"Throttling upload by waiting for {time_to_wait} seconds")
time.sleep(time_to_wait)
self.time_of_last_upload = datetime.datetime.now()
self.size_of_last_upload = part_size

def flush(self):
if not self.upload_metadata:
self.upload_metadata = self.cloud_interface.create_multipart_upload(
self.key
)

part_size = self.buffer.tell()
self.buffer.flush()
self.buffer.seek(0, os.SEEK_SET)
self.counter += 1
if self.max_bandwidth:
# Upload throttling is applied just before uploading the next part so that
# compression and flushing have already happened before we start waiting.
self._throttle_upload(part_size)
self.cloud_interface.async_upload_part(
upload_metadata=self.upload_metadata,
key=self.key,
@@ -257,6 +297,7 @@ def __init__(
max_archive_size,
compression,
min_chunk_size=None,
max_bandwidth=None,
):
"""
Create a new controller that upload the backup in cloud storage
@@ -266,6 +307,8 @@
:param int max_archive_size: the maximum size of an archive
:param str|None compression: required compression
:param int|None min_chunk_size: the minimum size of a single upload part
:param int|None max_bandwidth: the maximum amount of data per second that
should be uploaded during the backup
"""

self.cloud_interface = cloud_interface
@@ -294,6 +337,7 @@ def __init__(
possible_min_chunk_sizes.append(min_chunk_size)
self.chunk_size = max(possible_min_chunk_sizes)
self.compression = compression
self.max_bandwidth = max_bandwidth
self.tar_list = {}

self.upload_stats = {}
@@ -338,6 +382,7 @@ def _get_tar(self, name):
key=os.path.join(self.key_prefix, self._build_dest_name(name)),
chunk_size=self.chunk_size,
compression=self.compression,
max_bandwidth=self.max_bandwidth,
)
]
# If the current uploading file size is over DEFAULT_MAX_TAR_SIZE
@@ -353,6 +398,7 @@
),
chunk_size=self.chunk_size,
compression=self.compression,
max_bandwidth=self.max_bandwidth,
)
self.tar_list[name].append(uploader)
return uploader.tar
@@ -1484,6 +1530,7 @@ def __init__(
compression=None,
backup_name=None,
min_chunk_size=None,
max_bandwidth=None,
):
"""
Base constructor.
@@ -1498,6 +1545,8 @@ def __init__(
:param str|None backup_name: A friendly name which can be used to reference
this backup in the future.
:param int min_chunk_size: the minimum size of a single upload part
:param int max_bandwidth: the maximum amount of data per second that should
be uploaded during the backup
"""
super(CloudBackupUploader, self).__init__(
server_name,
@@ -1509,6 +1558,7 @@
self.compression = compression
self.max_archive_size = max_archive_size
self.min_chunk_size = min_chunk_size
self.max_bandwidth = max_bandwidth

# Object properties set at backup time
self.controller = None
@@ -1549,6 +1599,7 @@ def _create_upload_controller(self, backup_id):
self.max_archive_size,
self.compression,
self.min_chunk_size,
self.max_bandwidth,
)

def _backup_data_files(
@@ -1745,6 +1796,7 @@ def __init__(
backup_id,
compression=None,
min_chunk_size=None,
max_bandwidth=None,
):
"""
Create the cloud storage upload client for a backup in the specified
@@ -1759,6 +1811,8 @@
:param str backup_id: The id of the backup to upload
:param str compression: Compression algorithm to use
:param int min_chunk_size: the minimum size of a single upload part
:param int max_bandwidth: the maximum amount of data per second that
should be uploaded during the backup
"""
super(CloudBackupUploaderBarman, self).__init__(
server_name,
@@ -1767,6 +1821,7 @@
compression=compression,
postgres=None,
min_chunk_size=min_chunk_size,
max_bandwidth=max_bandwidth,
)
self.backup_dir = backup_dir
self.backup_id = backup_id
7 changes: 6 additions & 1 deletion doc/barman-cloud-backup.1
@@ -53,7 +53,8 @@ usage:\ barman\-cloud\-backup\ [\-V]\ [\-\-help]\ [\-v\ |\ \-q]\ [\-t]
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ [\-\-immediate\-checkpoint]\ [\-J\ JOBS]
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ [\-S\ MAX_ARCHIVE_SIZE]
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ [\-\-min\-chunk\-size\ MIN_CHUNK_SIZE]
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ [\-d\ DBNAME]\ [\-n\ BACKUP_NAME]
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ [\-\-max\-bandwidth\ MAX_BANDWIDTH]\ [\-d\ DBNAME]
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ [\-n\ BACKUP_NAME]
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ [\-\-snapshot\-instance\ SNAPSHOT_INSTANCE]
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ [\-\-snapshot\-disk\ NAME]\ [\-\-snapshot\-zone\ GCP_ZONE]
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ [\-\-snapshot\-gcp\-project\ GCP_PROJECT]
@@ -106,6 +107,10 @@ optional\ arguments:
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ cloud\ storage\ (default:\ 5MB\ for\ aws\-s3,\ 64KB\ for
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ azure\-blob\-storage,\ not\ applicable\ for\ google\-cloud\-
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ storage)
\ \ \-\-max\-bandwidth\ MAX_BANDWIDTH
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ the\ maximum\ amount\ of\ data\ to\ be\ uploaded\ per\ second
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ when\ backing\ up\ to\ either\ AWS\ S3\ or\ Azure\ Blob\ Storage
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ (default:\ no\ limit)
\ \ \-d\ DBNAME,\ \-\-dbname\ DBNAME
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ Database\ name\ or\ conninfo\ string\ for\ Postgres
\ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ connection\ (default:\ postgres)
7 changes: 6 additions & 1 deletion doc/barman-cloud-backup.1.md
@@ -50,7 +50,8 @@ usage: barman-cloud-backup [-V] [--help] [-v | -q] [-t]
[--immediate-checkpoint] [-J JOBS]
[-S MAX_ARCHIVE_SIZE]
[--min-chunk-size MIN_CHUNK_SIZE]
[-d DBNAME] [-n BACKUP_NAME]
[--max-bandwidth MAX_BANDWIDTH] [-d DBNAME]
[-n BACKUP_NAME]
[--snapshot-instance SNAPSHOT_INSTANCE]
[--snapshot-disk NAME] [--snapshot-zone GCP_ZONE]
[--snapshot-gcp-project GCP_PROJECT]
@@ -103,6 +104,10 @@ optional arguments:
cloud storage (default: 5MB for aws-s3, 64KB for
azure-blob-storage, not applicable for google-cloud-
storage)
--max-bandwidth MAX_BANDWIDTH
the maximum amount of data to be uploaded per second
when backing up to either AWS S3 or Azure Blob Storage
(default: no limit)
-d DBNAME, --dbname DBNAME
Database name or conninfo string for Postgres
connection (default: postgres)
67 changes: 57 additions & 10 deletions tests/test_barman_cloud_backup.py
@@ -34,12 +34,33 @@ class TestCloudBackup(object):
"""

@pytest.mark.parametrize(
("barman_cloud_args", "expected_max_archive_size", "expected_min_chunk_size"),
(
([], 100 << 30, None),
(["--max-archive-size=10GB"], 10 << 30, None),
(["--min-chunk-size=50MB"], 100 << 30, 50 << 20),
(["--max-archive-size=10GB", "--min-chunk-size=50MB"], 10 << 30, 50 << 20),
"barman_cloud_args",
"expected_max_archive_size",
"expected_min_chunk_size",
"expected_max_bandwidth",
),
(
([], 100 << 30, None, None),
(["--max-archive-size=10GB"], 10 << 30, None, None),
(["--min-chunk-size=50MB"], 100 << 30, 50 << 20, None),
(
["--max-archive-size=10GB", "--min-chunk-size=50MB"],
10 << 30,
50 << 20,
None,
),
(["--max-bandwidth=80MB"], 100 << 30, None, 80 << 20),
(
[
"--max-archive-size=10GB",
"--min-chunk-size=50MB",
"--max-bandwidth=80MB",
],
10 << 30,
50 << 20,
80 << 20,
),
),
)
@mock.patch.dict(
@@ -58,6 +79,7 @@ def test_uses_postgres_backup_uploader(
barman_cloud_args,
expected_max_archive_size,
expected_min_chunk_size,
expected_max_bandwidth,
):
uploader = uploader_mock.return_value
cloud_backup.main(["cloud_storage_url", "test_server"] + barman_cloud_args)
@@ -70,6 +92,7 @@ def test_uses_postgres_backup_uploader(
postgres=postgres_connection.return_value,
max_archive_size=expected_max_archive_size,
min_chunk_size=expected_min_chunk_size,
max_bandwidth=expected_max_bandwidth,
cloud_interface=cloud_interface_mock.return_value,
)
uploader.backup.assert_called_once()
@@ -424,12 +447,33 @@ class TestCloudBackupHookScript(object):
"""

@pytest.mark.parametrize(
("barman_cloud_args", "expected_max_archive_size", "expected_min_chunk_size"),
(
([], 100 << 30, None),
(["--max-archive-size=10GB"], 10 << 30, None),
(["--min-chunk-size=50MB"], 100 << 30, 50 << 20),
(["--max-archive-size=10GB", "--min-chunk-size=50MB"], 10 << 30, 50 << 20),
"barman_cloud_args",
"expected_max_archive_size",
"expected_min_chunk_size",
"expected_max_bandwidth",
),
(
([], 100 << 30, None, None),
(["--max-archive-size=10GB"], 10 << 30, None, None),
(["--min-chunk-size=50MB"], 100 << 30, 50 << 20, None),
(
["--max-archive-size=10GB", "--min-chunk-size=50MB"],
10 << 30,
50 << 20,
None,
),
(["--max-bandwidth=80MB"], 100 << 30, None, 80 << 20),
(
[
"--max-archive-size=10GB",
"--min-chunk-size=50MB",
"--max-bandwidth=80MB",
],
10 << 30,
50 << 20,
80 << 20,
),
),
)
@mock.patch.dict(
@@ -454,6 +498,7 @@ def test_uses_barman_backup_uploader_when_running_as_hook(
barman_cloud_args,
expected_max_archive_size,
expected_min_chunk_size,
expected_max_bandwidth,
):
uploader = uploader_mock.return_value
cloud_backup.main(["cloud_storage_url", "test_server"] + barman_cloud_args)
@@ -463,6 +508,7 @@ def test_uses_barman_backup_uploader_when_running_as_hook(
compression=None,
max_archive_size=expected_max_archive_size,
min_chunk_size=expected_min_chunk_size,
max_bandwidth=expected_max_bandwidth,
cloud_interface=cloud_interface_mock.return_value,
backup_dir=EXAMPLE_BACKUP_DIR,
backup_id=EXAMPLE_BACKUP_ID,
@@ -497,6 +543,7 @@ def test_uses_barman_backup_uploader_when_running_as_retry_hook(
compression=None,
max_archive_size=107374182400,
min_chunk_size=None,
max_bandwidth=None,
cloud_interface=cloud_interface_mock.return_value,
backup_dir=EXAMPLE_BACKUP_DIR,
backup_id=EXAMPLE_BACKUP_ID,