Source code for rhui4_tests.test_security
"""Various Security Tests"""
import csv
import logging
from os.path import basename
import subprocess
import nose
import requests
import urllib3
from rhui4_tests_lib.cfg import RHUI_ROOT
from rhui4_tests_lib.conmgr import ConMgr
from rhui4_tests_lib.rhuimanager import RHUIManager
from rhui4_tests_lib.rhuimanager_instance import RHUIManagerInstance
logging.basicConfig(level=logging.DEBUG)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
HOSTNAMES = {"RHUA": ConMgr.get_rhua_hostname(),
"CDS": ConMgr.get_cds_hostnames()[0],
"HAProxy": ConMgr.get_lb_hostname()}
PORTS = { "https": 443 }
PROTOCOL_TEST_CMD = "echo | openssl s_client -%s -connect %s:%s"
# these are in fact the s_client options for protocols, just without the dash
PROTOCOLS = {"good": ["tls1_2", "tls1_3"],
"bad": ["tls1", "tls1_1"]}
# connections to the RHUA and the HAProxy nodes
RHUA = ConMgr.connect()
CDS = ConMgr.connect(HOSTNAMES["CDS"])
HAPROXY = ConMgr.connect(HOSTNAMES["HAProxy"])
SSL_CERT = f"{RHUI_ROOT}/cds-config/ssl/{HOSTNAMES['HAProxy']}.crt"
def _check_protocols(hostname, port):
"""helper method to try various protocols on hostname:port"""
# check allowed protocols
for protocol in PROTOCOLS["good"]:
exitcode = subprocess.call(PROTOCOL_TEST_CMD % (protocol, hostname, port) + " &> /dev/null",
shell=True)
nose.tools.eq_(exitcode, 0)
# check disallowed protocols
for protocol in PROTOCOLS["bad"]:
exitcode = subprocess.call(PROTOCOL_TEST_CMD % (protocol, hostname, port) + " &> /dev/null",
shell=True)
nose.tools.eq_(exitcode, 1)
def _get_cn(connection):
"""get the Common Name from the CDS SSL certificate"""
cmd = f"openssl x509 -noout -subject -in {SSL_CERT}"
_, stdout, _ = connection.exec_command(cmd)
subject_data = [item.strip() for item in stdout.read().decode().split(",")]
cn_raw = [item for item in subject_data if item.startswith("CN")][0]
cn = cn_raw.replace("CN = ", "")
return cn
def _get_san(connection):
"""get the Subject Alternative Name from the CDS SSL certificate"""
cmd = f"openssl x509 -noout -ext subjectAltName -in {SSL_CERT}"
_, stdout, _ = connection.exec_command(cmd)
san_data = [item.strip() for item in stdout.read().decode().splitlines()[1:]]
san_raw = [item for item in san_data if item.startswith("DNS")][0]
san = san_raw.replace("DNS:", "")
return san
[docs]
def setup():
"""announce the beginning of the test run"""
print(f"*** Running {basename(__file__)}: ***")
[docs]
def test_01_login_add_cds_hap():
"""log in to RHUI, add CDS and HAProxy nodes"""
RHUIManager.initial_run(RHUA)
RHUIManagerInstance.add_instance(RHUA, "cds")
RHUIManagerInstance.add_instance(RHUA, "loadbalancers")
[docs]
def test_02_https_rhua():
"""check protocols allowed by nginx on the RHUA"""
# for RHBZ#1637261
_check_protocols(HOSTNAMES["RHUA"], PORTS["https"])
[docs]
def test_03_https_cds():
"""check protocols allowed by nginx on the CDS nodes"""
# for RHBZ#1637261
_check_protocols(HOSTNAMES["CDS"], PORTS["https"])
[docs]
def test_04_haproxy_stats():
"""check haproxy stats"""
# for RHBZ#1718066
_, stdout, _ = HAPROXY.exec_command("echo 'show stat' | nc -U /var/lib/haproxy/stats")
stats = list(csv.DictReader(stdout))
httpsstats = {row["svname"]: row["status"] for row in stats if row["# pxname"] == "https00"}
# check the stats for the frontend, the CDS nodes, and the backend; https
nose.tools.eq_(httpsstats["FRONTEND"], "OPEN")
nose.tools.eq_(httpsstats["BACKEND"], "UP")
nose.tools.eq_(httpsstats[HOSTNAMES["CDS"]], "UP")
[docs]
def test_05_hsts():
"""check if HTTP Strict Transport Security is used"""
response = requests.head(f"https://{HOSTNAMES['HAProxy']}/", timeout=10, verify=False)
nose.tools.ok_("Strict-Transport-Security" in response.headers,
msg=f"Got these headers: {response.headers}")
[docs]
def test_06_cds_ssl():
"""check the CN and SAN in the CDS SSL certificate"""
cn = _get_cn(CDS)
nose.tools.eq_(cn, HOSTNAMES["HAProxy"])
san = _get_san(CDS)
nose.tools.eq_(san, HOSTNAMES["HAProxy"])
[docs]
def test_99_cleanup():
"""delete CDS and HAProxy nodes"""
RHUIManagerInstance.delete_all(RHUA, "loadbalancers")
RHUIManagerInstance.delete_all(RHUA, "cds")
[docs]
def teardown():
"""announce the end of the test run"""
print(f"*** Finished running {basename(__file__)}. ***")