698
698
instrumented_graph.is_ancestor('rev2a', 'rev2b')
699
699
self.assertTrue('null:' not in instrumented_provider.calls)
701
def test_is_between(self):
    """Run graph.is_between() over a table of cases on the ancestry_1 graph.

    Each row pairs the expected boolean with the three positional
    arguments handed to is_between(); rows are checked in order and the
    first mismatch fails the test.
    """
    graph = self.make_graph(ancestry_1)
    cases = [
        (True, ('null:', 'null:', 'null:')),
        (True, ('rev1', 'null:', 'rev1')),
        (True, ('rev1', 'rev1', 'rev4')),
        (True, ('rev4', 'rev1', 'rev4')),
        (True, ('rev3', 'rev1', 'rev4')),
        (False, ('rev4', 'rev1', 'rev3')),
        (False, ('rev1', 'rev2a', 'rev4')),
        (False, ('null:', 'rev1', 'rev4')),
    ]
    for expected, args in cases:
        self.assertEqual(expected, graph.is_between(*args))
712
701
def test_is_ancestor_boundary(self):
713
702
"""Ensure that we avoid searching the whole graph.
1093
1082
search = graph._make_breadth_first_searcher(['head'])
1094
1083
self.assertSeenAndResult(expected, search, search.next_with_ghosts)
1096
def test_breadth_first_stop_searching_late(self):
# NOTE(review): this block reads like a line-numbered diff extract. The bare
# integers between statements look like viewer line numbers, and the gaps in
# them suggest some source lines are absent here -- confirm against the full
# file before editing this test.
1097
# A client should be able to say 'stop node X' and have it excluded
1098
# from the result even if X was seen in an older iteration of the
1100
# Ancestry handed to make_graph. Only the 'child' entry is visible in this
# extract, and the closing of the dict literal is not shown.
graph = self.make_graph({
1103
'child':[NULL_REVISION],
1106
# Searcher under test, started from 'head'.
search = graph._make_breadth_first_searcher(['head'])
1108
# NOTE(review): the tuples below are presumably the items of the `expected`
# list passed to assertSeenAndResult further down; its assignment is not
# visible in this extract -- confirm.
(set(['head']), (set(['head']), set(['middle']), 1),
1109
['head'], None, None),
1110
(set(['head', 'middle']), (set(['head']), set(['child']), 2),
1111
['head', 'middle'], None, None),
1112
# 'middle' came from the previous iteration, but we don't stop
1113
# searching it until *after* advancing the searcher.
1114
(set(['head', 'middle', 'child']),
1115
(set(['head']), set(['middle', 'child']), 1),
1116
['head'], None, ['middle', 'child']),
1118
# First pass drives the searcher through search.next.
self.assertSeenAndResult(expected, search, search.next)
1119
# using next_with_ghosts:
1120
# A fresh searcher is built so the second pass starts from 'head' again.
search = graph._make_breadth_first_searcher(['head'])
1121
self.assertSeenAndResult(expected, search, search.next_with_ghosts)
1123
1085
def test_breadth_first_get_result_ghosts_are_excluded(self):
1124
1086
graph = self.make_graph({
1125
1087
'head':['child', 'ghost'],
1422
1384
# Use sorted because we don't care about the order, just that each is
1423
1385
# only present 1 time.
1424
1386
self.assertEqual(['a', 'b'], sorted(self.inst_pp.calls))
1427
class TestCachingParentsProviderExtras(tests.TestCaseWithTransport):
1428
"""Test the behaviour when parents are provided that were not requested."""
1431
# NOTE(review): this super().setUp() call is presumably the first statement
# of a `setUp` method whose `def` line is not present in this extract --
# confirm against the full file.
super(TestCachingParentsProviderExtras, self).setUp()
1432
# Stub provider: its get_parent_map ignores `keys` and always returns the
# same two entries, 'rev1' and 'rev2'.
class ExtraParentsProvider(object):
1434
def get_parent_map(self, keys):
1435
return {'rev1': [], 'rev2': ['rev1',]}
1437
# Wrap the stub in InstrumentedParentsProvider; the tests in this class
# inspect its `.calls` list (presumably the keys requested -- confirm).
self.inst_pp = InstrumentedParentsProvider(ExtraParentsProvider())
1438
# The caching provider under test delegates to the instrumented wrapper's
# get_parent_map.
self.caching_pp = _mod_graph.CachingParentsProvider(
1439
get_parent_map=self.inst_pp.get_parent_map)
1441
def test_uncached(self):
    """Look up 'rev1' after disable_cache().

    Expects only the requested key in the result, one recorded call on
    the instrumented provider, and a `_cache` attribute that is None.
    """
    provider = self.caching_pp
    provider.disable_cache()
    parent_map = provider.get_parent_map(['rev1'])
    self.assertEqual({'rev1': []}, parent_map)
    self.assertEqual(['rev1'], self.inst_pp.calls)
    self.assertIs(None, provider._cache)
1448
def test_cache_initially_empty(self):
    """Before any lookup, the provider's `_cache` equals an empty dict."""
    cache = self.caching_pp._cache
    self.assertEqual({}, cache)
1451
def test_cached(self):
    """Look up 'rev1' twice with the cache in its initial state.

    Expects both lookups to return only 'rev1', the cache to also hold
    the unrequested 'rev2' entry, and the instrumented provider to have
    recorded a single call.
    """
    provider = self.caching_pp
    first = provider.get_parent_map(['rev1'])
    self.assertEqual({'rev1': []}, first)
    self.assertEqual(['rev1'], self.inst_pp.calls)
    # The extra 'rev2' entry handed back by the provider is cached too.
    self.assertEqual({'rev1': [], 'rev2': ['rev1']}, provider._cache)
    second = provider.get_parent_map(['rev1'])
    self.assertEqual({'rev1': []}, second)
    # Still one recorded call after the repeat lookup.
    self.assertEqual(['rev1'], self.inst_pp.calls)
1461
def test_disable_cache_clears_cache(self):
    """disable_cache() leaves `_cache` as None even after it held entries."""
    provider = self.caching_pp
    # Prime the cache with one lookup; two entries are expected afterwards.
    provider.get_parent_map(['rev1'])
    self.assertEqual(2, len(provider._cache))
    provider.disable_cache()
    self.assertIs(None, provider._cache)
1468
def test_enable_cache_raises(self):
    """enable_cache() on self.caching_pp raises AssertionError.

    The exception text must be exactly
    'Cache enabled when already enabled.'.
    """
    enable = self.caching_pp.enable_cache
    err = self.assertRaises(AssertionError, enable)
    self.assertEqual('Cache enabled when already enabled.', str(err))
1472
def test_cache_misses(self):
    """Two lookups of 'rev3' record only one call on the provider."""
    provider = self.caching_pp
    for _ in range(2):
        provider.get_parent_map(['rev3'])
    self.assertEqual(['rev3'], self.inst_pp.calls)
1477
def test_no_cache_misses(self):
    """With cache_misses=False, each lookup of 'rev3' records a call."""
    provider = self.caching_pp
    # Re-enable the cache with miss caching switched off.
    provider.disable_cache()
    provider.enable_cache(cache_misses=False)
    for _ in range(2):
        provider.get_parent_map(['rev3'])
    self.assertEqual(['rev3', 'rev3'], self.inst_pp.calls)
1484
def test_cache_extras(self):
    """An unrequested entry from one lookup answers a later lookup.

    Looking up 'rev3' returns nothing; the following lookup of 'rev2'
    succeeds while the instrumented provider still shows only the
    'rev3' call.
    """
    provider = self.caching_pp
    missing = provider.get_parent_map(['rev3'])
    self.assertEqual({}, missing)
    extra = provider.get_parent_map(['rev2'])
    self.assertEqual({'rev2': ['rev1']}, extra)
    self.assertEqual(['rev3'], self.inst_pp.calls)
1491
class TestCollapseLinearRegions(tests.TestCase):
1493
def assertCollapsed(self, collapsed, original):
    """Assert that collapse_linear_regions(original) equals collapsed."""
    actual = _mod_graph.collapse_linear_regions(original)
    self.assertEqual(collapsed, actual)
1497
def test_collapse_nothing(self):
    """Each of these graphs is expected to come back unchanged."""
    unchanged = [
        {1: [2, 3], 2: [], 3: []},
        {1: [2], 2: [3, 4], 3: [5], 4: [5], 5: []},
    ]
    for d in unchanged:
        self.assertCollapsed(d, d)
1503
def test_collapse_chain(self):
    """A linear chain collapses down to its two end nodes."""
    # Each row is (expected collapsed graph, original chain).
    cases = [
        ({1: [5], 5: []}, {1: [2], 2: [3], 3: [4], 4: [5], 5: []}),
        ({5: [1], 1: []}, {5: [4], 4: [3], 3: [2], 2: [1], 1: []}),
        ({5: [2], 2: []}, {5: [3], 3: [4], 4: [1], 1: [2], 2: []}),
    ]
    for collapsed, chain in cases:
        self.assertCollapsed(collapsed, chain)
1512
def test_collapse_with_multiple_children(self):
    """A graph that forks at 1 and rejoins at 6 must not be collapsed."""
    # 4 and 5 cannot be removed because 6 has 2 children
    # 2 and 3 cannot be removed because 1 has 2 parents
    graph = {1: [2, 3], 2: [4], 4: [6], 3: [5], 5: [6], 6: [7], 7: []}
    self.assertCollapsed(graph, graph)