Source code for testing.unit.test_lazy

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

# pylint: disable=no-value-for-parameter

from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
from builtins import next
from builtins import range

import unittest
import pickle
import sys
from functools import reduce

from duplicity.lazy import *  # pylint: disable=unused-wildcard-import,redefined-builtin
from . import UnitTestCase


[docs]class Iterators(UnitTestCase): one_to_100 = lambda s: iter(list(range(1, 101))) evens = lambda s: iter(list(range(2, 101, 2))) odds = lambda s: iter(list(range(1, 100, 2))) empty = lambda s: iter([])
[docs] def __init__(self, *args): super(Iterators, self).__init__(*args) self.falseerror = self.falseerror_maker() self.trueerror = self.trueerror_maker() self.emptygen = self.emptygen_maker() self.typeerror = self.typeerror_maker() self.nameerror = self.nameerror_maker()
[docs] def falseerror_maker(self): yield None yield 0 yield [] raise Exception
[docs] def trueerror_maker(self): yield 1 yield u"hello" yield (2, 3) raise Exception
[docs] def nameerror_maker(self): if 0: yield 1 raise NameError
[docs] def typeerror_maker(self): yield 1 yield 2 raise TypeError
[docs] def alwayserror(self, x): raise Exception
[docs] def emptygen_maker(self): if 0: yield 1
[docs]class IterEqualTestCase(Iterators): u"""Tests for iter_equal function"""
[docs] def testEmpty(self): u"""Empty iterators should be equal""" assert Iter.equal(self.empty(), iter([]))
[docs] def testNormal(self): u"""See if normal iterators are equal""" assert Iter.equal(iter((1, 2, 3)), iter((1, 2, 3))) assert Iter.equal(self.odds(), iter(list(range(1, 100, 2)))) assert Iter.equal(iter((1, 2, 3)), iter(list(range(1, 4))))
[docs] def testNormalInequality(self): u"""See if normal unequals work""" assert not Iter.equal(iter((1, 2, 3)), iter((1, 2, 4))) assert not Iter.equal(self.odds(), iter([u"hello", u"there"]))
[docs] def testGenerators(self): u"""equals works for generators""" def f(): yield 1 yield u"hello" def g(): yield 1 yield u"hello" assert Iter.equal(f(), g())
[docs] def testLength(self): u"""Differently sized iterators""" assert not Iter.equal(iter((1, 2, 3)), iter((1, 2))) assert not Iter.equal(iter((1, 2)), iter((1, 2, 3)))
[docs]class FilterTestCase(Iterators): u"""Tests for lazy_filter function"""
[docs] def testEmpty(self): u"""empty iterators -> empty iterators""" assert Iter.empty(Iter.filter(self.alwayserror, self.empty())), \ u"Filtering an empty iterator should result in empty iterator"
[docs] def testNum1(self): u"""Test numbers 1 - 100 #1""" assert Iter.equal(Iter.filter(lambda x: x % 2 == 0, self.one_to_100()), self.evens()) assert Iter.equal(Iter.filter(lambda x: x % 2, self.one_to_100()), self.odds())
[docs] def testError(self): u"""Should raise appropriate error""" i = Iter.filter(lambda x: x, self.falseerror_maker()) if sys.version_info.major >= 3: self.assertRaises(Exception, i.__next__) else: self.assertRaises(Exception, i.next)
[docs]class MapTestCase(Iterators): u"""Test mapping of iterators"""
[docs] def testNumbers(self): u"""1 to 100 * 2 = 2 to 200""" assert Iter.equal(Iter.map(lambda x: 2 * x, self.one_to_100()), iter(list(range(2, 201, 2))))
[docs] def testShortcut(self): u"""Map should go in order""" def f(x): if x == u"hello": raise NameError i = Iter.map(f, self.trueerror_maker()) next(i) if sys.version_info.major >= 3: self.assertRaises(NameError, i.__next__) else: self.assertRaises(NameError, i.next)
[docs] def testEmpty(self): u"""Map of an empty iterator is empty""" assert Iter.empty(Iter.map(lambda x: x, iter([])))
[docs]class CatTestCase(Iterators): u"""Test concatenation of iterators"""
[docs] def testEmpty(self): u"""Empty + empty = empty""" assert Iter.empty(Iter.cat(iter([]), iter([])))
[docs] def testNumbers(self): u"""1 to 50 + 51 to 100 = 1 to 100""" assert Iter.equal(Iter.cat(iter(list(range(1, 51))), iter(list(range(51, 101)))), self.one_to_100())
[docs] def testShortcut(self): u"""Process iterators in order""" i = Iter.cat(self.typeerror_maker(), self.nameerror_maker()) next(i) next(i) if sys.version_info.major >= 3: self.assertRaises(TypeError, i.__next__) else: self.assertRaises(TypeError, i.next)
[docs]class AndOrTestCase(Iterators): u"""Test And and Or"""
[docs] def testEmpty(self): u"""And() -> true, Or() -> false""" assert Iter.And(self.empty()) assert not Iter.Or(self.empty())
[docs] def testAndShortcut(self): u"""And should return if any false""" assert Iter.And(self.falseerror_maker()) is None
[docs] def testOrShortcut(self): u"""Or should return if any true""" assert Iter.Or(self.trueerror_maker()) == 1
[docs] def testNormalAnd(self): u"""And should go through true iterators, picking last""" assert Iter.And(iter([1, 2, 3, 4])) == 4 self.assertRaises(Exception, Iter.And, self.trueerror_maker())
[docs] def testNormalOr(self): u"""Or goes through false iterators, picking last""" assert Iter.Or(iter([0, None, []])) == [] self.assertRaises(Exception, Iter.Or, self.falseerror_maker())
[docs]class FoldingTest(Iterators): u"""Test folding operations"""
[docs] def f(self, x, y): return x + y
[docs] def testEmpty(self): u"""Folds of empty iterators should produce defaults""" assert Iter.foldl(self.f, 23, self.empty()) == 23 assert Iter.foldr(self.f, 32, self.empty()) == 32
[docs] def testAddition(self): u"""Use folds to sum lists""" assert Iter.foldl(self.f, 0, self.one_to_100()) == 5050 assert Iter.foldr(self.f, 0, self.one_to_100()) == 5050
[docs] def testLargeAddition(self): u"""Folds on 10000 element iterators""" assert Iter.foldl(self.f, 0, iter(list(range(1, 10001)))) == 50005000 self.assertRaises(RuntimeError, Iter.foldr, self.f, 0, iter(list(range(1, 10001))))
[docs] def testLen(self): u"""Use folds to calculate length of lists""" assert Iter.foldl(lambda x, y: x + 1, 0, self.evens()) == 50 assert Iter.foldr(lambda x, y: y + 1, 0, self.odds()) == 50
[docs]class MultiplexTest(Iterators):
[docs] def testSingle(self): u"""Test multiplex single stream""" i_orig = self.one_to_100() i2_orig = self.one_to_100() i = Iter.multiplex(i_orig, 1)[0] assert Iter.equal(i, i2_orig)
[docs] def testTrible(self): u"""Test splitting iterator into three""" counter = [0] def ff(x): # pylint: disable=unused-argument counter[0] += 1 i_orig = self.one_to_100() i2_orig = self.one_to_100() i1, i2, i3 = Iter.multiplex(i_orig, 3, ff) assert Iter.equal(i1, i2) assert Iter.equal(i3, i2_orig) assert counter[0] == 100, counter
[docs] def testDouble(self): u"""Test splitting into two...""" i1, i2 = Iter.multiplex(self.one_to_100(), 2) assert Iter.equal(i1, self.one_to_100()) assert Iter.equal(i2, self.one_to_100())
[docs]class ITRBadder(ITRBranch):
[docs] def start_process(self, index): # pylint: disable=unused-argument self.total = 0
[docs] def end_process(self): if self.base_index: summand = self.base_index[-1] # pylint: disable=unsubscriptable-object # print "Adding ", summand self.total += summand
[docs] def branch_process(self, subinstance): # print "Adding subinstance ", subinstance.total self.total += subinstance.total
[docs]class ITRBadder2(ITRBranch):
[docs] def start_process(self, index): # pylint: disable=unused-argument self.total = 0
[docs] def end_process(self): # print "Adding ", self.base_index self.total += reduce(lambda x, y: x + y, self.base_index, 0)
[docs] def can_fast_process(self, index): if len(index) == 3: return 1 else: return None
[docs] def fast_process(self, index): self.total += index[0] + index[1] + index[2]
[docs] def branch_process(self, subinstance): # print "Adding branch ", subinstance.total self.total += subinstance.total
[docs]class TreeReducerTest(UnitTestCase):
[docs] def setUp(self): super(TreeReducerTest, self).setUp() self.i1 = [(), (1,), (2,), (3,)] self.i2 = [(0,), (0, 1), (0, 1, 0), (0, 1, 1), (0, 2), (0, 2, 1), (0, 3)] self.i1a = [(), (1,)] self.i1b = [(2,), (3,)] self.i2a = [(0,), (0, 1), (0, 1, 0)] self.i2b = [(0, 1, 1), (0, 2)] self.i2c = [(0, 2, 1), (0, 3)]
[docs] def testTreeReducer(self): u"""testing IterTreeReducer""" itm = IterTreeReducer(ITRBadder, []) for index in self.i1: val = itm(index) assert val, (val, index) itm.Finish() assert itm.root_branch.total == 6, itm.root_branch.total itm2 = IterTreeReducer(ITRBadder2, []) for index in self.i2: val = itm2(index) if index == (): assert not val else: assert val itm2.Finish() assert itm2.root_branch.total == 12, itm2.root_branch.total
[docs] def testTreeReducerState(self): u"""Test saving and recreation of an IterTreeReducer""" itm1a = IterTreeReducer(ITRBadder, []) for index in self.i1a: val = itm1a(index) assert val, index itm1b = pickle.loads(pickle.dumps(itm1a)) for index in self.i1b: val = itm1b(index) assert val, index itm1b.Finish() assert itm1b.root_branch.total == 6, itm1b.root_branch.total itm2a = IterTreeReducer(ITRBadder2, []) for index in self.i2a: val = itm2a(index) if index == (): assert not val else: assert val itm2b = pickle.loads(pickle.dumps(itm2a)) for index in self.i2b: val = itm2b(index) if index == (): assert not val else: assert val itm2c = pickle.loads(pickle.dumps(itm2b)) for index in self.i2c: val = itm2c(index) if index == (): assert not val else: assert val itm2c.Finish() assert itm2c.root_branch.total == 12, itm2c.root_branch.total
if __name__ == u"__main__": unittest.main()