1
# Copyright (C) 2007-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the compiled dirstate helpers."""
29
from bzrlib.tests import (
32
from bzrlib.tests.test_osutils import dir_reader_scenarios
33
from bzrlib.tests.scenarios import (
34
load_tests_apply_scenarios,
37
from bzrlib.tests import (
42
load_tests = load_tests_apply_scenarios
45
compiled_dirstate_helpers_feature = features.ModuleAvailableFeature(
46
'bzrlib._dirstate_helpers_pyx')
49
# FIXME: we should also parametrize against SHA1Provider !
51
ue_scenarios = [('dirstate_Python',
52
{'update_entry': dirstate.py_update_entry})]
53
if compiled_dirstate_helpers_feature.available():
54
update_entry = compiled_dirstate_helpers_feature.module.update_entry
55
ue_scenarios.append(('dirstate_Pyrex', {'update_entry': update_entry}))
57
pe_scenarios = [('dirstate_Python',
58
{'_process_entry': dirstate.ProcessEntryPython})]
59
if compiled_dirstate_helpers_feature.available():
60
process_entry = compiled_dirstate_helpers_feature.module.ProcessEntryC
61
pe_scenarios.append(('dirstate_Pyrex', {'_process_entry': process_entry}))
64
class TestBisectPathMixin(object):
65
"""Test that _bisect_path_*() returns the expected values.
67
_bisect_path_* is intended to work like bisect.bisect_*() except it
68
knows it is working on paths that are sorted by ('path', 'to', 'foo')
69
chunks rather than by raw 'path/to/foo'.
71
Test Cases should inherit from this and override ``get_bisect_path`` return
72
their implementation, and ``get_bisect`` to return the matching
73
bisect.bisect_* function.
76
def get_bisect_path(self):
77
"""Return an implementation of _bisect_path_*"""
78
raise NotImplementedError
81
"""Return a version of bisect.bisect_*.
83
Also, for the 'exists' check, return the offset to the real values.
84
For example bisect_left returns the index of an entry, while
85
bisect_right returns the index *after* an entry
87
:return: (bisect_func, offset)
89
raise NotImplementedError
91
def assertBisect(self, paths, split_paths, path, exists=True):
92
"""Assert that bisect_split works like bisect_left on the split paths.
94
:param paths: A list of path names
95
:param split_paths: A list of path names that are already split up by directory
96
('path/to/foo' => ('path', 'to', 'foo'))
97
:param path: The path we are indexing.
98
:param exists: The path should be present, so make sure the
99
final location actually points to the right value.
101
All other arguments will be passed along.
103
bisect_path = self.get_bisect_path()
104
self.assertIsInstance(paths, list)
105
bisect_path_idx = bisect_path(paths, path)
106
split_path = self.split_for_dirblocks([path])[0]
107
bisect_func, offset = self.get_bisect()
108
bisect_split_idx = bisect_func(split_paths, split_path)
109
self.assertEqual(bisect_split_idx, bisect_path_idx,
110
'%s disagreed. %s != %s'
112
% (bisect_path.__name__,
113
bisect_split_idx, bisect_path_idx, path)
116
self.assertEqual(path, paths[bisect_path_idx+offset])
118
def split_for_dirblocks(self, paths):
121
dirname, basename = os.path.split(path)
122
dir_split_paths.append((dirname.split('/'), basename))
123
dir_split_paths.sort()
124
return dir_split_paths
126
def test_simple(self):
127
"""In the simple case it works just like bisect_left"""
128
paths = ['', 'a', 'b', 'c', 'd']
129
split_paths = self.split_for_dirblocks(paths)
131
self.assertBisect(paths, split_paths, path, exists=True)
132
self.assertBisect(paths, split_paths, '_', exists=False)
133
self.assertBisect(paths, split_paths, 'aa', exists=False)
134
self.assertBisect(paths, split_paths, 'bb', exists=False)
135
self.assertBisect(paths, split_paths, 'cc', exists=False)
136
self.assertBisect(paths, split_paths, 'dd', exists=False)
137
self.assertBisect(paths, split_paths, 'a/a', exists=False)
138
self.assertBisect(paths, split_paths, 'b/b', exists=False)
139
self.assertBisect(paths, split_paths, 'c/c', exists=False)
140
self.assertBisect(paths, split_paths, 'd/d', exists=False)
142
def test_involved(self):
143
"""This is where bisect_path_* diverges slightly."""
144
# This is the list of paths and their contents
172
# This is the exact order that is stored by dirstate
173
# All children in a directory are mentioned before an children of
174
# children are mentioned.
175
# So all the root-directory paths, then all the
176
# first sub directory, etc.
177
paths = [# content of '/'
178
'', 'a', 'a-a', 'a-z', 'a=a', 'a=z',
180
'a/a', 'a/a-a', 'a/a-z',
182
'a/z', 'a/z-a', 'a/z-z',
205
split_paths = self.split_for_dirblocks(paths)
207
for dir_parts, basename in split_paths:
208
if dir_parts == ['']:
209
sorted_paths.append(basename)
211
sorted_paths.append('/'.join(dir_parts + [basename]))
213
self.assertEqual(sorted_paths, paths)
216
self.assertBisect(paths, split_paths, path, exists=True)
219
class TestBisectPathLeft(tests.TestCase, TestBisectPathMixin):
220
"""Run all Bisect Path tests against _bisect_path_left."""
222
def get_bisect_path(self):
223
from bzrlib._dirstate_helpers_py import _bisect_path_left
224
return _bisect_path_left
226
def get_bisect(self):
227
return bisect.bisect_left, 0
230
class TestCompiledBisectPathLeft(TestBisectPathLeft):
231
"""Run all Bisect Path tests against _bisect_path_lect"""
233
_test_needs_features = [compiled_dirstate_helpers_feature]
235
def get_bisect_path(self):
236
from bzrlib._dirstate_helpers_pyx import _bisect_path_left
237
return _bisect_path_left
240
class TestBisectPathRight(tests.TestCase, TestBisectPathMixin):
241
"""Run all Bisect Path tests against _bisect_path_right"""
243
def get_bisect_path(self):
244
from bzrlib._dirstate_helpers_py import _bisect_path_right
245
return _bisect_path_right
247
def get_bisect(self):
248
return bisect.bisect_right, -1
251
class TestCompiledBisectPathRight(TestBisectPathRight):
252
"""Run all Bisect Path tests against _bisect_path_right"""
254
_test_needs_features = [compiled_dirstate_helpers_feature]
256
def get_bisect_path(self):
257
from bzrlib._dirstate_helpers_pyx import _bisect_path_right
258
return _bisect_path_right
261
class TestBisectDirblock(tests.TestCase):
262
"""Test that bisect_dirblock() returns the expected values.
264
bisect_dirblock is intended to work like bisect.bisect_left() except it
265
knows it is working on dirblocks and that dirblocks are sorted by ('path',
266
'to', 'foo') chunks rather than by raw 'path/to/foo'.
268
This test is parameterized by calling get_bisect_dirblock(). Child test
269
cases can override this function to test against a different
273
def get_bisect_dirblock(self):
274
"""Return an implementation of bisect_dirblock"""
275
from bzrlib._dirstate_helpers_py import bisect_dirblock
276
return bisect_dirblock
278
def assertBisect(self, dirblocks, split_dirblocks, path, *args, **kwargs):
279
"""Assert that bisect_split works like bisect_left on the split paths.
281
:param dirblocks: A list of (path, [info]) pairs.
282
:param split_dirblocks: A list of ((split, path), [info]) pairs.
283
:param path: The path we are indexing.
285
All other arguments will be passed along.
287
bisect_dirblock = self.get_bisect_dirblock()
288
self.assertIsInstance(dirblocks, list)
289
bisect_split_idx = bisect_dirblock(dirblocks, path, *args, **kwargs)
290
split_dirblock = (path.split('/'), [])
291
bisect_left_idx = bisect.bisect_left(split_dirblocks, split_dirblock,
293
self.assertEqual(bisect_left_idx, bisect_split_idx,
294
'bisect_split disagreed. %s != %s'
296
% (bisect_left_idx, bisect_split_idx, path)
299
def paths_to_dirblocks(self, paths):
300
"""Convert a list of paths into dirblock form.
302
Also, ensure that the paths are in proper sorted order.
304
dirblocks = [(path, []) for path in paths]
305
split_dirblocks = [(path.split('/'), []) for path in paths]
306
self.assertEqual(sorted(split_dirblocks), split_dirblocks)
307
return dirblocks, split_dirblocks
309
def test_simple(self):
310
"""In the simple case it works just like bisect_left"""
311
paths = ['', 'a', 'b', 'c', 'd']
312
dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
314
self.assertBisect(dirblocks, split_dirblocks, path)
315
self.assertBisect(dirblocks, split_dirblocks, '_')
316
self.assertBisect(dirblocks, split_dirblocks, 'aa')
317
self.assertBisect(dirblocks, split_dirblocks, 'bb')
318
self.assertBisect(dirblocks, split_dirblocks, 'cc')
319
self.assertBisect(dirblocks, split_dirblocks, 'dd')
320
self.assertBisect(dirblocks, split_dirblocks, 'a/a')
321
self.assertBisect(dirblocks, split_dirblocks, 'b/b')
322
self.assertBisect(dirblocks, split_dirblocks, 'c/c')
323
self.assertBisect(dirblocks, split_dirblocks, 'd/d')
325
def test_involved(self):
326
"""This is where bisect_left diverges slightly."""
328
'a/a', 'a/a/a', 'a/a/z', 'a/a-a', 'a/a-z',
329
'a/z', 'a/z/a', 'a/z/z', 'a/z-a', 'a/z-z',
331
'z', 'z/a/a', 'z/a/z', 'z/a-a', 'z/a-z',
332
'z/z', 'z/z/a', 'z/z/z', 'z/z-a', 'z/z-z',
335
dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
337
self.assertBisect(dirblocks, split_dirblocks, path)
339
def test_involved_cached(self):
340
"""This is where bisect_left diverges slightly."""
342
'a/a', 'a/a/a', 'a/a/z', 'a/a-a', 'a/a-z',
343
'a/z', 'a/z/a', 'a/z/z', 'a/z-a', 'a/z-z',
345
'z', 'z/a/a', 'z/a/z', 'z/a-a', 'z/a-z',
346
'z/z', 'z/z/a', 'z/z/z', 'z/z-a', 'z/z-z',
350
dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
352
self.assertBisect(dirblocks, split_dirblocks, path, cache=cache)
355
class TestCompiledBisectDirblock(TestBisectDirblock):
356
"""Test that bisect_dirblock() returns the expected values.
358
bisect_dirblock is intended to work like bisect.bisect_left() except it
359
knows it is working on dirblocks and that dirblocks are sorted by ('path',
360
'to', 'foo') chunks rather than by raw 'path/to/foo'.
362
This runs all the normal tests that TestBisectDirblock did, but uses the
366
_test_needs_features = [compiled_dirstate_helpers_feature]
368
def get_bisect_dirblock(self):
369
from bzrlib._dirstate_helpers_pyx import bisect_dirblock
370
return bisect_dirblock
373
class TestCmpByDirs(tests.TestCase):
374
"""Test an implementation of cmp_by_dirs()
376
cmp_by_dirs() compares 2 paths by their directory sections, rather than as
379
Child test cases can override ``get_cmp_by_dirs`` to test a specific
383
def get_cmp_by_dirs(self):
384
"""Get a specific implementation of cmp_by_dirs."""
385
from bzrlib._dirstate_helpers_py import cmp_by_dirs
388
def assertCmpByDirs(self, expected, str1, str2):
389
"""Compare the two strings, in both directions.
391
:param expected: The expected comparison value. -1 means str1 comes
392
first, 0 means they are equal, 1 means str2 comes first
393
:param str1: string to compare
394
:param str2: string to compare
396
cmp_by_dirs = self.get_cmp_by_dirs()
398
self.assertEqual(str1, str2)
399
self.assertEqual(0, cmp_by_dirs(str1, str2))
400
self.assertEqual(0, cmp_by_dirs(str2, str1))
402
self.assertPositive(cmp_by_dirs(str1, str2))
403
self.assertNegative(cmp_by_dirs(str2, str1))
405
self.assertNegative(cmp_by_dirs(str1, str2))
406
self.assertPositive(cmp_by_dirs(str2, str1))
408
def test_cmp_empty(self):
409
"""Compare against the empty string."""
410
self.assertCmpByDirs(0, '', '')
411
self.assertCmpByDirs(1, 'a', '')
412
self.assertCmpByDirs(1, 'ab', '')
413
self.assertCmpByDirs(1, 'abc', '')
414
self.assertCmpByDirs(1, 'abcd', '')
415
self.assertCmpByDirs(1, 'abcde', '')
416
self.assertCmpByDirs(1, 'abcdef', '')
417
self.assertCmpByDirs(1, 'abcdefg', '')
418
self.assertCmpByDirs(1, 'abcdefgh', '')
419
self.assertCmpByDirs(1, 'abcdefghi', '')
420
self.assertCmpByDirs(1, 'test/ing/a/path/', '')
422
def test_cmp_same_str(self):
423
"""Compare the same string"""
424
self.assertCmpByDirs(0, 'a', 'a')
425
self.assertCmpByDirs(0, 'ab', 'ab')
426
self.assertCmpByDirs(0, 'abc', 'abc')
427
self.assertCmpByDirs(0, 'abcd', 'abcd')
428
self.assertCmpByDirs(0, 'abcde', 'abcde')
429
self.assertCmpByDirs(0, 'abcdef', 'abcdef')
430
self.assertCmpByDirs(0, 'abcdefg', 'abcdefg')
431
self.assertCmpByDirs(0, 'abcdefgh', 'abcdefgh')
432
self.assertCmpByDirs(0, 'abcdefghi', 'abcdefghi')
433
self.assertCmpByDirs(0, 'testing a long string', 'testing a long string')
434
self.assertCmpByDirs(0, 'x'*10000, 'x'*10000)
435
self.assertCmpByDirs(0, 'a/b', 'a/b')
436
self.assertCmpByDirs(0, 'a/b/c', 'a/b/c')
437
self.assertCmpByDirs(0, 'a/b/c/d', 'a/b/c/d')
438
self.assertCmpByDirs(0, 'a/b/c/d/e', 'a/b/c/d/e')
440
def test_simple_paths(self):
441
"""Compare strings that act like normal string comparison"""
442
self.assertCmpByDirs(-1, 'a', 'b')
443
self.assertCmpByDirs(-1, 'aa', 'ab')
444
self.assertCmpByDirs(-1, 'ab', 'bb')
445
self.assertCmpByDirs(-1, 'aaa', 'aab')
446
self.assertCmpByDirs(-1, 'aab', 'abb')
447
self.assertCmpByDirs(-1, 'abb', 'bbb')
448
self.assertCmpByDirs(-1, 'aaaa', 'aaab')
449
self.assertCmpByDirs(-1, 'aaab', 'aabb')
450
self.assertCmpByDirs(-1, 'aabb', 'abbb')
451
self.assertCmpByDirs(-1, 'abbb', 'bbbb')
452
self.assertCmpByDirs(-1, 'aaaaa', 'aaaab')
453
self.assertCmpByDirs(-1, 'a/a', 'a/b')
454
self.assertCmpByDirs(-1, 'a/b', 'b/b')
455
self.assertCmpByDirs(-1, 'a/a/a', 'a/a/b')
456
self.assertCmpByDirs(-1, 'a/a/b', 'a/b/b')
457
self.assertCmpByDirs(-1, 'a/b/b', 'b/b/b')
458
self.assertCmpByDirs(-1, 'a/a/a/a', 'a/a/a/b')
459
self.assertCmpByDirs(-1, 'a/a/a/b', 'a/a/b/b')
460
self.assertCmpByDirs(-1, 'a/a/b/b', 'a/b/b/b')
461
self.assertCmpByDirs(-1, 'a/b/b/b', 'b/b/b/b')
462
self.assertCmpByDirs(-1, 'a/a/a/a/a', 'a/a/a/a/b')
464
def test_tricky_paths(self):
465
self.assertCmpByDirs(1, 'ab/cd/ef', 'ab/cc/ef')
466
self.assertCmpByDirs(1, 'ab/cd/ef', 'ab/c/ef')
467
self.assertCmpByDirs(-1, 'ab/cd/ef', 'ab/cd-ef')
468
self.assertCmpByDirs(-1, 'ab/cd', 'ab/cd-')
469
self.assertCmpByDirs(-1, 'ab/cd', 'ab-cd')
471
def test_cmp_unicode_not_allowed(self):
472
cmp_by_dirs = self.get_cmp_by_dirs()
473
self.assertRaises(TypeError, cmp_by_dirs, u'Unicode', 'str')
474
self.assertRaises(TypeError, cmp_by_dirs, 'str', u'Unicode')
475
self.assertRaises(TypeError, cmp_by_dirs, u'Unicode', u'Unicode')
477
def test_cmp_non_ascii(self):
478
self.assertCmpByDirs(-1, '\xc2\xb5', '\xc3\xa5') # u'\xb5', u'\xe5'
479
self.assertCmpByDirs(-1, 'a', '\xc3\xa5') # u'a', u'\xe5'
480
self.assertCmpByDirs(-1, 'b', '\xc2\xb5') # u'b', u'\xb5'
481
self.assertCmpByDirs(-1, 'a/b', 'a/\xc3\xa5') # u'a/b', u'a/\xe5'
482
self.assertCmpByDirs(-1, 'b/a', 'b/\xc2\xb5') # u'b/a', u'b/\xb5'
485
class TestCompiledCmpByDirs(TestCmpByDirs):
486
"""Test the pyrex implementation of cmp_by_dirs"""
488
_test_needs_features = [compiled_dirstate_helpers_feature]
490
def get_cmp_by_dirs(self):
491
from bzrlib._dirstate_helpers_pyx import cmp_by_dirs
495
class TestCmpPathByDirblock(tests.TestCase):
496
"""Test an implementation of _cmp_path_by_dirblock()
498
_cmp_path_by_dirblock() compares two paths using the sort order used by
499
DirState. All paths in the same directory are sorted together.
501
Child test cases can override ``get_cmp_path_by_dirblock`` to test a specific
505
def get_cmp_path_by_dirblock(self):
506
"""Get a specific implementation of _cmp_path_by_dirblock."""
507
from bzrlib._dirstate_helpers_py import _cmp_path_by_dirblock
508
return _cmp_path_by_dirblock
510
def assertCmpPathByDirblock(self, paths):
511
"""Compare all paths and make sure they evaluate to the correct order.
513
This does N^2 comparisons. It is assumed that ``paths`` is properly
516
:param paths: a sorted list of paths to compare
518
# First, make sure the paths being passed in are correct
520
dirname, basename = os.path.split(p)
521
return dirname.split('/'), basename
522
self.assertEqual(sorted(paths, key=_key), paths)
524
cmp_path_by_dirblock = self.get_cmp_path_by_dirblock()
525
for idx1, path1 in enumerate(paths):
526
for idx2, path2 in enumerate(paths):
527
cmp_val = cmp_path_by_dirblock(path1, path2)
529
self.assertTrue(cmp_val < 0,
530
'%s did not state that %r came before %r, cmp=%s'
531
% (cmp_path_by_dirblock.__name__,
532
path1, path2, cmp_val))
534
self.assertTrue(cmp_val > 0,
535
'%s did not state that %r came after %r, cmp=%s'
536
% (cmp_path_by_dirblock.__name__,
537
path1, path2, cmp_val))
539
self.assertTrue(cmp_val == 0,
540
'%s did not state that %r == %r, cmp=%s'
541
% (cmp_path_by_dirblock.__name__,
542
path1, path2, cmp_val))
544
def test_cmp_simple_paths(self):
545
"""Compare against the empty string."""
546
self.assertCmpPathByDirblock(['', 'a', 'ab', 'abc', 'a/b/c', 'b/d/e'])
547
self.assertCmpPathByDirblock(['kl', 'ab/cd', 'ab/ef', 'gh/ij'])
549
def test_tricky_paths(self):
550
self.assertCmpPathByDirblock([
552
'', 'a', 'a-a', 'a=a', 'b',
554
'a/a', 'a/a-a', 'a/a=a', 'a/b',
556
'a/a/a', 'a/a/a-a', 'a/a/a=a',
557
# Contents of 'a/a/a'
558
'a/a/a/a', 'a/a/a/b',
559
# Contents of 'a/a/a-a',
560
'a/a/a-a/a', 'a/a/a-a/b',
561
# Contents of 'a/a/a=a',
562
'a/a/a=a/a', 'a/a/a=a/b',
563
# Contents of 'a/a-a'
565
# Contents of 'a/a-a/a'
566
'a/a-a/a/a', 'a/a-a/a/b',
567
# Contents of 'a/a=a'
578
self.assertCmpPathByDirblock([
580
'', 'a', 'a-a', 'a-z', 'a=a', 'a=z',
582
'a/a', 'a/a-a', 'a/a-z',
584
'a/z', 'a/z-a', 'a/z-z',
608
def test_unicode_not_allowed(self):
609
cmp_path_by_dirblock = self.get_cmp_path_by_dirblock()
610
self.assertRaises(TypeError, cmp_path_by_dirblock, u'Uni', 'str')
611
self.assertRaises(TypeError, cmp_path_by_dirblock, 'str', u'Uni')
612
self.assertRaises(TypeError, cmp_path_by_dirblock, u'Uni', u'Uni')
613
self.assertRaises(TypeError, cmp_path_by_dirblock, u'x/Uni', 'x/str')
614
self.assertRaises(TypeError, cmp_path_by_dirblock, 'x/str', u'x/Uni')
615
self.assertRaises(TypeError, cmp_path_by_dirblock, u'x/Uni', u'x/Uni')
617
def test_nonascii(self):
618
self.assertCmpPathByDirblock([
620
'', 'a', '\xc2\xb5', '\xc3\xa5',
622
'a/a', 'a/\xc2\xb5', 'a/\xc3\xa5',
624
'a/a/a', 'a/a/\xc2\xb5', 'a/a/\xc3\xa5',
625
# content of 'a/\xc2\xb5'
626
'a/\xc2\xb5/a', 'a/\xc2\xb5/\xc2\xb5', 'a/\xc2\xb5/\xc3\xa5',
627
# content of 'a/\xc3\xa5'
628
'a/\xc3\xa5/a', 'a/\xc3\xa5/\xc2\xb5', 'a/\xc3\xa5/\xc3\xa5',
629
# content of '\xc2\xb5'
630
'\xc2\xb5/a', '\xc2\xb5/\xc2\xb5', '\xc2\xb5/\xc3\xa5',
631
# content of '\xc2\xe5'
632
'\xc3\xa5/a', '\xc3\xa5/\xc2\xb5', '\xc3\xa5/\xc3\xa5',
636
class TestCompiledCmpPathByDirblock(TestCmpPathByDirblock):
637
"""Test the pyrex implementation of _cmp_path_by_dirblock"""
639
_test_needs_features = [compiled_dirstate_helpers_feature]
641
def get_cmp_by_dirs(self):
642
from bzrlib._dirstate_helpers_pyx import _cmp_path_by_dirblock
643
return _cmp_path_by_dirblock
646
class TestMemRChr(tests.TestCase):
647
"""Test memrchr functionality"""
649
_test_needs_features = [compiled_dirstate_helpers_feature]
651
def assertMemRChr(self, expected, s, c):
652
from bzrlib._dirstate_helpers_pyx import _py_memrchr
653
self.assertEqual(expected, _py_memrchr(s, c))
655
def test_missing(self):
656
self.assertMemRChr(None, '', 'a')
657
self.assertMemRChr(None, '', 'c')
658
self.assertMemRChr(None, 'abcdefghijklm', 'q')
659
self.assertMemRChr(None, 'aaaaaaaaaaaaaaaaaaaaaaa', 'b')
661
def test_single_entry(self):
662
self.assertMemRChr(0, 'abcdefghijklm', 'a')
663
self.assertMemRChr(1, 'abcdefghijklm', 'b')
664
self.assertMemRChr(2, 'abcdefghijklm', 'c')
665
self.assertMemRChr(10, 'abcdefghijklm', 'k')
666
self.assertMemRChr(11, 'abcdefghijklm', 'l')
667
self.assertMemRChr(12, 'abcdefghijklm', 'm')
669
def test_multiple(self):
670
self.assertMemRChr(10, 'abcdefjklmabcdefghijklm', 'a')
671
self.assertMemRChr(11, 'abcdefjklmabcdefghijklm', 'b')
672
self.assertMemRChr(12, 'abcdefjklmabcdefghijklm', 'c')
673
self.assertMemRChr(20, 'abcdefjklmabcdefghijklm', 'k')
674
self.assertMemRChr(21, 'abcdefjklmabcdefghijklm', 'l')
675
self.assertMemRChr(22, 'abcdefjklmabcdefghijklm', 'm')
676
self.assertMemRChr(22, 'aaaaaaaaaaaaaaaaaaaaaaa', 'a')
678
def test_with_nulls(self):
679
self.assertMemRChr(10, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'a')
680
self.assertMemRChr(11, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'b')
681
self.assertMemRChr(12, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'c')
682
self.assertMemRChr(20, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'k')
683
self.assertMemRChr(21, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'l')
684
self.assertMemRChr(22, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'm')
685
self.assertMemRChr(22, 'aaa\0\0\0aaaaaaa\0\0\0aaaaaaa', 'a')
686
self.assertMemRChr(9, '\0\0\0\0\0\0\0\0\0\0', '\0')
689
class TestReadDirblocks(test_dirstate.TestCaseWithDirState):
690
"""Test an implementation of _read_dirblocks()
692
_read_dirblocks() reads in all of the dirblock information from the disk
695
Child test cases can override ``get_read_dirblocks`` to test a specific
699
# inherits scenarios from test_dirstate
701
def get_read_dirblocks(self):
702
from bzrlib._dirstate_helpers_py import _read_dirblocks
703
return _read_dirblocks
705
def test_smoketest(self):
706
"""Make sure that we can create and read back a simple file."""
707
tree, state, expected = self.create_basic_dirstate()
709
state._read_header_if_needed()
710
self.assertEqual(dirstate.DirState.NOT_IN_MEMORY,
711
state._dirblock_state)
712
read_dirblocks = self.get_read_dirblocks()
713
read_dirblocks(state)
714
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
715
state._dirblock_state)
717
def test_trailing_garbage(self):
718
tree, state, expected = self.create_basic_dirstate()
719
# On Unix, we can write extra data as long as we haven't read yet, but
720
# on Win32, if you've opened the file with FILE_SHARE_READ, trying to
721
# open it in append mode will fail.
723
f = open('dirstate', 'ab')
725
# Add bogus trailing garbage
730
e = self.assertRaises(errors.DirstateCorrupt,
731
state._read_dirblocks_if_needed)
732
# Make sure we mention the bogus characters in the error
733
self.assertContainsRe(str(e), 'bogus')
736
class TestCompiledReadDirblocks(TestReadDirblocks):
737
"""Test the pyrex implementation of _read_dirblocks"""
739
_test_needs_features = [compiled_dirstate_helpers_feature]
741
def get_read_dirblocks(self):
742
from bzrlib._dirstate_helpers_pyx import _read_dirblocks
743
return _read_dirblocks
746
class TestUsingCompiledIfAvailable(tests.TestCase):
747
"""Check that any compiled functions that are available are the default.
749
It is possible to have typos, etc in the import line, such that
750
_dirstate_helpers_pyx is actually available, but the compiled functions are
754
def test_bisect_dirblock(self):
755
if compiled_dirstate_helpers_feature.available():
756
from bzrlib._dirstate_helpers_pyx import bisect_dirblock
758
from bzrlib._dirstate_helpers_py import bisect_dirblock
759
self.assertIs(bisect_dirblock, dirstate.bisect_dirblock)
761
def test__bisect_path_left(self):
762
if compiled_dirstate_helpers_feature.available():
763
from bzrlib._dirstate_helpers_pyx import _bisect_path_left
765
from bzrlib._dirstate_helpers_py import _bisect_path_left
766
self.assertIs(_bisect_path_left, dirstate._bisect_path_left)
768
def test__bisect_path_right(self):
769
if compiled_dirstate_helpers_feature.available():
770
from bzrlib._dirstate_helpers_pyx import _bisect_path_right
772
from bzrlib._dirstate_helpers_py import _bisect_path_right
773
self.assertIs(_bisect_path_right, dirstate._bisect_path_right)
775
def test_cmp_by_dirs(self):
776
if compiled_dirstate_helpers_feature.available():
777
from bzrlib._dirstate_helpers_pyx import cmp_by_dirs
779
from bzrlib._dirstate_helpers_py import cmp_by_dirs
780
self.assertIs(cmp_by_dirs, dirstate.cmp_by_dirs)
782
def test__read_dirblocks(self):
783
if compiled_dirstate_helpers_feature.available():
784
from bzrlib._dirstate_helpers_pyx import _read_dirblocks
786
from bzrlib._dirstate_helpers_py import _read_dirblocks
787
self.assertIs(_read_dirblocks, dirstate._read_dirblocks)
789
def test_update_entry(self):
790
if compiled_dirstate_helpers_feature.available():
791
from bzrlib._dirstate_helpers_pyx import update_entry
793
from bzrlib.dirstate import update_entry
794
self.assertIs(update_entry, dirstate.update_entry)
796
def test_process_entry(self):
797
if compiled_dirstate_helpers_feature.available():
798
from bzrlib._dirstate_helpers_pyx import ProcessEntryC
799
self.assertIs(ProcessEntryC, dirstate._process_entry)
801
from bzrlib.dirstate import ProcessEntryPython
802
self.assertIs(ProcessEntryPython, dirstate._process_entry)
805
class TestUpdateEntry(test_dirstate.TestCaseWithDirState):
806
"""Test the DirState.update_entry functions"""
808
scenarios = multiply_scenarios(
809
dir_reader_scenarios(), ue_scenarios)
815
super(TestUpdateEntry, self).setUp()
816
self.overrideAttr(dirstate, 'update_entry', self.update_entry)
818
def get_state_with_a(self):
819
"""Create a DirState tracking a single object named 'a'"""
820
state = test_dirstate.InstrumentedDirState.initialize('dirstate')
821
self.addCleanup(state.unlock)
822
state.add('a', 'a-id', 'file', None, '')
823
entry = state._get_entry(0, path_utf8='a')
826
def test_observed_sha1_cachable(self):
827
state, entry = self.get_state_with_a()
829
atime = time.time() - 10
830
self.build_tree(['a'])
831
statvalue = test_dirstate._FakeStat.from_stat(os.lstat('a'))
832
statvalue.st_mtime = statvalue.st_ctime = atime
833
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
834
state._dirblock_state)
835
state._observed_sha1(entry, "foo", statvalue)
836
self.assertEqual('foo', entry[1][0][1])
837
packed_stat = dirstate.pack_stat(statvalue)
838
self.assertEqual(packed_stat, entry[1][0][4])
839
self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
840
state._dirblock_state)
842
def test_observed_sha1_not_cachable(self):
843
state, entry = self.get_state_with_a()
845
oldval = entry[1][0][1]
846
oldstat = entry[1][0][4]
847
self.build_tree(['a'])
848
statvalue = os.lstat('a')
849
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
850
state._dirblock_state)
851
state._observed_sha1(entry, "foo", statvalue)
852
self.assertEqual(oldval, entry[1][0][1])
853
self.assertEqual(oldstat, entry[1][0][4])
854
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
855
state._dirblock_state)
857
def test_update_entry(self):
858
state, _ = self.get_state_with_a()
859
tree = self.make_branch_and_tree('tree')
861
empty_revid = tree.commit('empty')
862
self.build_tree(['tree/a'])
863
tree.add(['a'], ['a-id'])
864
with_a_id = tree.commit('with_a')
865
self.addCleanup(tree.unlock)
866
state.set_parent_trees(
867
[(empty_revid, tree.branch.repository.revision_tree(empty_revid))],
869
entry = state._get_entry(0, path_utf8='a')
870
self.build_tree(['a'])
871
# Add one where we don't provide the stat or sha already
872
self.assertEqual(('', 'a', 'a-id'), entry[0])
873
self.assertEqual(('f', '', 0, False, dirstate.DirState.NULLSTAT),
875
# Flush the buffers to disk
877
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
878
state._dirblock_state)
880
stat_value = os.lstat('a')
881
packed_stat = dirstate.pack_stat(stat_value)
882
link_or_sha1 = self.update_entry(state, entry, abspath='a',
883
stat_value=stat_value)
884
self.assertEqual(None, link_or_sha1)
886
# The dirblock entry should not have computed or cached the file's
887
# sha1, but it did update the files' st_size. However, this is not
888
# worth writing a dirstate file for, so we leave the state UNMODIFIED
889
self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
891
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
892
state._dirblock_state)
893
mode = stat_value.st_mode
894
self.assertEqual([('is_exec', mode, False)], state._log)
897
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
898
state._dirblock_state)
900
# Roll the clock back so the file is guaranteed to look too new. We
901
# should still not compute the sha1.
902
state.adjust_time(-10)
905
link_or_sha1 = self.update_entry(state, entry, abspath='a',
906
stat_value=stat_value)
907
self.assertEqual([('is_exec', mode, False)], state._log)
908
self.assertEqual(None, link_or_sha1)
909
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
910
state._dirblock_state)
911
self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
915
# If it is cachable (the clock has moved forward) but new it still
916
# won't calculate the sha or cache it.
917
state.adjust_time(+20)
919
link_or_sha1 = dirstate.update_entry(state, entry, abspath='a',
920
stat_value=stat_value)
921
self.assertEqual(None, link_or_sha1)
922
self.assertEqual([('is_exec', mode, False)], state._log)
923
self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
925
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
926
state._dirblock_state)
928
# If the file is no longer new, and the clock has been moved forward
929
# sufficiently, it will cache the sha.
931
state.set_parent_trees(
932
[(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
934
entry = state._get_entry(0, path_utf8='a')
936
link_or_sha1 = self.update_entry(state, entry, abspath='a',
937
stat_value=stat_value)
938
self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
940
self.assertEqual([('is_exec', mode, False), ('sha1', 'a')],
942
self.assertEqual(('f', link_or_sha1, 14, False, packed_stat),
945
# Subsequent calls will just return the cached value
947
link_or_sha1 = self.update_entry(state, entry, abspath='a',
948
stat_value=stat_value)
949
self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
951
self.assertEqual([], state._log)
952
self.assertEqual(('f', link_or_sha1, 14, False, packed_stat),
955
def test_update_entry_symlink(self):
956
"""Update entry should read symlinks."""
957
self.requireFeature(features.SymlinkFeature)
958
state, entry = self.get_state_with_a()
960
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
961
state._dirblock_state)
962
os.symlink('target', 'a')
964
state.adjust_time(-10) # Make the symlink look new
965
stat_value = os.lstat('a')
966
packed_stat = dirstate.pack_stat(stat_value)
967
link_or_sha1 = self.update_entry(state, entry, abspath='a',
968
stat_value=stat_value)
969
self.assertEqual('target', link_or_sha1)
970
self.assertEqual([('read_link', 'a', '')], state._log)
971
# Dirblock is not updated (the link is too new)
972
self.assertEqual([('l', '', 6, False, dirstate.DirState.NULLSTAT)],
974
# The file entry turned into a symlink, that is considered
975
# HASH modified worthy.
976
self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
977
state._dirblock_state)
979
# Because the stat_value looks new, we should re-read the target
981
link_or_sha1 = self.update_entry(state, entry, abspath='a',
982
stat_value=stat_value)
983
self.assertEqual('target', link_or_sha1)
984
self.assertEqual([('read_link', 'a', '')], state._log)
985
self.assertEqual([('l', '', 6, False, dirstate.DirState.NULLSTAT)],
988
state.adjust_time(+20) # Skip into the future, all files look old
990
link_or_sha1 = self.update_entry(state, entry, abspath='a',
991
stat_value=stat_value)
992
# The symlink stayed a symlink. So while it is new enough to cache, we
993
# don't bother setting the flag, because it is not really worth saving
994
# (when we stat the symlink, we'll have paged in the target.)
995
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
996
state._dirblock_state)
997
self.assertEqual('target', link_or_sha1)
998
# We need to re-read the link because only now can we cache it
999
self.assertEqual([('read_link', 'a', '')], state._log)
1000
self.assertEqual([('l', 'target', 6, False, packed_stat)],
1004
# Another call won't re-read the link
1005
self.assertEqual([], state._log)
1006
link_or_sha1 = self.update_entry(state, entry, abspath='a',
1007
stat_value=stat_value)
1008
self.assertEqual('target', link_or_sha1)
1009
self.assertEqual([('l', 'target', 6, False, packed_stat)],
1012
def do_update_entry(self, state, entry, abspath):
1013
stat_value = os.lstat(abspath)
1014
return self.update_entry(state, entry, abspath, stat_value)
1016
def test_update_entry_dir(self):
1017
state, entry = self.get_state_with_a()
1018
self.build_tree(['a/'])
1019
self.assertIs(None, self.do_update_entry(state, entry, 'a'))
1021
def test_update_entry_dir_unchanged(self):
1022
state, entry = self.get_state_with_a()
1023
self.build_tree(['a/'])
1024
state.adjust_time(+20)
1025
self.assertIs(None, self.do_update_entry(state, entry, 'a'))
1026
# a/ used to be a file, but is now a directory, worth saving
1027
self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
1028
state._dirblock_state)
1030
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
1031
state._dirblock_state)
1032
# No changes to a/ means not worth saving.
1033
self.assertIs(None, self.do_update_entry(state, entry, 'a'))
1034
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
1035
state._dirblock_state)
1036
# Change the last-modified time for the directory
1037
t = time.time() - 100.0
1039
os.utime('a', (t, t))
1041
# It looks like Win32 + FAT doesn't allow to change times on a dir.
1042
raise tests.TestSkipped("can't update mtime of a dir on FAT")
1043
saved_packed_stat = entry[1][0][-1]
1044
self.assertIs(None, self.do_update_entry(state, entry, 'a'))
1045
# We *do* go ahead and update the information in the dirblocks, but we
1046
# don't bother setting IN_MEMORY_MODIFIED because it is trivial to
1048
self.assertNotEqual(saved_packed_stat, entry[1][0][-1])
1049
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
1050
state._dirblock_state)
1052
def test_update_entry_file_unchanged(self):
1053
state, _ = self.get_state_with_a()
1054
tree = self.make_branch_and_tree('tree')
1056
self.build_tree(['tree/a'])
1057
tree.add(['a'], ['a-id'])
1058
with_a_id = tree.commit('witha')
1059
self.addCleanup(tree.unlock)
1060
state.set_parent_trees(
1061
[(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
1063
entry = state._get_entry(0, path_utf8='a')
1064
self.build_tree(['a'])
1065
sha1sum = 'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
1066
state.adjust_time(+20)
1067
self.assertEqual(sha1sum, self.do_update_entry(state, entry, 'a'))
1068
self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
1069
state._dirblock_state)
1071
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
1072
state._dirblock_state)
1073
self.assertEqual(sha1sum, self.do_update_entry(state, entry, 'a'))
1074
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
1075
state._dirblock_state)
1077
def test_update_entry_tree_reference(self):
1078
state = test_dirstate.InstrumentedDirState.initialize('dirstate')
1079
self.addCleanup(state.unlock)
1080
state.add('r', 'r-id', 'tree-reference', None, '')
1081
self.build_tree(['r/'])
1082
entry = state._get_entry(0, path_utf8='r')
1083
self.do_update_entry(state, entry, 'r')
1084
entry = state._get_entry(0, path_utf8='r')
1085
self.assertEqual('t', entry[1][0][0])
1087
def create_and_test_file(self, state, entry):
1088
"""Create a file at 'a' and verify the state finds it during update.
1090
The state should already be versioning *something* at 'a'. This makes
1091
sure that state.update_entry recognizes it as a file.
1093
self.build_tree(['a'])
1094
stat_value = os.lstat('a')
1095
packed_stat = dirstate.pack_stat(stat_value)
1097
link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
1098
self.assertEqual(None, link_or_sha1)
1099
self.assertEqual([('f', '', 14, False, dirstate.DirState.NULLSTAT)],
1103
def create_and_test_dir(self, state, entry):
1104
"""Create a directory at 'a' and verify the state finds it.
1106
The state should already be versioning *something* at 'a'. This makes
1107
sure that state.update_entry recognizes it as a directory.
1109
self.build_tree(['a/'])
1110
stat_value = os.lstat('a')
1111
packed_stat = dirstate.pack_stat(stat_value)
1113
link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
1114
self.assertIs(None, link_or_sha1)
1115
self.assertEqual([('d', '', 0, False, packed_stat)], entry[1])
1119
# FIXME: Add unicode version
1120
def create_and_test_symlink(self, state, entry):
1121
"""Create a symlink at 'a' and verify the state finds it.
1123
The state should already be versioning *something* at 'a'. This makes
1124
sure that state.update_entry recognizes it as a symlink.
1126
This should not be called if this platform does not have symlink
1129
# caller should care about skipping test on platforms without symlinks
1130
os.symlink('path/to/foo', 'a')
1132
stat_value = os.lstat('a')
1133
packed_stat = dirstate.pack_stat(stat_value)
1135
link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
1136
self.assertEqual('path/to/foo', link_or_sha1)
1137
self.assertEqual([('l', 'path/to/foo', 11, False, packed_stat)],
1141
def test_update_file_to_dir(self):
1142
"""If a file changes to a directory we return None for the sha.
1143
We also update the inventory record.
1145
state, entry = self.get_state_with_a()
1146
# The file sha1 won't be cached unless the file is old
1147
state.adjust_time(+10)
1148
self.create_and_test_file(state, entry)
1150
self.create_and_test_dir(state, entry)
1152
def test_update_file_to_symlink(self):
1153
"""File becomes a symlink"""
1154
self.requireFeature(features.SymlinkFeature)
1155
state, entry = self.get_state_with_a()
1156
# The file sha1 won't be cached unless the file is old
1157
state.adjust_time(+10)
1158
self.create_and_test_file(state, entry)
1160
self.create_and_test_symlink(state, entry)
1162
def test_update_dir_to_file(self):
1163
"""Directory becoming a file updates the entry."""
1164
state, entry = self.get_state_with_a()
1165
# The file sha1 won't be cached unless the file is old
1166
state.adjust_time(+10)
1167
self.create_and_test_dir(state, entry)
1169
self.create_and_test_file(state, entry)
1171
def test_update_dir_to_symlink(self):
1172
"""Directory becomes a symlink"""
1173
self.requireFeature(features.SymlinkFeature)
1174
state, entry = self.get_state_with_a()
1175
# The symlink target won't be cached if it isn't old
1176
state.adjust_time(+10)
1177
self.create_and_test_dir(state, entry)
1179
self.create_and_test_symlink(state, entry)
1181
def test_update_symlink_to_file(self):
1182
"""Symlink becomes a file"""
1183
self.requireFeature(features.SymlinkFeature)
1184
state, entry = self.get_state_with_a()
1185
# The symlink and file info won't be cached unless old
1186
state.adjust_time(+10)
1187
self.create_and_test_symlink(state, entry)
1189
self.create_and_test_file(state, entry)
1191
def test_update_symlink_to_dir(self):
1192
"""Symlink becomes a directory"""
1193
self.requireFeature(features.SymlinkFeature)
1194
state, entry = self.get_state_with_a()
1195
# The symlink target won't be cached if it isn't old
1196
state.adjust_time(+10)
1197
self.create_and_test_symlink(state, entry)
1199
self.create_and_test_dir(state, entry)
1201
def test__is_executable_win32(self):
1202
state, entry = self.get_state_with_a()
1203
self.build_tree(['a'])
1205
# Make sure we are using the win32 implementation of _is_executable
1206
state._is_executable = state._is_executable_win32
1208
# The file on disk is not executable, but we are marking it as though
1209
# it is. With _is_executable_win32 we ignore what is on disk.
1210
entry[1][0] = ('f', '', 0, True, dirstate.DirState.NULLSTAT)
1212
stat_value = os.lstat('a')
1213
packed_stat = dirstate.pack_stat(stat_value)
1215
state.adjust_time(-10) # Make sure everything is new
1216
self.update_entry(state, entry, abspath='a', stat_value=stat_value)
1218
# The row is updated, but the executable bit stays set.
1219
self.assertEqual([('f', '', 14, True, dirstate.DirState.NULLSTAT)],
1222
# Make the disk object look old enough to cache (but it won't cache the
1223
# sha as it is a new file).
1224
state.adjust_time(+20)
1225
digest = 'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
1226
self.update_entry(state, entry, abspath='a', stat_value=stat_value)
1227
self.assertEqual([('f', '', 14, True, dirstate.DirState.NULLSTAT)],
1230
def _prepare_tree(self):
1232
text = 'Hello World\n'
1233
tree = self.make_branch_and_tree('tree')
1234
self.build_tree_contents([('tree/a file', text)])
1235
tree.add('a file', 'a-file-id')
1236
# Note: dirstate does not sha prior to the first commit
1237
# so commit now in order for the test to work
1238
tree.commit('first')
1241
def test_sha1provider_sha1_used(self):
1242
tree, text = self._prepare_tree()
1243
state = dirstate.DirState.from_tree(tree, 'dirstate',
1244
UppercaseSHA1Provider())
1245
self.addCleanup(state.unlock)
1246
expected_sha = osutils.sha_string(text.upper() + "foo")
1247
entry = state._get_entry(0, path_utf8='a file')
1248
state._sha_cutoff_time()
1249
state._cutoff_time += 10
1250
sha1 = self.update_entry(state, entry, 'tree/a file',
1251
os.lstat('tree/a file'))
1252
self.assertEqual(expected_sha, sha1)
1254
def test_sha1provider_stat_and_sha1_used(self):
1255
tree, text = self._prepare_tree()
1257
self.addCleanup(tree.unlock)
1258
state = tree._current_dirstate()
1259
state._sha1_provider = UppercaseSHA1Provider()
1260
# If we used the standard provider, it would look like nothing has
1262
file_ids_changed = [change[0] for change
1263
in tree.iter_changes(tree.basis_tree())]
1264
self.assertEqual(['a-file-id'], file_ids_changed)
1267
class UppercaseSHA1Provider(dirstate.SHA1Provider):
1268
"""A custom SHA1Provider."""
1270
def sha1(self, abspath):
1271
return self.stat_and_sha1(abspath)[1]
1273
def stat_and_sha1(self, abspath):
1274
file_obj = file(abspath, 'rb')
1276
statvalue = os.fstat(file_obj.fileno())
1277
text = ''.join(file_obj.readlines())
1278
sha1 = osutils.sha_string(text.upper() + "foo")
1281
return statvalue, sha1
1284
class TestProcessEntry(test_dirstate.TestCaseWithDirState):
1286
scenarios = multiply_scenarios(dir_reader_scenarios(), pe_scenarios)
1289
_process_entry = None
1292
super(TestProcessEntry, self).setUp()
1293
self.overrideAttr(dirstate, '_process_entry', self._process_entry)
1295
def assertChangedFileIds(self, expected, tree):
1298
file_ids = [info[0] for info
1299
in tree.iter_changes(tree.basis_tree())]
1302
self.assertEqual(sorted(expected), sorted(file_ids))
1304
def test_exceptions_raised(self):
1305
# This is a direct test of bug #495023, it relies on osutils.is_inside
1306
# getting called in an inner function. Which makes it a bit brittle,
1307
# but at least it does reproduce the bug.
1308
tree = self.make_branch_and_tree('tree')
1309
self.build_tree(['tree/file', 'tree/dir/', 'tree/dir/sub',
1310
'tree/dir2/', 'tree/dir2/sub2'])
1311
tree.add(['file', 'dir', 'dir/sub', 'dir2', 'dir2/sub2'])
1312
tree.commit('first commit')
1314
self.addCleanup(tree.unlock)
1315
basis_tree = tree.basis_tree()
1316
def is_inside_raises(*args, **kwargs):
1317
raise RuntimeError('stop this')
1318
self.overrideAttr(osutils, 'is_inside', is_inside_raises)
1319
self.assertListRaises(RuntimeError, tree.iter_changes, basis_tree)
1321
def test_simple_changes(self):
1322
tree = self.make_branch_and_tree('tree')
1323
self.build_tree(['tree/file'])
1324
tree.add(['file'], ['file-id'])
1325
self.assertChangedFileIds([tree.get_root_id(), 'file-id'], tree)
1327
self.assertChangedFileIds([], tree)
1329
def test_sha1provider_stat_and_sha1_used(self):
1330
tree = self.make_branch_and_tree('tree')
1331
self.build_tree(['tree/file'])
1332
tree.add(['file'], ['file-id'])
1335
self.addCleanup(tree.unlock)
1336
state = tree._current_dirstate()
1337
state._sha1_provider = UppercaseSHA1Provider()
1338
self.assertChangedFileIds(['file-id'], tree)