Unverified Commit 6925b181 authored by Pierre-Louis Bonicoli's avatar Pierre-Louis Bonicoli 🏗
Browse files

Add molecule based tests

parent ef43679e
pytestdebug.log
# Molecule managed
{% if item.registry is defined %}
FROM {{ item.registry.url }}/{{ item.image }}
{% else %}
FROM {{ item.image }}
{% endif %}
{% if item.env is defined %}
{% for var, value in item.env.items() %}
{% if value %}
ENV {{ var }} {{ value }}
{% endif %}
{% endfor %}
{% endif %}
RUN if [ $(command -v apt-get) ]; then apt-get update && apt-get install -y python3 sudo bash ca-certificates iproute2 python3-apt systemd-sysv && apt-get clean; \
elif [ $(command -v dnf) ]; then dnf makecache && dnf --assumeyes install /usr/bin/python3 /usr/bin/python3-config /usr/bin/dnf-3 sudo bash iproute && dnf clean all; \
elif [ $(command -v yum) ]; then yum makecache fast && yum install -y /usr/bin/python /usr/bin/python2-config sudo yum-plugin-ovl bash iproute && sed -i 's/plugins=0/plugins=1/g' /etc/yum.conf && yum clean all; \
elif [ $(command -v zypper) ]; then zypper refresh && zypper install -y python3 sudo bash python3-xml iproute2 && zypper clean -a; \
elif [ $(command -v apk) ]; then apk update && apk add --no-cache python3 sudo bash ca-certificates; \
elif [ $(command -v xbps-install) ]; then xbps-install -Syu && xbps-install -y python3 sudo bash ca-certificates iproute2 && xbps-remove -O; fi
......@@ -22,4 +22,4 @@ practices.
Usage
=====
$ molecule lint
$ molecule test
---
- hosts: &burp_server molecule-burp-server
vars:
burp_client_can_force_backup: 1
burp: &burp
name: test-backup
port: 4971
ssl:
CA: '{{ ca["content"] | b64decode }}'
cert: '{{ server_cert["content"] | b64decode }}'
key: '{{ server_key["content"] | b64decode }}'
DH: '{{ dh["content"] | b64decode }}'
pre_tasks:
- name: Fetch CA
slurp:
src: /tmp/ca.pem
register: ca
- name: Fetch server certificate
slurp:
src: /tmp/server.pem
register: server_cert
- name: Fetch server key
slurp:
src: /tmp/server_privatekey.pem
register: server_key
- name: Fetch DH param
slurp:
src: /tmp/dhparams.pem
register: dh
- name: Fetch client certificate
slurp:
src: /tmp/client.pem
register: client_cert
- name: Fetch client key
slurp:
src: /tmp/client_privatekey.pem
register: client_key
roles:
- role: burp
type: server
become: yes
- hosts: molecule-burp-client
vars:
burp:
name: test-backup
port: 4971
ssl:
CA: '{{ hostvars["molecule-burp-server"]["ca"]["content"] | b64decode }}'
cert: '{{ hostvars["molecule-burp-server"]["server_cert"]["content"] | b64decode }}'
key: '{{ hostvars["molecule-burp-server"]["server_key"]["content"] | b64decode }}'
DH: '{{ hostvars["molecule-burp-server"]["dh"]["content"] | b64decode }}'
burp_clients:
_server: *burp_server
testuser:
password: molecule-burp-client-password
ssl:
cert: '{{ hostvars["molecule-burp-server"]["client_cert"]["content"] | b64decode }}'
key: '{{ hostvars["molecule-burp-server"]["client_key"]["content"] | b64decode }}'
backups:
documents:
paths:
data:
- '{{ canary_path | dirname }}'
cron:
timer:
args:
- 24h # minimum interval since the last successful backup
# line below always match
- 'Mon,Tue,Wed,Thu,Fri,Sat,Sun,{{ lookup("sequence", "start=0 end=24 format=%02d") }}'
keep: [7, 1, 2] # 7 days, 1 weeks, 2 months
options:
compression: gzip0
roles:
- role: burp
type: client
become: yes
......@@ -14,12 +14,14 @@ platforms:
security_opts: ['seccomp=unconfined']
tmpfs: ['/tmp', '/run', '/run/lock']
volumes: ['/sys/fs/cgroup:/sys/fs/cgroup:ro']
networks: [{name: burp_network}] # don't use default network in order to allow DNS resolution between containers
- name: molecule-burp-client
image: debian:stretch
command: /sbin/init
security_opts: ['seccomp=unconfined']
tmpfs: ['/tmp', '/run', '/run/lock']
volumes: ['/sys/fs/cgroup:/sys/fs/cgroup:ro']
networks: [{name: burp_network}]
provisioner:
name: ansible
config_options:
......@@ -29,5 +31,10 @@ provisioner:
group_vars:
all:
ansible_python_interpreter: /usr/bin/python3
host_vars:
molecule-burp-client:
canary: foobar
canary_path: '/home/{{ test_user }}/data/canary'
test_user: testuser
verifier:
name: testinfra
---
- hosts: molecule-burp-server
tasks:
- name: install pyopenssl
package:
name: python3-openssl
- name: Generate DH Parameters (2048 bits)
openssl_dhparam:
path: /tmp/dhparams.pem
size: 2048
- name: Generate privatekeys
openssl_privatekey:
path: '/tmp/{{ item }}_privatekey.pem'
type: RSA
size: 1024
loop:
- ca
- server
- client
- name: Generate CSR for CA
openssl_csr:
path: /tmp/ca_csr.csr
privatekey_path: /tmp/ca_privatekey.pem
basic_constraints:
- 'CA:TRUE'
basic_constraints_critical: true
key_usage:
- Certificate Sign
- CRL Sign
organizational_unit_name: Test BURP CA
- name: Generate selfsigned CA
openssl_certificate:
path: /tmp/ca.pem
csr_path: /tmp/ca_csr.csr
privatekey_path: /tmp/ca_privatekey.pem
provider: selfsigned
selfsigned_not_after: +2d
selfsigned_digest: sha256
- name: Generate CSR for server certificate
openssl_csr:
path: /tmp/server_csr.csr
privatekey_path: /tmp/server_privatekey.pem
basic_constraints:
- 'CA:FALSE'
basic_constraints_critical: true
key_usage:
- Digital Signature
- Non Repudiation
- Key Encipherment
subject:
organizationalUnitName: Test BURP CA
commonName: molecule-burp-server
- name: Generate server certificate
openssl_certificate:
path: /tmp/server.pem
csr_path: /tmp/server_csr.csr
privatekey_path: /tmp/server_privatekey.pem
ownca_path: /tmp/ca.pem
ownca_privatekey_path: /tmp/ca_privatekey.pem
ownca_not_after: +2d
provider: ownca
- name: Generate CSR for client certificate
openssl_csr:
path: /tmp/client_csr.csr
privatekey_path: /tmp/client_privatekey.pem
basic_constraints:
- 'CA:FALSE'
basic_constraints_critical: true
key_usage:
- Digital Signature
- Key Encipherment
- Data Encipherment
- Key Agreement
subject:
organizationalUnitName: Test BURP CA
commonName: molecule-burp-client
- name: Generate client certificate
openssl_certificate:
path: /tmp/client.pem
csr_path: /tmp/client_csr.csr
privatekey_path: /tmp/client_privatekey.pem
ownca_path: /tmp/ca.pem
ownca_privatekey_path: /tmp/ca_privatekey.pem
ownca_not_after: +2d
provider: ownca
- hosts: molecule-burp-client
tasks:
- name: create test user
user:
name: '{{ test_user }}'
- name: create test directory
file:
path: '{{ canary_path | dirname }}'
state: directory
owner: '{{ test_user }}'
group: '{{ test_user }}'
- module_defaults:
copy:
owner: '{{ test_user }}'
group: '{{ test_user }}'
force: no
block:
- name: create test file
copy:
content: '{{ canary }}'
dest: '{{ canary_path }}'
"""PyTest Fixtures."""
from __future__ import absolute_import
import os
import pytest
def pytest_runtest_setup(item):
"""Run tests only when under molecule with testinfra installed."""
try:
import testinfra
except ImportError:
pytest.skip("Test requires testinfra", allow_module_level=True)
if "MOLECULE_INVENTORY_FILE" in os.environ:
pytest.testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
os.environ["MOLECULE_INVENTORY_FILE"]
).get_hosts("all")
else:
pytest.skip(
"Test should run only from inside molecule.", allow_module_level=True
)
"""Role testing files using testinfra."""
import os
import os.path
import re
from time import sleep
import pytest
import testinfra
testinfra_hosts = ['molecule-burp-client']
client_conf = '/etc/burp/test-backup/molecule-burp-server-molecule-burp-client-documents.conf'
burp_command = '/usr/sbin/burp -a {action} -c {conf}'
canary_updated = 'coincoin'
@pytest.fixture(scope="function")
def restore_dir(host):
# temp directory will belongs to root
restore_dir = host.ansible('tempfile', 'state=directory', check=False)
yield restore_dir['path']
host.ansible('file', f"path={restore_dir['path']} state=absent", check=False)
def test_check_backup(host):
'''Check exactly one backup exists'''
# wait for the first backup which is triggered by a cron job executed every minute
inventory = os.environ['MOLECULE_INVENTORY_FILE']
uri = 'ansible://%s?ansible_inventory=%s'
burp_server = testinfra.get_host(uri % ('molecule-burp-server', inventory))
wait_cmd = 'zgrep -q "Backup completed" ' \
'/var/local/backups-burp/test-backup/molecule-burp-client-documents/current/log.gz'
for i in range(0, 14): # 70 seconds max
sleep(5)
check = burp_server.ansible('command', wait_cmd, check=False)
if check['rc'] == 0:
break
else:
assert False, "Backup wasn't completed on server"
list_backups_cmd = burp_command.format(action='l', conf=client_conf)
backups_list = host.ansible('command', list_backups_cmd, check=False)
assert backups_list['rc'] == 0
backups = re.findall("^Backup: [0-9]{7}", backups_list['stdout'], re.MULTILINE)
assert len(backups) == 1
assert backups[0] == 'Backup: 0000001'
def test_list_content(host):
'''Check that canary filename is listed in backup'''
host_vars = host.ansible.get_variables()
canary_path = host_vars['canary_path']
canary_filename = os.path.basename(canary_path)
list_content_cmd = burp_command.format(action='l', conf=client_conf)
list_content_cmd += ' -r %s' % canary_filename
list_content = host.ansible('command', list_content_cmd, check=False)
assert list_content['rc'] == 0
canary_path_rendered = host.ansible('debug', 'msg=%s' % canary_path, check=False)
assert canary_path_rendered['msg'] in list_content['stdout_lines']
def restore_and_check(host, restore_dir, canary, canary_path):
'''Restore and check content'''
restore_cmd = burp_command.format(action='r', conf=client_conf)
restore_cmd += ' -d %s -r %s -s %d' % (restore_dir, canary_path, canary_path.count(os.path.sep))
restore = host.ansible('command', restore_cmd, check=False)
assert restore['rc'] == 0
assert restore['stdout'].endswith('restore finished')
restored_filename = os.path.basename(canary_path)
restored_path = os.path.join(restore_dir, restored_filename)
assert canary == host.file(restored_path).content_string
def test_restore(host, restore_dir):
'''Restore and check content'''
host_vars = host.ansible.get_variables()
canary = host_vars['canary']
canary_path = host_vars['canary_path']
restore_and_check(host, restore_dir, canary, canary_path)
def test_backup(host):
'''force a new backup'''
host_vars = host.ansible.get_variables()
canary_path = host_vars['canary_path']
test_user = host_vars['test_user']
host.ansible("copy", f"content={canary_updated} dest={canary_path} owner={test_user} group={test_user}",
check=False)
backup_cmd = burp_command.format(action='b', conf=client_conf)
backup = host.ansible('command', backup_cmd, check=False)
assert backup['rc'] == 0
assert backup['stdout'].endswith('backup finished ok')
def test_list_new_backup(host):
'''Check exactly two backups exist'''
list_backups_cmd = burp_command.format(action='l', conf=client_conf)
backups_list = host.ansible('command', list_backups_cmd, check=False)
assert backups_list['rc'] == 0
backups = re.findall("^Backup: [0-9]{7}", backups_list['stdout'], re.MULTILINE)
assert len(backups) == 2
assert backups[0] == 'Backup: 0000001'
assert backups[1] == 'Backup: 0000002'
def test_new_backup_content(host, restore_dir):
'''Restore and check content'''
host_vars = host.ansible.get_variables()
canary_path = host_vars['canary_path']
restore_and_check(host, restore_dir, canary_updated, canary_path)
molecule[docker]
testinfra
yamllint
ansible-lint
flake8
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment