Source code for testing.unit.test_patchdir

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA


import io
import platform
import unittest

from duplicity import diffdir
from duplicity import patchdir
from duplicity import selection
from duplicity.path import *  # pylint: disable=unused-wildcard-import,redefined-builtin
from testing import _runtest_dir
from . import UnitTestCase


class PatchingTest(UnitTestCase):
    """Test signature, delta and patch round trips on the unpacked test trees."""

    def setUp(self):
        """Run the base fixture setup, then unpack the test files."""
        super().setUp()
        self.unpack_testfiles()

    def copyfileobj(self, infp, outfp):
        """Copy everything from infp to outfp, then close both.

        :param infp: readable file-like object; read in 32 KiB chunks until
            a read returns an empty buffer
        :param outfp: writable file-like object receiving every chunk
        """
        blocksize = 32 * 1024
        while True:
            buf = infp.read(blocksize)
            if not buf:
                break
            outfp.write(buf)
        # close() is required to return a falsy value for both objects
        assert not infp.close()
        assert not outfp.close()

    def test_total(self):
        """Run the full signature/diff/patch cycle over dir1, dir2 and dir3."""
        self.total_sequence(
            [
                f"{_runtest_dir}/testfiles/dir1",
                f"{_runtest_dir}/testfiles/dir2",
                f"{_runtest_dir}/testfiles/dir3",
            ]
        )

    def get_sel(self, path):
        """Return a selection iterator over the given directory path."""
        return selection.Select(path).set_iter()

    def total_sequence(self, filelist):
        """Test signatures, diffing, and patching on a list of directories.

        The first directory is written to diff.tar as a full backup.  For
        every later directory a signature of the current ``sequence`` tree is
        written to sig.tar and a delta of the new directory against that
        signature is written to diff.tar.  After each step diff.tar is applied
        to ``sequence``, which must then compare equal to the directory just
        processed.

        :param filelist: at least two directory names, processed in order
        """
        assert len(filelist) >= 2
        sig = Path(f"{_runtest_dir}/testfiles/output/sig.tar")
        diff = Path(f"{_runtest_dir}/testfiles/output/diff.tar")
        seq_path = Path(f"{_runtest_dir}/testfiles/output/sequence")
        new_path, old_path = None, None  # set below in for loop

        for dirname in filelist:
            old_path, new_path = new_path, Path(dirname)
            if old_path:
                # incremental step: signature of what has been restored so far
                sigblock = diffdir.DirSig(self.get_sel(seq_path))
                diffdir.write_block_iter(sigblock, sig)
                deltablock = diffdir.DirDelta(self.get_sel(new_path), sig.open("rb"))
            else:
                # first directory: write the initial full backup to diff.tar
                deltablock = diffdir.DirFull(self.get_sel(new_path))
            diffdir.write_block_iter(deltablock, diff)

            patchdir.Patch(seq_path, diff.open("rb"))
            assert seq_path.compare_recursive(new_path, 1)

    def test_block_tar(self):
        """Test building one block tar from a number of tar files."""

        def get_fileobjs():
            """Yield each blocktartest tar file opened for reading, closing it afterwards."""
            for i in range(1, 4):
                p = Path(f"{_runtest_dir}/testfiles/blocktartest/test{i}.tar")
                fp = p.open("rb")
                try:
                    yield fp
                finally:
                    # also runs when the consumer abandons the generator early
                    fp.close()

        tf = patchdir.TarFile_FromFileobjs(get_fileobjs())
        namelist = [tarinfo.name for tarinfo in tf]
        for i in range(1, 6):
            assert f"tmp/{i}" in namelist, namelist

    def test_doubledot_hole(self):
        """Test for the .. bug that lets tar overwrite the parent dir."""

        def make_bad_tar(filename):
            """Write the attack dup_tarfile to filename."""
            tf = dup_tarfile.TarFile(name=filename, mode="w")
            try:
                # The member is an empty regular file; create it without
                # depending on a shell or on /dev/null.
                empty_file = f"{_runtest_dir}/testfiles/output/file"
                with open(empty_file, "wb"):
                    pass
                path = Path(empty_file)
                # The index carries a ".." component so that the tarinfo built
                # from it points outside of the directory being patched.
                path.index = (b"diff", b"..", b"warning-security-error")
                ti = path.get_tarinfo()
                # tar archives are binary, so hand over an (empty) bytes buffer
                tf.addfile(ti, io.BytesIO(b""))
            finally:
                tf.close()

        make_bad_tar(f"{_runtest_dir}/testfiles/output/bad.tar")
        os.mkdir(f"{_runtest_dir}/testfiles/output/temp")
        # Patch is expected to raise, so release the archive handle ourselves.
        with open(f"{_runtest_dir}/testfiles/output/bad.tar", "rb") as bad_tar:
            self.assertRaises(
                patchdir.PatchDirException,
                patchdir.Patch,
                Path(f"{_runtest_dir}/testfiles/output/temp"),
                bad_tar,
            )
        assert not Path(f"{_runtest_dir}/testfiles/output/warning-security-error").exists()
class index:
    """Used below to test the iter collation.

    A minimal object whose only state is an ``index`` attribute.
    """

    def __init__(self, index):
        """Store the given value as ``self.index``."""
        self.index = index
class CollateItersTest(UnitTestCase):
    """Test patchdir.collate_iters and patchdir.IndexedTuple."""

    def setUp(self):
        """Run the base fixture setup, then unpack the test files."""
        super().setUp()
        self.unpack_testfiles()

    def test_collate(self):
        """Test collate_iters function"""
        indices = [index(i) for i in [0, 1, 2, 3]]

        def makeiter1():
            """Fresh iterator over all four elements."""
            return iter(indices)

        def makeiter2():
            """Fresh iterator over elements 0, 1 and 3."""
            return (indices[i] for i in [0, 1, 3])

        def makeiter3():
            """Fresh iterator over elements 1 and 2."""
            return (indices[i] for i in [1, 2])

        # Two iterators: positions missing from the second one collate to None.
        outiter = patchdir.collate_iters([makeiter1(), makeiter2()])
        assert Iter.equal(
            outiter,
            iter(
                [
                    (indices[0], indices[0]),
                    (indices[1], indices[1]),
                    (indices[2], None),
                    (indices[3], indices[3]),
                ]
            ),
        )

        # Three iterators, same rule applied per column.
        assert Iter.equal(
            patchdir.collate_iters([makeiter1(), makeiter2(), makeiter3()]),
            iter(
                [
                    (indices[0], indices[0], None),
                    (indices[1], indices[1], indices[1]),
                    (indices[2], None, indices[2]),
                    (indices[3], indices[3], None),
                ]
            ),
            1,
        )

        # An empty second iterator yields None in every second slot; checked
        # with the expected values on either side of the comparison.
        assert Iter.equal(
            patchdir.collate_iters([makeiter1(), iter([])]),
            iter([(i, None) for i in indices]),
        )
        assert Iter.equal(
            [(i, None) for i in indices],
            patchdir.collate_iters([makeiter1(), iter([])]),
        )

    def test_tuple(self):
        """Test indexed tuple"""
        i = patchdir.IndexedTuple((1, 2, 3), ("a", "b"))
        i2 = patchdir.IndexedTuple((), ("hello", "there", "how are you"))

        # item access and len() refer to the second constructor argument
        assert i[0] == "a"
        assert i[1] == "b"
        assert i2[1] == "there"
        assert len(i) == 2 and len(i2) == 3
        # report the operands on failure instead of repeating the comparison
        assert i2 < i, (i2, i)

    def test_tuple_assignment(self):
        """Test that an IndexedTuple unpacks into its data elements."""
        a, b, c = patchdir.IndexedTuple((), (1, 2, 3))
        assert a == 1
        assert b == 2
        assert c == 3
class TestInnerFuncs(UnitTestCase):
    """Test some other functions involved in patching"""

    def setUp(self):
        """Run the base setup, unpack the test files and prepare self.out."""
        super().setUp()
        self.unpack_testfiles()
        self.check_output()

    def check_output(self):
        """Make sure the testfiles/output directory exists and store it as self.out."""
        out = Path(f"{_runtest_dir}/testfiles/output")
        if not (out.exists() and out.isdir()):
            out.mkdir()
        self.out = out

    def _write_ropath(self, name, data, perms, difftype):
        """Write data to self.out/name, chmod it to perms, tag it with difftype and return it."""
        ropath = self.out.append(name)
        fout = ropath.open("wb")
        fout.write(data)
        assert not fout.close()
        ropath.chmod(perms)
        ropath.difftype = difftype
        return ropath

    def snapshot(self):
        """Make a snapshot ROPath, permissions 0o600"""
        return self._write_ropath("snapshot", b"hello, world!", 0o600, "snapshot")

    def get_delta(self, old_buf, new_buf):
        """Return delta buffer from old to new

        :param old_buf: bytes of the old contents, used to build the signature
        :param new_buf: bytes of the new contents the delta should produce
        """
        sigfile = librsync.SigFile(io.BytesIO(old_buf))
        sig = sigfile.read()
        assert not sigfile.close()

        deltafile = librsync.DeltaFile(sig, io.BytesIO(new_buf))
        deltabuf = deltafile.read()
        assert not deltafile.close()
        return deltabuf

    def delta1(self):
        """Make a delta ROPath, permissions 0o640"""
        return self._write_ropath(
            "delta1",
            self.get_delta(b"hello, world!", b"aonseuth aosetnuhaonsuhtansoetuhaoe"),
            0o640,
            "diff",
        )

    def delta2(self):
        """Make another delta ROPath, permissions 0o644"""
        # Stored under its own name: writing to "delta1" here would overwrite
        # the file backing the ROPath returned by delta1().
        return self._write_ropath(
            "delta2",
            self.get_delta(
                b"aonseuth aosetnuhaonsuhtansoetuhaoe",
                b"3499 34957839485792357 458348573",
            ),
            0o644,
            "diff",
        )

    def deleted(self):
        """Make a deleted ROPath"""
        deleted = self.out.append("deleted")
        assert not deleted.exists()
        deleted.difftype = "deleted"
        return deleted

    def test_normalize(self):
        """Test normalizing a sequence of diffs"""
        ss = self.snapshot()
        d1 = self.delta1()
        d2 = self.delta2()
        de = self.deleted()

        seq1 = [ss, d1, d2]
        seq2 = [ss, d1, d2, de]
        seq3 = [de, ss, d1, d2]
        seq4 = [de, ss, d1, d2, ss]
        seq5 = [de, ss, d1, d2, ss, d1, d2]

        def try_seq(input_seq, correct_output_seq):
            """Assert that normalize_ps(input_seq) equals correct_output_seq."""
            normed = patchdir.normalize_ps(input_seq)
            assert normed == correct_output_seq, (normed, correct_output_seq)

        try_seq(seq1, seq1)
        try_seq(seq2, [de])
        try_seq(seq3, seq1)
        try_seq(seq4, [ss])
        try_seq(seq5, seq1)

    # TODO: fix test_patch_seq2ropath for macOS, maybe others.
    # It fails there under pytest and pydevd; the reported permission strings
    # differ in the group id:
    # ----------
    #     def testseq(seq, perms, buf):
    #         result = patchdir.patch_seq2ropath(seq)
    # >       assert result.getperms() == perms, (result.getperms(), perms)
    # E       AssertionError: ('501:0 600', '501:20 600')
    # E       assert '501:0 600' == '501:20 600'
    # E         - 501:0 600
    # E         + 501:20 600
    @unittest.skipUnless(platform.platform().startswith("Linux"), "Skip on non-Linux systems")
    def test_patch_seq2ropath(self):
        """Test patching sequence"""

        def testseq(seq, perms, buf):
            """Patch seq into one ROPath and check its permissions and contents."""
            result = patchdir.patch_seq2ropath(seq)
            assert result.getperms() == perms, (result.getperms(), perms)
            fin = result.open("rb")
            contents = fin.read()
            assert not fin.close()
            assert contents == buf, (contents, buf)

        # expected owner part of getperms(): "<uid>:<gid>" of this process
        ids = f"{os.getuid()}:{os.getgid()}"

        testseq([self.snapshot()], f"{ids} 600", b"hello, world!")
        testseq(
            [self.snapshot(), self.delta1()],
            f"{ids} 640",
            b"aonseuth aosetnuhaonsuhtansoetuhaoe",
        )
        testseq(
            [self.snapshot(), self.delta1(), self.delta2()],
            f"{ids} 644",
            b"3499 34957839485792357 458348573",
        )
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()