Skip to content

Commit

Permalink
Add more tests for the rhsm library module and ScanSubscriptionManage…
Browse files Browse the repository at this point in the history
…rInfo actor. (oamg#692)

Further extends the test suite for the `rhsm` library module with tests that assert
the functionality of all library functions used when obtaining `RHSMInfo` from the
system.

Also adds minimalistic tests for the `ScanSubscriptionManagerInfo` actor since
almost all of its functionality is provided by the `rhsm` library module.

Co-authored-by: MichalHe <[email protected]>
  • Loading branch information
MichalHe and MichalHe authored Jul 26, 2021
1 parent 28d5178 commit 7f1bf06
Show file tree
Hide file tree
Showing 2 changed files with 315 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pytest

from leapp.libraries.actor import scanrhsm
from leapp.libraries.common import rhsm
from leapp.libraries.common.testutils import CurrentActorMocked, produce_mocked
from leapp.libraries.stdlib import api
from leapp.models import RepositoryData, RepositoryFile, RHSMInfo


def mocked_get_rhsm_info(context):
assert context, 'The actor did not provide library with valid context.'
info = RHSMInfo()
info.attached_skus = ['SKU1', 'SKU2']
info.available_repos = ['Repo1', 'Repo2']
info.enabled_repos = ['Repo2']
info.release = '7.9'
info.existing_product_certificates = ['Cert1', 'Cert2', 'Cert3']
info.sca_detected = True
return info


def test_scansubscriptionmanagementinfo(monkeypatch):
actor_producs = produce_mocked()

monkeypatch.setattr(rhsm, 'scan_rhsm_info', mocked_get_rhsm_info)
monkeypatch.setattr(api, 'produce', actor_producs)
monkeypatch.setattr(api, 'current_actor', CurrentActorMocked())

scanrhsm.scan()

assert actor_producs.model_instances, 'The actor did not produce any message.'
assert len(actor_producs.model_instances) == 1, 'The actor produced more messages than expected.'

message = actor_producs.model_instances[0]

# The actor does not do much more than calling the `rhsm` library (which has its own tests),
# just check that the message has not changed
assert message.attached_skus == ['SKU1', 'SKU2']
assert message.available_repos == ['Repo1', 'Repo2']
assert message.enabled_repos == ['Repo2']
assert message.release == '7.9'
assert message.existing_product_certificates == ['Cert1', 'Cert2', 'Cert3']
assert message.sca_detected
275 changes: 272 additions & 3 deletions repos/system_upgrade/common/libraries/tests/test_rhsm.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,112 @@
from collections import namedtuple
import os

import pytest

from leapp import reporting
from leapp.exceptions import StopActorExecutionError
from leapp.libraries.common import repofileutils, rhsm
from leapp.libraries.common.testutils import create_report_mocked, logger_mocked
from leapp.libraries.common.testutils import (
create_report_mocked,
CurrentActorMocked,
logger_mocked
)
from leapp.libraries.stdlib import CalledProcessError, api
from leapp.models import RepositoryFile, RepositoryData

Repository = namedtuple('Repository', ['repoid', 'file'])
LIST_SEPARATOR = '\n - '

# External commands called by the RHSM library
CMD_RHSM_LIST_CONSUMED = ('subscription-manager', 'list', '--consumed')
CMD_RHSM_STATUS = ('subscription-manager', 'status')
CMD_RHSM_RELEASE = ('subscription-manager', 'release')
CMD_RHSM_LIST_ENABLED_REPOS = ('subscription-manager', 'repos', '--list-enabled')

RHSM_STATUS_OUTPUT_NOSCA = '''
+-------------------------------------------+
System Status Details
+-------------------------------------------+
Overall Status: Current
System Purpose Status: Not Specified
'''

RHSM_STATUS_OUTPUT_SCA = '''
+-------------------------------------------+
System Status Details
+-------------------------------------------+
Overall Status: Current
System Purpose Status: Matched
Content Access Mode is set to Simple Content Access
'''

# Used to simulate realistic output of RHSM, therefore carries more information than `Repository` namedtuple
RHSMRepositoryEntry = namedtuple('RHSMRepositoryEntry', ('id', 'name', 'url', 'enabled')) # For clarity purposes
RHSM_ENABLED_REPOS = [
RHSMRepositoryEntry(
id='rhel-8-for-x86_64-appstream-rpms',
name='Appstream',
url='some_url',
enabled='1'),
RHSMRepositoryEntry(
id='satellite-tools-6.6-for-rhel-8-x86_64-rpms',
name='Satellite',
url='some_url',
enabled='1'),
RHSMRepositoryEntry(
id='rhel-8-for-x86_64-baseos-rpms',
name='Base',
url='some_url',
enabled='1')
]


class IsolatedActionsMocked(object):
def __init__(self, call_stdout=None, raise_err=False):
self.commands_called = []
self.call_return = {'stdout': call_stdout, 'stderr': None}
self.raise_err = raise_err

def call(self, cmd, *args):
# A map from called commands to their mocked output
self.mocked_command_call_outputs = dict()

def call(self, cmd, *args, **dummy_kwargs):
self.commands_called.append(cmd)
if self.raise_err:
raise_call_error(cmd)
return self.call_return

return self.mocked_command_call_outputs.get(
tuple(cmd), # Cast to tuple, as list is not hashable
self.call_return)

def add_mocked_command_call_with_stdout(self, cmd, stdout):
# We cast `cmd` from list to tuple, as a list cannot be hashed
self.mocked_command_call_outputs[tuple(cmd)] = {
'stdout': stdout,
'stderr': None}

def full_path(self, path):
return path


@pytest.fixture
def actor_mocked(monkeypatch):
"""
Fixture providing a mocked actor that was already used to monkeypatch api.current_actor.
Introduced to reduce repetition inside tests.
"""
actor = CurrentActorMocked()
monkeypatch.setattr(api, 'current_actor', actor)
return actor


@pytest.fixture
def context_mocked():
return IsolatedActionsMocked()


def raise_call_error(args=None, exit_code=1):
Expand Down Expand Up @@ -119,3 +202,189 @@ def test_inhibit_on_duplicate_repos_no_dups(monkeypatch):

assert not api.current_logger.warnmsg
assert reporting.create_report.called == 0


def test_sku_listing(monkeypatch, actor_mocked, context_mocked):
"""Tests whether the rhsm library can obtain used SKUs correctly."""
context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_LIST_CONSUMED, 'SKU: 598339696910')

attached_skus = rhsm.get_attached_skus(context_mocked)

assert_fail_description = 'Some calls to subscription-manager were expected.'
assert context_mocked.commands_called, assert_fail_description

assert_fail_description = 'RHSM command reported 1 SKU, however {0} were detected.'.format(
len(attached_skus)
)
assert len(attached_skus) == 1, assert_fail_description

assert_fail_description = 'The parsed SKU is different than the one contained in the mocked RHSM output.'
assert attached_skus[0] == '598339696910', assert_fail_description


def test_scanrhsminfo_with_skip_rhsm(monkeypatch, context_mocked):
"""Tests whether the scan_rhsm_info respects the LEAPP_NO_RHSM environmental variable."""
mocked_actor = CurrentActorMocked(envars={'LEAPP_NO_RHSM': '1'})
monkeypatch.setattr(api, 'current_actor', mocked_actor)

result = rhsm.scan_rhsm_info(context_mocked)

assert_fail_description = 'No external shell commands should be executed when RHSM is skipped.'
assert not context_mocked.commands_called, assert_fail_description

assert result is None, 'The `scan_rhsm_info` should not provide any output when RHSM is skipped.'


def test_get_release(monkeypatch, actor_mocked, context_mocked):
"""Tests whether the library correctly retrieves release from RHSM."""
context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_RELEASE, 'Release: 7.9')

release = rhsm.get_release(context_mocked)

assert release, 'No release information detected (but valid release info was provided).'
assert release == '7.9', 'Detected release is incorrect.'


def test_get_release_with_release_not_set(monkeypatch, actor_mocked, context_mocked):
"""Tests whether the library does not retrieve release information when the release is not set."""
# Test whether no realease is detected correctly too
context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_RELEASE, 'Release not set')

release = rhsm.get_release(context_mocked)

fail_description = 'The release information was obtained, even if "No release set" was repored by rhsm.'
assert not release, fail_description


def test_is_manifest_sca_on_nonsca_system(monkeypatch, actor_mocked, context_mocked):
"""Tests whether the library obtains the SCA information correctly from a non-SCA system."""
context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_STATUS, RHSM_STATUS_OUTPUT_NOSCA)

is_sca = rhsm.is_manifest_sca(context_mocked)
assert not is_sca, 'SCA was detected on a non-SCA system.'


def test_is_manifest_sca_on_sca_system(monkeypatch, actor_mocked, context_mocked):
"""Tests whether the library obtains the SCA information from SCA system correctly."""
context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_STATUS, RHSM_STATUS_OUTPUT_SCA)

is_sca = rhsm.is_manifest_sca(context_mocked)
assert is_sca, 'Failed to detected SCA on a SCA system.'


def test_get_enabled_repo_ids(monkeypatch, actor_mocked, context_mocked):
"""Tests whether the library retrieves correct information about enabled repositories."""
# Prepare the (realistic) RHSM output
rhsm_list_enabled_output = '''
+----------------------------------------------------------+
Available Repositories in /etc/yum.repos.d/redhat.repo
+----------------------------------------------------------+
'''

for enabled_repository in RHSM_ENABLED_REPOS:
rhsm_output_fragment = 'Repo ID: {0}\n'.format(enabled_repository.id)
rhsm_output_fragment += 'Repo Name: {0}\n'.format(enabled_repository.name)
rhsm_output_fragment += 'Repo URL: {0}\n'.format(enabled_repository.url)
rhsm_output_fragment += 'Enabled: {0}\n'.format(enabled_repository.enabled)
rhsm_output_fragment += '\n'
rhsm_list_enabled_output += rhsm_output_fragment

context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_LIST_ENABLED_REPOS, rhsm_list_enabled_output)

enabled_repo_ids = rhsm.get_enabled_repo_ids(context_mocked)

fail_description = 'Failed to detected enabled repositories on the system.'
assert len(enabled_repo_ids) == 3, fail_description

fail_description = 'Failed to retrieve repository ID provided in the RHSM output.'
for enabled_repository in RHSM_ENABLED_REPOS:
assert enabled_repository.id in enabled_repo_ids, fail_description


def test_get_existing_product_certificates(monkeypatch, actor_mocked, context_mocked):
"""Verifies that the library is able to correctly retrieve existing product certificates."""

CERT_DIRS_LAYOUT = {
'/etc/pki/product': ['cert1', 'cert2'],
'/etc/pki/product-default': ['cert3']
}

def mocked_isdir(path):
if path in CERT_DIRS_LAYOUT:
return True
err_message = 'RHSM library should not gather info about additional dirs (attempted to isdir: {0}).'
raise ValueError(err_message.format(path))

def mocked_listdir(path):
if path in CERT_DIRS_LAYOUT:
return CERT_DIRS_LAYOUT[path]
err_message = 'RHSM library should not listdir additional dirs (attempted to listdir: {0}).'
raise ValueError(err_message.format(path))

def mocked_isfile(path):
if path in CERT_DIRS_LAYOUT:
# The certificate directories are not files
return False

basename = os.path.basename(path)
dirname = os.path.dirname(path)
if dirname in CERT_DIRS_LAYOUT:
return basename in CERT_DIRS_LAYOUT[dirname]

err_message = 'RHSM library should not isfile additional paths (attempted to isfile: {0}).'
raise ValueError(err_message.format(path))

monkeypatch.setattr(rhsm.os.path, 'isdir', mocked_isdir)
monkeypatch.setattr(rhsm.os, 'listdir', mocked_listdir)
monkeypatch.setattr(rhsm.os.path, 'isfile', mocked_isfile)

existing_product_certificates = rhsm.get_existing_product_certificates(context_mocked)

fail_description = 'Retrieved different number of certificates than expected.'
assert len(existing_product_certificates) == 3, fail_description

fail_description_bad_dir = 'Found certificate in unexpected path: {0}'
fail_description_bad_cert_file = 'Found certificate file that was not provided by mocked output: {0}'
for certificate_path in existing_product_certificates:
dirname = os.path.dirname(certificate_path)
basename = os.path.basename(certificate_path)
assert dirname in CERT_DIRS_LAYOUT, fail_description_bad_dir.format(certificate_path)
assert basename in CERT_DIRS_LAYOUT[dirname], fail_description_bad_cert_file.format(certificate_path)


def test_get_existing_product_certificates_missing_cert_directory(monkeypatch, actor_mocked, context_mocked):
"""Tests whether the library is able to retrieve certificates even if /etc/pki/product is missing."""

def mocked_isdir(path):
if path == '/etc/pki/product':
return False # Directory is missing
if path == '/etc/pki/product-default':
return True

err_msg = 'Tried to isdir a path that is not a part of the mocked paths. Path: {0}'
raise ValueError(err_msg.format(path))

def mocked_isfile(path):
if path == '/etc/pki/product-default/cert':
return True

err_msg = 'Tried to use isfile on a path that is not a part of the mocked paths. Path: {0}'
raise ValueError(err_msg.format(path))

def mocked_listdir(path):
if path == '/etc/pki/product-default':
return ['cert']

err_msg = 'Tried to use listdir on a path that is not a part of the mocked paths. Path: {0}'
raise ValueError(err_msg.format(path))

monkeypatch.setattr(rhsm.os.path, 'isdir', mocked_isdir)
monkeypatch.setattr(rhsm.os, 'listdir', mocked_listdir)
monkeypatch.setattr(rhsm.os.path, 'isfile', mocked_isfile)

existing_product_certificates = rhsm.get_existing_product_certificates(context_mocked)

fail_description = 'Library identified more certificates than there are in mocked outputs.'
assert len(existing_product_certificates) == 1, fail_description
fail_description = 'Library failed to identify certificate from mocked outputs.'
assert existing_product_certificates[0] == '/etc/pki/product-default/cert', fail_description

0 comments on commit 7f1bf06

Please sign in to comment.