Source code for testing.unit.test_gpg

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

from __future__ import print_function
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import range
from builtins import object
from past.utils import old_div

import os
import platform
import pytest
import random
import unittest

from duplicity import gpg
from duplicity import path
from testing import _runtest_dir
from . import UnitTestCase


@pytest.mark.usefixtures(u"redirect_stdin")
class GPGTest(UnitTestCase):
    u"""Test GPGFile round trips and the GPG/gzip volume writers.

    The key ids and passphrases used below (``sign_key``, ``sign_passphrase``,
    ``encrypt_key1``, ``encrypt_key2``) are read from ``self``; they are
    presumably provided by UnitTestCase -- confirm against the base class.
    """

    def setUp(self):
        u"""Unpack the test files and create the default passphrase-only profile."""
        super(GPGTest, self).setUp()
        self.unpack_testfiles()
        self.default_profile = gpg.GPGProfile(passphrase=u"foobar")

    def _output_name(self, basename):
        u"""Return the filename of *basename* inside the testfiles/output directory."""
        return u"{0}/testfiles/output/{1}".format(_runtest_dir, basename)

    def _assert_size_close(self, filename, size):
        u"""Assert that *filename* on disk is within 64 KiB of *size* bytes."""
        actual = os.stat(filename).st_size
        assert size - 64 * 1024 <= actual <= size + 64 * 1024, actual

    def _signed_cycle(self, signing_profile):
        u"""Write a signed file with *signing_profile*, read it back and check
        both the plaintext and the reported signature key."""
        plaintext = b"hello" * 50000
        epath = path.Path(self._output_name(u"encrypted_file"))

        encrypted_signed_file = gpg.GPGFile(1, epath, signing_profile)
        encrypted_signed_file.write(plaintext)
        encrypted_signed_file.close()

        decrypted_file = gpg.GPGFile(0, epath, signing_profile)
        assert decrypted_file.read() == plaintext
        # The signature is queried only after close(), as in the original test.
        decrypted_file.close()
        sig = decrypted_file.get_signature()
        assert sig == self.sign_key, sig

    def gpg_cycle(self, s, profile=None):
        u"""Test encryption/decryption cycle on string s

        s is written through a GPGFile opened with 1 as first argument, read
        back through one opened with 0, and the two buffers are compared.
        When profile is None the default profile from setUp is used.
        """
        if profile is None:
            profile = self.default_profile

        epath = path.Path(self._output_name(u"encrypted_file"))
        encrypted_file = gpg.GPGFile(1, epath, profile)
        encrypted_file.write(s)
        encrypted_file.close()

        # A second Path object is built for reading, as the original test did.
        epath2 = path.Path(self._output_name(u"encrypted_file"))
        decrypted_file = gpg.GPGFile(0, epath2, profile)
        dec_buf = decrypted_file.read()
        decrypted_file.close()

        assert s == dec_buf, (len(s), len(dec_buf))

    def test_gpg1(self):
        u"""Test gpg short strings"""
        self.gpg_cycle(b"hello, world")
        self.gpg_cycle(b"ansoetuh aoetnuh aoenstuh aoetnuh asoetuh saoteuh ")

    def test_gpg2(self):
        u"""Test gpg long strings easily compressed"""
        self.gpg_cycle(b" " * 50000)
        self.gpg_cycle(b"aoeu" * 1000000)

    def test_gpg3(self):
        u"""Test on random data - must have /dev/urandom device"""
        # Context manager so the device handle is closed even if read() fails.
        with open(u"/dev/urandom", u"rb") as infp:
            rand_buf = infp.read(120000)
        self.gpg_cycle(rand_buf)

    def test_gpg_asym(self):
        u"""Test GPG asymmetric encryption"""
        profile = gpg.GPGProfile(passphrase=self.sign_passphrase,
                                 recipients=[self.encrypt_key1,
                                             self.encrypt_key2])
        self.gpg_cycle(b"aoensutha aonetuh saoe", profile)

        profile2 = gpg.GPGProfile(passphrase=self.sign_passphrase,
                                  recipients=[self.encrypt_key1])
        self.gpg_cycle(b"aoeu" * 10000, profile2)

    def test_gpg_hidden_asym(self):
        u"""Test GPG asymmetric encryption with hidden key id"""
        profile = gpg.GPGProfile(passphrase=self.sign_passphrase,
                                 hidden_recipients=[self.encrypt_key1,
                                                    self.encrypt_key2])
        self.gpg_cycle(b"aoensutha aonetuh saoe", profile)

        profile2 = gpg.GPGProfile(passphrase=self.sign_passphrase,
                                  hidden_recipients=[self.encrypt_key1])
        self.gpg_cycle(b"aoeu" * 10000, profile2)

    def test_gpg_signing(self):
        u"""Test to make sure GPG reports the proper signature key"""
        signing_profile = gpg.GPGProfile(passphrase=self.sign_passphrase,
                                         sign_key=self.sign_key,
                                         recipients=[self.encrypt_key1])
        self._signed_cycle(signing_profile)

    def test_gpg_signing_and_hidden_encryption(self):
        u"""Test to make sure GPG reports the proper signature key even with hidden encryption key id"""
        signing_profile = gpg.GPGProfile(passphrase=self.sign_passphrase,
                                         sign_key=self.sign_key,
                                         hidden_recipients=[self.encrypt_key1])
        self._signed_cycle(signing_profile)

    @unittest.skipIf(platform.machine() in [u"ppc64el", u"ppc64le"],
                     u"Skip on ppc64el or ppc64le machines")
    def test_GPGWriteFile(self):
        u"""Test GPGWriteFile

        Every file written from a still-running block source must land within
        64 KiB of the requested size; a final call is made after the source
        has been told to stop.
        """
        size = 400 * 1000
        gwfh = GPGWriteFile_Helper()
        profile = gpg.GPGProfile(passphrase=u"foobar")
        filename = self._output_name(u"gpgwrite.gpg")

        for _ in range(10):
            gpg.GPGWriteFile(gwfh, filename, profile, size=size)
            self._assert_size_close(filename, size)

        gwfh.set_at_end()
        gpg.GPGWriteFile(gwfh, filename, profile, size=size)

    def test_GzipWriteFile(self):
        u"""Test GzipWriteFile

        Same size check as test_GPGWriteFile, without an encryption profile.
        """
        size = 400 * 1000
        gwfh = GPGWriteFile_Helper()
        filename = self._output_name(u"gzwrite.gz")

        for _ in range(10):
            gpg.GzipWriteFile(gwfh, filename, size=size)
            self._assert_size_close(filename, size)

        gwfh.set_at_end()
        gpg.GzipWriteFile(gwfh, filename, size=size)
class GPGWriteHelper2(object):
    u"""Bare container exposing a payload through its ``data`` attribute.

    Instances are what GPGWriteFile_Helper.__next__ returns.
    """

    def __init__(self, data):
        u"""Keep a reference to *data* unchanged on the instance."""
        self.data = data
class GPGWriteFile_Helper(object):
    u"""Block source used in test_GPGWriteFile and test_GzipWriteFile.

    Each call to ``next()`` returns a GPGWriteHelper2 whose ``data`` is a
    buffer of a randomly chosen size, half constant bytes and half random
    bytes, until set_at_end() has been called.

    Random bytes come from os.urandom, so no file handle is held open and
    nothing needs to be closed when the helper is discarded.
    """

    def __init__(self):
        u"""Start in the "more data available" state."""
        self.at_end = False

    def __iter__(self):
        u"""Return self, so the helper is a complete iterator."""
        return self

    def set_at_end(self):
        u"""Iterator stops when you call this"""
        self.at_end = True

    def get_buffer(self, size):
        u"""Return buffer of size size, consisting of half random data

        The first ``size // 2`` bytes are the constant byte ``a``; the
        remaining bytes are random.
        """
        s1 = size // 2
        s2 = size - s1
        return b"a" * s1 + os.urandom(s2)

    def __next__(self):
        u"""Return the next block, or raise StopIteration once at_end is set."""
        if self.at_end:
            raise StopIteration
        block_data = self.get_buffer(self.get_read_size())
        return GPGWriteHelper2(block_data)

    def get_read_size(self):
        u"""Return 64 KiB on one outcome of a random two-way choice, otherwise
        a random size in the range [0, 64 KiB)."""
        size = 64 * 1024
        if random.randrange(2):
            return size
        return random.randrange(0, size)
class SHATest(UnitTestCase):
    u"""Check the hash produced by gpg.get_hash for a test file."""

    def setUp(self):
        u"""Run the base setUp, then unpack the test files."""
        super(SHATest, self).setUp()
        self.unpack_testfiles()

    def test_sha(self):
        u"""The SHA1 of the regular_file test file must equal the pinned digest."""
        expected_digest = u"886d722999862724e1e62d0ac51c468ee336ef8e"
        regular_file = path.Path(
            u"{0}/testfiles/various_file_types/regular_file".format(_runtest_dir))
        testhash = gpg.get_hash(u"SHA1", regular_file)
        assert testhash == expected_digest, testhash
# Run the tests with the stock unittest runner when this file is executed directly.
if __name__ == u"__main__":
    unittest.main()