# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# pylint: disable=no-value-for-parameter
from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
from builtins import next
from builtins import range
import unittest
import pickle
import sys
from functools import reduce
from duplicity.lazy import * # pylint: disable=unused-wildcard-import,redefined-builtin
from . import UnitTestCase
[docs]class Iterators(UnitTestCase):
one_to_100 = lambda s: iter(list(range(1, 101)))
evens = lambda s: iter(list(range(2, 101, 2)))
odds = lambda s: iter(list(range(1, 100, 2)))
empty = lambda s: iter([])
[docs] def __init__(self, *args):
super(Iterators, self).__init__(*args)
self.falseerror = self.falseerror_maker()
self.trueerror = self.trueerror_maker()
self.emptygen = self.emptygen_maker()
self.typeerror = self.typeerror_maker()
self.nameerror = self.nameerror_maker()
[docs] def falseerror_maker(self):
yield None
yield 0
yield []
raise Exception
[docs] def trueerror_maker(self):
yield 1
yield u"hello"
yield (2, 3)
raise Exception
[docs] def nameerror_maker(self):
if 0:
yield 1
raise NameError
[docs] def typeerror_maker(self):
yield 1
yield 2
raise TypeError
[docs] def alwayserror(self, x):
raise Exception
[docs] def emptygen_maker(self):
if 0:
yield 1
[docs]class IterEqualTestCase(Iterators):
u"""Tests for iter_equal function"""
[docs] def testEmpty(self):
u"""Empty iterators should be equal"""
assert Iter.equal(self.empty(), iter([]))
[docs] def testNormal(self):
u"""See if normal iterators are equal"""
assert Iter.equal(iter((1, 2, 3)), iter((1, 2, 3)))
assert Iter.equal(self.odds(), iter(list(range(1, 100, 2))))
assert Iter.equal(iter((1, 2, 3)), iter(list(range(1, 4))))
[docs] def testNormalInequality(self):
u"""See if normal unequals work"""
assert not Iter.equal(iter((1, 2, 3)), iter((1, 2, 4)))
assert not Iter.equal(self.odds(), iter([u"hello", u"there"]))
[docs] def testGenerators(self):
u"""equals works for generators"""
def f():
yield 1
yield u"hello"
def g():
yield 1
yield u"hello"
assert Iter.equal(f(), g())
[docs] def testLength(self):
u"""Differently sized iterators"""
assert not Iter.equal(iter((1, 2, 3)), iter((1, 2)))
assert not Iter.equal(iter((1, 2)), iter((1, 2, 3)))
[docs]class FilterTestCase(Iterators):
u"""Tests for lazy_filter function"""
[docs] def testEmpty(self):
u"""empty iterators -> empty iterators"""
assert Iter.empty(Iter.filter(self.alwayserror,
self.empty())), \
u"Filtering an empty iterator should result in empty iterator"
[docs] def testNum1(self):
u"""Test numbers 1 - 100 #1"""
assert Iter.equal(Iter.filter(lambda x: x % 2 == 0,
self.one_to_100()),
self.evens())
assert Iter.equal(Iter.filter(lambda x: x % 2,
self.one_to_100()),
self.odds())
[docs] def testError(self):
u"""Should raise appropriate error"""
i = Iter.filter(lambda x: x, self.falseerror_maker())
if sys.version_info.major >= 3:
self.assertRaises(Exception, i.__next__)
else:
self.assertRaises(Exception, i.next)
[docs]class MapTestCase(Iterators):
u"""Test mapping of iterators"""
[docs] def testNumbers(self):
u"""1 to 100 * 2 = 2 to 200"""
assert Iter.equal(Iter.map(lambda x: 2 * x, self.one_to_100()),
iter(list(range(2, 201, 2))))
[docs] def testShortcut(self):
u"""Map should go in order"""
def f(x):
if x == u"hello":
raise NameError
i = Iter.map(f, self.trueerror_maker())
next(i)
if sys.version_info.major >= 3:
self.assertRaises(NameError, i.__next__)
else:
self.assertRaises(NameError, i.next)
[docs] def testEmpty(self):
u"""Map of an empty iterator is empty"""
assert Iter.empty(Iter.map(lambda x: x, iter([])))
[docs]class CatTestCase(Iterators):
u"""Test concatenation of iterators"""
[docs] def testEmpty(self):
u"""Empty + empty = empty"""
assert Iter.empty(Iter.cat(iter([]), iter([])))
[docs] def testNumbers(self):
u"""1 to 50 + 51 to 100 = 1 to 100"""
assert Iter.equal(Iter.cat(iter(list(range(1, 51))), iter(list(range(51, 101)))),
self.one_to_100())
[docs] def testShortcut(self):
u"""Process iterators in order"""
i = Iter.cat(self.typeerror_maker(), self.nameerror_maker())
next(i)
next(i)
if sys.version_info.major >= 3:
self.assertRaises(TypeError, i.__next__)
else:
self.assertRaises(TypeError, i.next)
[docs]class AndOrTestCase(Iterators):
u"""Test And and Or"""
[docs] def testEmpty(self):
u"""And() -> true, Or() -> false"""
assert Iter.And(self.empty())
assert not Iter.Or(self.empty())
[docs] def testAndShortcut(self):
u"""And should return if any false"""
assert Iter.And(self.falseerror_maker()) is None
[docs] def testOrShortcut(self):
u"""Or should return if any true"""
assert Iter.Or(self.trueerror_maker()) == 1
[docs] def testNormalAnd(self):
u"""And should go through true iterators, picking last"""
assert Iter.And(iter([1, 2, 3, 4])) == 4
self.assertRaises(Exception, Iter.And, self.trueerror_maker())
[docs] def testNormalOr(self):
u"""Or goes through false iterators, picking last"""
assert Iter.Or(iter([0, None, []])) == []
self.assertRaises(Exception, Iter.Or, self.falseerror_maker())
[docs]class FoldingTest(Iterators):
u"""Test folding operations"""
[docs] def f(self, x, y):
return x + y
[docs] def testEmpty(self):
u"""Folds of empty iterators should produce defaults"""
assert Iter.foldl(self.f, 23, self.empty()) == 23
assert Iter.foldr(self.f, 32, self.empty()) == 32
[docs] def testAddition(self):
u"""Use folds to sum lists"""
assert Iter.foldl(self.f, 0, self.one_to_100()) == 5050
assert Iter.foldr(self.f, 0, self.one_to_100()) == 5050
[docs] def testLargeAddition(self):
u"""Folds on 10000 element iterators"""
assert Iter.foldl(self.f, 0, iter(list(range(1, 10001)))) == 50005000
self.assertRaises(RuntimeError,
Iter.foldr, self.f, 0, iter(list(range(1, 10001))))
[docs] def testLen(self):
u"""Use folds to calculate length of lists"""
assert Iter.foldl(lambda x, y: x + 1, 0, self.evens()) == 50
assert Iter.foldr(lambda x, y: y + 1, 0, self.odds()) == 50
[docs]class MultiplexTest(Iterators):
[docs] def testSingle(self):
u"""Test multiplex single stream"""
i_orig = self.one_to_100()
i2_orig = self.one_to_100()
i = Iter.multiplex(i_orig, 1)[0]
assert Iter.equal(i, i2_orig)
[docs] def testTrible(self):
u"""Test splitting iterator into three"""
counter = [0]
def ff(x): # pylint: disable=unused-argument
counter[0] += 1
i_orig = self.one_to_100()
i2_orig = self.one_to_100()
i1, i2, i3 = Iter.multiplex(i_orig, 3, ff)
assert Iter.equal(i1, i2)
assert Iter.equal(i3, i2_orig)
assert counter[0] == 100, counter
[docs] def testDouble(self):
u"""Test splitting into two..."""
i1, i2 = Iter.multiplex(self.one_to_100(), 2)
assert Iter.equal(i1, self.one_to_100())
assert Iter.equal(i2, self.one_to_100())
[docs]class ITRBadder(ITRBranch):
[docs] def start_process(self, index): # pylint: disable=unused-argument
self.total = 0
[docs] def end_process(self):
if self.base_index:
summand = self.base_index[-1] # pylint: disable=unsubscriptable-object
# print "Adding ", summand
self.total += summand
[docs] def branch_process(self, subinstance):
# print "Adding subinstance ", subinstance.total
self.total += subinstance.total
[docs]class ITRBadder2(ITRBranch):
[docs] def start_process(self, index): # pylint: disable=unused-argument
self.total = 0
[docs] def end_process(self):
# print "Adding ", self.base_index
self.total += reduce(lambda x, y: x + y, self.base_index, 0)
[docs] def can_fast_process(self, index):
if len(index) == 3:
return 1
else:
return None
[docs] def fast_process(self, index):
self.total += index[0] + index[1] + index[2]
[docs] def branch_process(self, subinstance):
# print "Adding branch ", subinstance.total
self.total += subinstance.total
[docs]class TreeReducerTest(UnitTestCase):
[docs] def setUp(self):
super(TreeReducerTest, self).setUp()
self.i1 = [(), (1,), (2,), (3,)]
self.i2 = [(0,), (0, 1), (0, 1, 0), (0, 1, 1), (0, 2), (0, 2, 1), (0, 3)]
self.i1a = [(), (1,)]
self.i1b = [(2,), (3,)]
self.i2a = [(0,), (0, 1), (0, 1, 0)]
self.i2b = [(0, 1, 1), (0, 2)]
self.i2c = [(0, 2, 1), (0, 3)]
[docs] def testTreeReducer(self):
u"""testing IterTreeReducer"""
itm = IterTreeReducer(ITRBadder, [])
for index in self.i1:
val = itm(index)
assert val, (val, index)
itm.Finish()
assert itm.root_branch.total == 6, itm.root_branch.total
itm2 = IterTreeReducer(ITRBadder2, [])
for index in self.i2:
val = itm2(index)
if index == ():
assert not val
else:
assert val
itm2.Finish()
assert itm2.root_branch.total == 12, itm2.root_branch.total
[docs] def testTreeReducerState(self):
u"""Test saving and recreation of an IterTreeReducer"""
itm1a = IterTreeReducer(ITRBadder, [])
for index in self.i1a:
val = itm1a(index)
assert val, index
itm1b = pickle.loads(pickle.dumps(itm1a))
for index in self.i1b:
val = itm1b(index)
assert val, index
itm1b.Finish()
assert itm1b.root_branch.total == 6, itm1b.root_branch.total
itm2a = IterTreeReducer(ITRBadder2, [])
for index in self.i2a:
val = itm2a(index)
if index == ():
assert not val
else:
assert val
itm2b = pickle.loads(pickle.dumps(itm2a))
for index in self.i2b:
val = itm2b(index)
if index == ():
assert not val
else:
assert val
itm2c = pickle.loads(pickle.dumps(itm2b))
for index in self.i2c:
val = itm2c(index)
if index == ():
assert not val
else:
assert val
itm2c.Finish()
assert itm2c.root_branch.total == 12, itm2c.root_branch.total
if __name__ == u"__main__":
unittest.main()