~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__dirstate_helpers.py

  • Committer: Aaron Bentley
  • Date: 2009-04-03 20:05:25 UTC
  • mto: This revision was merged to the branch mainline in revision 4266.
  • Revision ID: aaron@aaronbentley.com-20090403200525-5vcdyhnjrlsqd6dr
Support hidden options.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2007, 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the compiled dirstate helpers."""
 
18
 
 
19
import bisect
 
20
import os
 
21
import time
 
22
 
 
23
from bzrlib import (
 
24
    dirstate,
 
25
    errors,
 
26
    osutils,
 
27
    tests,
 
28
    )
 
29
from bzrlib.tests import (
 
30
        SymlinkFeature,
 
31
        )
 
32
from bzrlib.tests import test_dirstate
 
33
 
 
34
 
 
35
class _CompiledDirstateHelpersFeature(tests.Feature):
 
36
    def _probe(self):
 
37
        try:
 
38
            import bzrlib._dirstate_helpers_c
 
39
        except ImportError:
 
40
            return False
 
41
        return True
 
42
 
 
43
    def feature_name(self):
 
44
        return 'bzrlib._dirstate_helpers_c'
 
45
 
 
46
CompiledDirstateHelpersFeature = _CompiledDirstateHelpersFeature()
 
47
 
 
48
 
 
49
class TestBisectPathMixin(object):
 
50
    """Test that _bisect_path_*() returns the expected values.
 
51
 
 
52
    _bisect_path_* is intended to work like bisect.bisect_*() except it
 
53
    knows it is working on paths that are sorted by ('path', 'to', 'foo')
 
54
    chunks rather than by raw 'path/to/foo'.
 
55
 
 
56
    Test Cases should inherit from this and override ``get_bisect_path`` return
 
57
    their implementation, and ``get_bisect`` to return the matching
 
58
    bisect.bisect_* function.
 
59
    """
 
60
 
 
61
    def get_bisect_path(self):
 
62
        """Return an implementation of _bisect_path_*"""
 
63
        raise NotImplementedError
 
64
 
 
65
    def get_bisect(self):
 
66
        """Return a version of bisect.bisect_*.
 
67
 
 
68
        Also, for the 'exists' check, return the offset to the real values.
 
69
        For example bisect_left returns the index of an entry, while
 
70
        bisect_right returns the index *after* an entry
 
71
 
 
72
        :return: (bisect_func, offset)
 
73
        """
 
74
        raise NotImplementedError
 
75
 
 
76
    def assertBisect(self, paths, split_paths, path, exists=True):
 
77
        """Assert that bisect_split works like bisect_left on the split paths.
 
78
 
 
79
        :param paths: A list of path names
 
80
        :param split_paths: A list of path names that are already split up by directory
 
81
            ('path/to/foo' => ('path', 'to', 'foo'))
 
82
        :param path: The path we are indexing.
 
83
        :param exists: The path should be present, so make sure the
 
84
            final location actually points to the right value.
 
85
 
 
86
        All other arguments will be passed along.
 
87
        """
 
88
        bisect_path = self.get_bisect_path()
 
89
        self.assertIsInstance(paths, list)
 
90
        bisect_path_idx = bisect_path(paths, path)
 
91
        split_path = self.split_for_dirblocks([path])[0]
 
92
        bisect_func, offset = self.get_bisect()
 
93
        bisect_split_idx = bisect_func(split_paths, split_path)
 
94
        self.assertEqual(bisect_split_idx, bisect_path_idx,
 
95
                         '%s disagreed. %s != %s'
 
96
                         ' for key %r'
 
97
                         % (bisect_path.__name__,
 
98
                            bisect_split_idx, bisect_path_idx, path)
 
99
                         )
 
100
        if exists:
 
101
            self.assertEqual(path, paths[bisect_path_idx+offset])
 
102
 
 
103
    def split_for_dirblocks(self, paths):
 
104
        dir_split_paths = []
 
105
        for path in paths:
 
106
            dirname, basename = os.path.split(path)
 
107
            dir_split_paths.append((dirname.split('/'), basename))
 
108
        dir_split_paths.sort()
 
109
        return dir_split_paths
 
110
 
 
111
    def test_simple(self):
 
112
        """In the simple case it works just like bisect_left"""
 
113
        paths = ['', 'a', 'b', 'c', 'd']
 
114
        split_paths = self.split_for_dirblocks(paths)
 
115
        for path in paths:
 
116
            self.assertBisect(paths, split_paths, path, exists=True)
 
117
        self.assertBisect(paths, split_paths, '_', exists=False)
 
118
        self.assertBisect(paths, split_paths, 'aa', exists=False)
 
119
        self.assertBisect(paths, split_paths, 'bb', exists=False)
 
120
        self.assertBisect(paths, split_paths, 'cc', exists=False)
 
121
        self.assertBisect(paths, split_paths, 'dd', exists=False)
 
122
        self.assertBisect(paths, split_paths, 'a/a', exists=False)
 
123
        self.assertBisect(paths, split_paths, 'b/b', exists=False)
 
124
        self.assertBisect(paths, split_paths, 'c/c', exists=False)
 
125
        self.assertBisect(paths, split_paths, 'd/d', exists=False)
 
126
 
 
127
    def test_involved(self):
 
128
        """This is where bisect_path_* diverges slightly."""
 
129
        # This is the list of paths and their contents
 
130
        # a/
 
131
        #   a/
 
132
        #     a
 
133
        #     z
 
134
        #   a-a/
 
135
        #     a
 
136
        #   a-z/
 
137
        #     z
 
138
        #   a=a/
 
139
        #     a
 
140
        #   a=z/
 
141
        #     z
 
142
        #   z/
 
143
        #     a
 
144
        #     z
 
145
        #   z-a
 
146
        #   z-z
 
147
        #   z=a
 
148
        #   z=z
 
149
        # a-a/
 
150
        #   a
 
151
        # a-z/
 
152
        #   z
 
153
        # a=a/
 
154
        #   a
 
155
        # a=z/
 
156
        #   z
 
157
        # This is the exact order that is stored by dirstate
 
158
        # All children in a directory are mentioned before an children of
 
159
        # children are mentioned.
 
160
        # So all the root-directory paths, then all the
 
161
        # first sub directory, etc.
 
162
        paths = [# content of '/'
 
163
                 '', 'a', 'a-a', 'a-z', 'a=a', 'a=z',
 
164
                 # content of 'a/'
 
165
                 'a/a', 'a/a-a', 'a/a-z',
 
166
                 'a/a=a', 'a/a=z',
 
167
                 'a/z', 'a/z-a', 'a/z-z',
 
168
                 'a/z=a', 'a/z=z',
 
169
                 # content of 'a/a/'
 
170
                 'a/a/a', 'a/a/z',
 
171
                 # content of 'a/a-a'
 
172
                 'a/a-a/a',
 
173
                 # content of 'a/a-z'
 
174
                 'a/a-z/z',
 
175
                 # content of 'a/a=a'
 
176
                 'a/a=a/a',
 
177
                 # content of 'a/a=z'
 
178
                 'a/a=z/z',
 
179
                 # content of 'a/z/'
 
180
                 'a/z/a', 'a/z/z',
 
181
                 # content of 'a-a'
 
182
                 'a-a/a',
 
183
                 # content of 'a-z'
 
184
                 'a-z/z',
 
185
                 # content of 'a=a'
 
186
                 'a=a/a',
 
187
                 # content of 'a=z'
 
188
                 'a=z/z',
 
189
                ]
 
190
        split_paths = self.split_for_dirblocks(paths)
 
191
        sorted_paths = []
 
192
        for dir_parts, basename in split_paths:
 
193
            if dir_parts == ['']:
 
194
                sorted_paths.append(basename)
 
195
            else:
 
196
                sorted_paths.append('/'.join(dir_parts + [basename]))
 
197
 
 
198
        self.assertEqual(sorted_paths, paths)
 
199
 
 
200
        for path in paths:
 
201
            self.assertBisect(paths, split_paths, path, exists=True)
 
202
 
 
203
 
 
204
class TestBisectPathLeft(tests.TestCase, TestBisectPathMixin):
 
205
    """Run all Bisect Path tests against _bisect_path_left_py."""
 
206
 
 
207
    def get_bisect_path(self):
 
208
        from bzrlib._dirstate_helpers_py import _bisect_path_left_py
 
209
        return _bisect_path_left_py
 
210
 
 
211
    def get_bisect(self):
 
212
        return bisect.bisect_left, 0
 
213
 
 
214
 
 
215
class TestCompiledBisectPathLeft(TestBisectPathLeft):
 
216
    """Run all Bisect Path tests against _bisect_path_right_c"""
 
217
 
 
218
    _test_needs_features = [CompiledDirstateHelpersFeature]
 
219
 
 
220
    def get_bisect_path(self):
 
221
        from bzrlib._dirstate_helpers_c import _bisect_path_left_c
 
222
        return _bisect_path_left_c
 
223
 
 
224
 
 
225
class TestBisectPathRight(tests.TestCase, TestBisectPathMixin):
 
226
    """Run all Bisect Path tests against _bisect_path_right_py"""
 
227
 
 
228
    def get_bisect_path(self):
 
229
        from bzrlib._dirstate_helpers_py import _bisect_path_right_py
 
230
        return _bisect_path_right_py
 
231
 
 
232
    def get_bisect(self):
 
233
        return bisect.bisect_right, -1
 
234
 
 
235
 
 
236
class TestCompiledBisectPathRight(TestBisectPathRight):
 
237
    """Run all Bisect Path tests against _bisect_path_right_c"""
 
238
 
 
239
    _test_needs_features = [CompiledDirstateHelpersFeature]
 
240
 
 
241
    def get_bisect_path(self):
 
242
        from bzrlib._dirstate_helpers_c import _bisect_path_right_c
 
243
        return _bisect_path_right_c
 
244
 
 
245
 
 
246
class TestBisectDirblock(tests.TestCase):
 
247
    """Test that bisect_dirblock() returns the expected values.
 
248
 
 
249
    bisect_dirblock is intended to work like bisect.bisect_left() except it
 
250
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
 
251
    'to', 'foo') chunks rather than by raw 'path/to/foo'.
 
252
 
 
253
    This test is parameterized by calling get_bisect_dirblock(). Child test
 
254
    cases can override this function to test against a different
 
255
    implementation.
 
256
    """
 
257
 
 
258
    def get_bisect_dirblock(self):
 
259
        """Return an implementation of bisect_dirblock"""
 
260
        from bzrlib._dirstate_helpers_py import bisect_dirblock_py
 
261
        return bisect_dirblock_py
 
262
 
 
263
    def assertBisect(self, dirblocks, split_dirblocks, path, *args, **kwargs):
 
264
        """Assert that bisect_split works like bisect_left on the split paths.
 
265
 
 
266
        :param dirblocks: A list of (path, [info]) pairs.
 
267
        :param split_dirblocks: A list of ((split, path), [info]) pairs.
 
268
        :param path: The path we are indexing.
 
269
 
 
270
        All other arguments will be passed along.
 
271
        """
 
272
        bisect_dirblock = self.get_bisect_dirblock()
 
273
        self.assertIsInstance(dirblocks, list)
 
274
        bisect_split_idx = bisect_dirblock(dirblocks, path, *args, **kwargs)
 
275
        split_dirblock = (path.split('/'), [])
 
276
        bisect_left_idx = bisect.bisect_left(split_dirblocks, split_dirblock,
 
277
                                             *args)
 
278
        self.assertEqual(bisect_left_idx, bisect_split_idx,
 
279
                         'bisect_split disagreed. %s != %s'
 
280
                         ' for key %r'
 
281
                         % (bisect_left_idx, bisect_split_idx, path)
 
282
                         )
 
283
 
 
284
    def paths_to_dirblocks(self, paths):
 
285
        """Convert a list of paths into dirblock form.
 
286
 
 
287
        Also, ensure that the paths are in proper sorted order.
 
288
        """
 
289
        dirblocks = [(path, []) for path in paths]
 
290
        split_dirblocks = [(path.split('/'), []) for path in paths]
 
291
        self.assertEqual(sorted(split_dirblocks), split_dirblocks)
 
292
        return dirblocks, split_dirblocks
 
293
 
 
294
    def test_simple(self):
 
295
        """In the simple case it works just like bisect_left"""
 
296
        paths = ['', 'a', 'b', 'c', 'd']
 
297
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
 
298
        for path in paths:
 
299
            self.assertBisect(dirblocks, split_dirblocks, path)
 
300
        self.assertBisect(dirblocks, split_dirblocks, '_')
 
301
        self.assertBisect(dirblocks, split_dirblocks, 'aa')
 
302
        self.assertBisect(dirblocks, split_dirblocks, 'bb')
 
303
        self.assertBisect(dirblocks, split_dirblocks, 'cc')
 
304
        self.assertBisect(dirblocks, split_dirblocks, 'dd')
 
305
        self.assertBisect(dirblocks, split_dirblocks, 'a/a')
 
306
        self.assertBisect(dirblocks, split_dirblocks, 'b/b')
 
307
        self.assertBisect(dirblocks, split_dirblocks, 'c/c')
 
308
        self.assertBisect(dirblocks, split_dirblocks, 'd/d')
 
309
 
 
310
    def test_involved(self):
 
311
        """This is where bisect_left diverges slightly."""
 
312
        paths = ['', 'a',
 
313
                 'a/a', 'a/a/a', 'a/a/z', 'a/a-a', 'a/a-z',
 
314
                 'a/z', 'a/z/a', 'a/z/z', 'a/z-a', 'a/z-z',
 
315
                 'a-a', 'a-z',
 
316
                 'z', 'z/a/a', 'z/a/z', 'z/a-a', 'z/a-z',
 
317
                 'z/z', 'z/z/a', 'z/z/z', 'z/z-a', 'z/z-z',
 
318
                 'z-a', 'z-z',
 
319
                ]
 
320
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
 
321
        for path in paths:
 
322
            self.assertBisect(dirblocks, split_dirblocks, path)
 
323
 
 
324
    def test_involved_cached(self):
 
325
        """This is where bisect_left diverges slightly."""
 
326
        paths = ['', 'a',
 
327
                 'a/a', 'a/a/a', 'a/a/z', 'a/a-a', 'a/a-z',
 
328
                 'a/z', 'a/z/a', 'a/z/z', 'a/z-a', 'a/z-z',
 
329
                 'a-a', 'a-z',
 
330
                 'z', 'z/a/a', 'z/a/z', 'z/a-a', 'z/a-z',
 
331
                 'z/z', 'z/z/a', 'z/z/z', 'z/z-a', 'z/z-z',
 
332
                 'z-a', 'z-z',
 
333
                ]
 
334
        cache = {}
 
335
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
 
336
        for path in paths:
 
337
            self.assertBisect(dirblocks, split_dirblocks, path, cache=cache)
 
338
 
 
339
 
 
340
class TestCompiledBisectDirblock(TestBisectDirblock):
 
341
    """Test that bisect_dirblock() returns the expected values.
 
342
 
 
343
    bisect_dirblock is intended to work like bisect.bisect_left() except it
 
344
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
 
345
    'to', 'foo') chunks rather than by raw 'path/to/foo'.
 
346
 
 
347
    This runs all the normal tests that TestBisectDirblock did, but uses the
 
348
    compiled version.
 
349
    """
 
350
 
 
351
    _test_needs_features = [CompiledDirstateHelpersFeature]
 
352
 
 
353
    def get_bisect_dirblock(self):
 
354
        from bzrlib._dirstate_helpers_c import bisect_dirblock_c
 
355
        return bisect_dirblock_c
 
356
 
 
357
 
 
358
class TestCmpByDirs(tests.TestCase):
 
359
    """Test an implementation of cmp_by_dirs()
 
360
 
 
361
    cmp_by_dirs() compares 2 paths by their directory sections, rather than as
 
362
    plain strings.
 
363
 
 
364
    Child test cases can override ``get_cmp_by_dirs`` to test a specific
 
365
    implementation.
 
366
    """
 
367
 
 
368
    def get_cmp_by_dirs(self):
 
369
        """Get a specific implementation of cmp_by_dirs."""
 
370
        from bzrlib._dirstate_helpers_py import cmp_by_dirs_py
 
371
        return cmp_by_dirs_py
 
372
 
 
373
    def assertCmpByDirs(self, expected, str1, str2):
 
374
        """Compare the two strings, in both directions.
 
375
 
 
376
        :param expected: The expected comparison value. -1 means str1 comes
 
377
            first, 0 means they are equal, 1 means str2 comes first
 
378
        :param str1: string to compare
 
379
        :param str2: string to compare
 
380
        """
 
381
        cmp_by_dirs = self.get_cmp_by_dirs()
 
382
        if expected == 0:
 
383
            self.assertEqual(str1, str2)
 
384
            self.assertEqual(0, cmp_by_dirs(str1, str2))
 
385
            self.assertEqual(0, cmp_by_dirs(str2, str1))
 
386
        elif expected > 0:
 
387
            self.assertPositive(cmp_by_dirs(str1, str2))
 
388
            self.assertNegative(cmp_by_dirs(str2, str1))
 
389
        else:
 
390
            self.assertNegative(cmp_by_dirs(str1, str2))
 
391
            self.assertPositive(cmp_by_dirs(str2, str1))
 
392
 
 
393
    def test_cmp_empty(self):
 
394
        """Compare against the empty string."""
 
395
        self.assertCmpByDirs(0, '', '')
 
396
        self.assertCmpByDirs(1, 'a', '')
 
397
        self.assertCmpByDirs(1, 'ab', '')
 
398
        self.assertCmpByDirs(1, 'abc', '')
 
399
        self.assertCmpByDirs(1, 'abcd', '')
 
400
        self.assertCmpByDirs(1, 'abcde', '')
 
401
        self.assertCmpByDirs(1, 'abcdef', '')
 
402
        self.assertCmpByDirs(1, 'abcdefg', '')
 
403
        self.assertCmpByDirs(1, 'abcdefgh', '')
 
404
        self.assertCmpByDirs(1, 'abcdefghi', '')
 
405
        self.assertCmpByDirs(1, 'test/ing/a/path/', '')
 
406
 
 
407
    def test_cmp_same_str(self):
 
408
        """Compare the same string"""
 
409
        self.assertCmpByDirs(0, 'a', 'a')
 
410
        self.assertCmpByDirs(0, 'ab', 'ab')
 
411
        self.assertCmpByDirs(0, 'abc', 'abc')
 
412
        self.assertCmpByDirs(0, 'abcd', 'abcd')
 
413
        self.assertCmpByDirs(0, 'abcde', 'abcde')
 
414
        self.assertCmpByDirs(0, 'abcdef', 'abcdef')
 
415
        self.assertCmpByDirs(0, 'abcdefg', 'abcdefg')
 
416
        self.assertCmpByDirs(0, 'abcdefgh', 'abcdefgh')
 
417
        self.assertCmpByDirs(0, 'abcdefghi', 'abcdefghi')
 
418
        self.assertCmpByDirs(0, 'testing a long string', 'testing a long string')
 
419
        self.assertCmpByDirs(0, 'x'*10000, 'x'*10000)
 
420
        self.assertCmpByDirs(0, 'a/b', 'a/b')
 
421
        self.assertCmpByDirs(0, 'a/b/c', 'a/b/c')
 
422
        self.assertCmpByDirs(0, 'a/b/c/d', 'a/b/c/d')
 
423
        self.assertCmpByDirs(0, 'a/b/c/d/e', 'a/b/c/d/e')
 
424
 
 
425
    def test_simple_paths(self):
 
426
        """Compare strings that act like normal string comparison"""
 
427
        self.assertCmpByDirs(-1, 'a', 'b')
 
428
        self.assertCmpByDirs(-1, 'aa', 'ab')
 
429
        self.assertCmpByDirs(-1, 'ab', 'bb')
 
430
        self.assertCmpByDirs(-1, 'aaa', 'aab')
 
431
        self.assertCmpByDirs(-1, 'aab', 'abb')
 
432
        self.assertCmpByDirs(-1, 'abb', 'bbb')
 
433
        self.assertCmpByDirs(-1, 'aaaa', 'aaab')
 
434
        self.assertCmpByDirs(-1, 'aaab', 'aabb')
 
435
        self.assertCmpByDirs(-1, 'aabb', 'abbb')
 
436
        self.assertCmpByDirs(-1, 'abbb', 'bbbb')
 
437
        self.assertCmpByDirs(-1, 'aaaaa', 'aaaab')
 
438
        self.assertCmpByDirs(-1, 'a/a', 'a/b')
 
439
        self.assertCmpByDirs(-1, 'a/b', 'b/b')
 
440
        self.assertCmpByDirs(-1, 'a/a/a', 'a/a/b')
 
441
        self.assertCmpByDirs(-1, 'a/a/b', 'a/b/b')
 
442
        self.assertCmpByDirs(-1, 'a/b/b', 'b/b/b')
 
443
        self.assertCmpByDirs(-1, 'a/a/a/a', 'a/a/a/b')
 
444
        self.assertCmpByDirs(-1, 'a/a/a/b', 'a/a/b/b')
 
445
        self.assertCmpByDirs(-1, 'a/a/b/b', 'a/b/b/b')
 
446
        self.assertCmpByDirs(-1, 'a/b/b/b', 'b/b/b/b')
 
447
        self.assertCmpByDirs(-1, 'a/a/a/a/a', 'a/a/a/a/b')
 
448
 
 
449
    def test_tricky_paths(self):
 
450
        self.assertCmpByDirs(1, 'ab/cd/ef', 'ab/cc/ef')
 
451
        self.assertCmpByDirs(1, 'ab/cd/ef', 'ab/c/ef')
 
452
        self.assertCmpByDirs(-1, 'ab/cd/ef', 'ab/cd-ef')
 
453
        self.assertCmpByDirs(-1, 'ab/cd', 'ab/cd-')
 
454
        self.assertCmpByDirs(-1, 'ab/cd', 'ab-cd')
 
455
 
 
456
    def test_cmp_unicode_not_allowed(self):
 
457
        cmp_by_dirs = self.get_cmp_by_dirs()
 
458
        self.assertRaises(TypeError, cmp_by_dirs, u'Unicode', 'str')
 
459
        self.assertRaises(TypeError, cmp_by_dirs, 'str', u'Unicode')
 
460
        self.assertRaises(TypeError, cmp_by_dirs, u'Unicode', u'Unicode')
 
461
 
 
462
    def test_cmp_non_ascii(self):
 
463
        self.assertCmpByDirs(-1, '\xc2\xb5', '\xc3\xa5') # u'\xb5', u'\xe5'
 
464
        self.assertCmpByDirs(-1, 'a', '\xc3\xa5') # u'a', u'\xe5'
 
465
        self.assertCmpByDirs(-1, 'b', '\xc2\xb5') # u'b', u'\xb5'
 
466
        self.assertCmpByDirs(-1, 'a/b', 'a/\xc3\xa5') # u'a/b', u'a/\xe5'
 
467
        self.assertCmpByDirs(-1, 'b/a', 'b/\xc2\xb5') # u'b/a', u'b/\xb5'
 
468
 
 
469
 
 
470
class TestCompiledCmpByDirs(TestCmpByDirs):
 
471
    """Test the pyrex implementation of cmp_by_dirs"""
 
472
 
 
473
    _test_needs_features = [CompiledDirstateHelpersFeature]
 
474
 
 
475
    def get_cmp_by_dirs(self):
 
476
        from bzrlib._dirstate_helpers_c import cmp_by_dirs_c
 
477
        return cmp_by_dirs_c
 
478
 
 
479
 
 
480
class TestCmpPathByDirblock(tests.TestCase):
 
481
    """Test an implementation of _cmp_path_by_dirblock()
 
482
 
 
483
    _cmp_path_by_dirblock() compares two paths using the sort order used by
 
484
    DirState. All paths in the same directory are sorted together.
 
485
 
 
486
    Child test cases can override ``get_cmp_path_by_dirblock`` to test a specific
 
487
    implementation.
 
488
    """
 
489
 
 
490
    def get_cmp_path_by_dirblock(self):
 
491
        """Get a specific implementation of _cmp_path_by_dirblock."""
 
492
        from bzrlib._dirstate_helpers_py import _cmp_path_by_dirblock_py
 
493
        return _cmp_path_by_dirblock_py
 
494
 
 
495
    def assertCmpPathByDirblock(self, paths):
 
496
        """Compare all paths and make sure they evaluate to the correct order.
 
497
 
 
498
        This does N^2 comparisons. It is assumed that ``paths`` is properly
 
499
        sorted list.
 
500
 
 
501
        :param paths: a sorted list of paths to compare
 
502
        """
 
503
        # First, make sure the paths being passed in are correct
 
504
        def _key(p):
 
505
            dirname, basename = os.path.split(p)
 
506
            return dirname.split('/'), basename
 
507
        self.assertEqual(sorted(paths, key=_key), paths)
 
508
 
 
509
        cmp_path_by_dirblock = self.get_cmp_path_by_dirblock()
 
510
        for idx1, path1 in enumerate(paths):
 
511
            for idx2, path2 in enumerate(paths):
 
512
                cmp_val = cmp_path_by_dirblock(path1, path2)
 
513
                if idx1 < idx2:
 
514
                    self.assertTrue(cmp_val < 0,
 
515
                        '%s did not state that %r came before %r, cmp=%s'
 
516
                        % (cmp_path_by_dirblock.__name__,
 
517
                           path1, path2, cmp_val))
 
518
                elif idx1 > idx2:
 
519
                    self.assertTrue(cmp_val > 0,
 
520
                        '%s did not state that %r came after %r, cmp=%s'
 
521
                        % (cmp_path_by_dirblock.__name__,
 
522
                           path1, path2, cmp_val))
 
523
                else: # idx1 == idx2
 
524
                    self.assertTrue(cmp_val == 0,
 
525
                        '%s did not state that %r == %r, cmp=%s'
 
526
                        % (cmp_path_by_dirblock.__name__,
 
527
                           path1, path2, cmp_val))
 
528
 
 
529
    def test_cmp_simple_paths(self):
 
530
        """Compare against the empty string."""
 
531
        self.assertCmpPathByDirblock(['', 'a', 'ab', 'abc', 'a/b/c', 'b/d/e'])
 
532
        self.assertCmpPathByDirblock(['kl', 'ab/cd', 'ab/ef', 'gh/ij'])
 
533
 
 
534
    def test_tricky_paths(self):
 
535
        self.assertCmpPathByDirblock([
 
536
            # Contents of ''
 
537
            '', 'a', 'a-a', 'a=a', 'b',
 
538
            # Contents of 'a'
 
539
            'a/a', 'a/a-a', 'a/a=a', 'a/b',
 
540
            # Contents of 'a/a'
 
541
            'a/a/a', 'a/a/a-a', 'a/a/a=a',
 
542
            # Contents of 'a/a/a'
 
543
            'a/a/a/a', 'a/a/a/b',
 
544
            # Contents of 'a/a/a-a',
 
545
            'a/a/a-a/a', 'a/a/a-a/b',
 
546
            # Contents of 'a/a/a=a',
 
547
            'a/a/a=a/a', 'a/a/a=a/b',
 
548
            # Contents of 'a/a-a'
 
549
            'a/a-a/a',
 
550
            # Contents of 'a/a-a/a'
 
551
            'a/a-a/a/a', 'a/a-a/a/b',
 
552
            # Contents of 'a/a=a'
 
553
            'a/a=a/a',
 
554
            # Contents of 'a/b'
 
555
            'a/b/a', 'a/b/b',
 
556
            # Contents of 'a-a',
 
557
            'a-a/a', 'a-a/b',
 
558
            # Contents of 'a=a',
 
559
            'a=a/a', 'a=a/b',
 
560
            # Contents of 'b',
 
561
            'b/a', 'b/b',
 
562
            ])
 
563
        self.assertCmpPathByDirblock([
 
564
                 # content of '/'
 
565
                 '', 'a', 'a-a', 'a-z', 'a=a', 'a=z',
 
566
                 # content of 'a/'
 
567
                 'a/a', 'a/a-a', 'a/a-z',
 
568
                 'a/a=a', 'a/a=z',
 
569
                 'a/z', 'a/z-a', 'a/z-z',
 
570
                 'a/z=a', 'a/z=z',
 
571
                 # content of 'a/a/'
 
572
                 'a/a/a', 'a/a/z',
 
573
                 # content of 'a/a-a'
 
574
                 'a/a-a/a',
 
575
                 # content of 'a/a-z'
 
576
                 'a/a-z/z',
 
577
                 # content of 'a/a=a'
 
578
                 'a/a=a/a',
 
579
                 # content of 'a/a=z'
 
580
                 'a/a=z/z',
 
581
                 # content of 'a/z/'
 
582
                 'a/z/a', 'a/z/z',
 
583
                 # content of 'a-a'
 
584
                 'a-a/a',
 
585
                 # content of 'a-z'
 
586
                 'a-z/z',
 
587
                 # content of 'a=a'
 
588
                 'a=a/a',
 
589
                 # content of 'a=z'
 
590
                 'a=z/z',
 
591
                ])
 
592
 
 
593
    def test_unicode_not_allowed(self):
 
594
        cmp_path_by_dirblock = self.get_cmp_path_by_dirblock()
 
595
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'Uni', 'str')
 
596
        self.assertRaises(TypeError, cmp_path_by_dirblock, 'str', u'Uni')
 
597
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'Uni', u'Uni')
 
598
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'x/Uni', 'x/str')
 
599
        self.assertRaises(TypeError, cmp_path_by_dirblock, 'x/str', u'x/Uni')
 
600
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'x/Uni', u'x/Uni')
 
601
 
 
602
    def test_nonascii(self):
 
603
        self.assertCmpPathByDirblock([
 
604
            # content of '/'
 
605
            '', 'a', '\xc2\xb5', '\xc3\xa5',
 
606
            # content of 'a'
 
607
            'a/a', 'a/\xc2\xb5', 'a/\xc3\xa5',
 
608
            # content of 'a/a'
 
609
            'a/a/a', 'a/a/\xc2\xb5', 'a/a/\xc3\xa5',
 
610
            # content of 'a/\xc2\xb5'
 
611
            'a/\xc2\xb5/a', 'a/\xc2\xb5/\xc2\xb5', 'a/\xc2\xb5/\xc3\xa5',
 
612
            # content of 'a/\xc3\xa5'
 
613
            'a/\xc3\xa5/a', 'a/\xc3\xa5/\xc2\xb5', 'a/\xc3\xa5/\xc3\xa5',
 
614
            # content of '\xc2\xb5'
 
615
            '\xc2\xb5/a', '\xc2\xb5/\xc2\xb5', '\xc2\xb5/\xc3\xa5',
 
616
            # content of '\xc2\xe5'
 
617
            '\xc3\xa5/a', '\xc3\xa5/\xc2\xb5', '\xc3\xa5/\xc3\xa5',
 
618
            ])
 
619
 
 
620
 
 
621
class TestCompiledCmpPathByDirblock(TestCmpPathByDirblock):
 
622
    """Test the pyrex implementation of _cmp_path_by_dirblock"""
 
623
 
 
624
    _test_needs_features = [CompiledDirstateHelpersFeature]
 
625
 
 
626
    def get_cmp_by_dirs(self):
 
627
        from bzrlib._dirstate_helpers_c import _cmp_path_by_dirblock_c
 
628
        return _cmp_path_by_dirblock_c
 
629
 
 
630
 
 
631
class TestMemRChr(tests.TestCase):
 
632
    """Test memrchr functionality"""
 
633
 
 
634
    _test_needs_features = [CompiledDirstateHelpersFeature]
 
635
 
 
636
    def assertMemRChr(self, expected, s, c):
 
637
        from bzrlib._dirstate_helpers_c import _py_memrchr
 
638
        self.assertEqual(expected, _py_memrchr(s, c))
 
639
 
 
640
    def test_missing(self):
 
641
        self.assertMemRChr(None, '', 'a')
 
642
        self.assertMemRChr(None, '', 'c')
 
643
        self.assertMemRChr(None, 'abcdefghijklm', 'q')
 
644
        self.assertMemRChr(None, 'aaaaaaaaaaaaaaaaaaaaaaa', 'b')
 
645
 
 
646
    def test_single_entry(self):
 
647
        self.assertMemRChr(0, 'abcdefghijklm', 'a')
 
648
        self.assertMemRChr(1, 'abcdefghijklm', 'b')
 
649
        self.assertMemRChr(2, 'abcdefghijklm', 'c')
 
650
        self.assertMemRChr(10, 'abcdefghijklm', 'k')
 
651
        self.assertMemRChr(11, 'abcdefghijklm', 'l')
 
652
        self.assertMemRChr(12, 'abcdefghijklm', 'm')
 
653
 
 
654
    def test_multiple(self):
 
655
        self.assertMemRChr(10, 'abcdefjklmabcdefghijklm', 'a')
 
656
        self.assertMemRChr(11, 'abcdefjklmabcdefghijklm', 'b')
 
657
        self.assertMemRChr(12, 'abcdefjklmabcdefghijklm', 'c')
 
658
        self.assertMemRChr(20, 'abcdefjklmabcdefghijklm', 'k')
 
659
        self.assertMemRChr(21, 'abcdefjklmabcdefghijklm', 'l')
 
660
        self.assertMemRChr(22, 'abcdefjklmabcdefghijklm', 'm')
 
661
        self.assertMemRChr(22, 'aaaaaaaaaaaaaaaaaaaaaaa', 'a')
 
662
 
 
663
    def test_with_nulls(self):
 
664
        self.assertMemRChr(10, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'a')
 
665
        self.assertMemRChr(11, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'b')
 
666
        self.assertMemRChr(12, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'c')
 
667
        self.assertMemRChr(20, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'k')
 
668
        self.assertMemRChr(21, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'l')
 
669
        self.assertMemRChr(22, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'm')
 
670
        self.assertMemRChr(22, 'aaa\0\0\0aaaaaaa\0\0\0aaaaaaa', 'a')
 
671
        self.assertMemRChr(9, '\0\0\0\0\0\0\0\0\0\0', '\0')
 
672
 
 
673
 
 
674
class TestReadDirblocks(test_dirstate.TestCaseWithDirState):
 
675
    """Test an implementation of _read_dirblocks()
 
676
 
 
677
    _read_dirblocks() reads in all of the dirblock information from the disk
 
678
    file.
 
679
 
 
680
    Child test cases can override ``get_read_dirblocks`` to test a specific
 
681
    implementation.
 
682
    """
 
683
 
 
684
    def get_read_dirblocks(self):
 
685
        from bzrlib._dirstate_helpers_py import _read_dirblocks_py
 
686
        return _read_dirblocks_py
 
687
 
 
688
    def test_smoketest(self):
 
689
        """Make sure that we can create and read back a simple file."""
 
690
        tree, state, expected = self.create_basic_dirstate()
 
691
        del tree
 
692
        state._read_header_if_needed()
 
693
        self.assertEqual(dirstate.DirState.NOT_IN_MEMORY,
 
694
                         state._dirblock_state)
 
695
        read_dirblocks = self.get_read_dirblocks()
 
696
        read_dirblocks(state)
 
697
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
698
                         state._dirblock_state)
 
699
 
 
700
    def test_trailing_garbage(self):
 
701
        tree, state, expected = self.create_basic_dirstate()
 
702
        # We can modify the file as long as it hasn't been read yet.
 
703
        f = open('dirstate', 'ab')
 
704
        try:
 
705
            # Add bogus trailing garbage
 
706
            f.write('bogus\n')
 
707
        finally:
 
708
            f.close()
 
709
        e = self.assertRaises(errors.DirstateCorrupt,
 
710
                              state._read_dirblocks_if_needed)
 
711
        # Make sure we mention the bogus characters in the error
 
712
        self.assertContainsRe(str(e), 'bogus')
 
713
 
 
714
 
 
715
class TestCompiledReadDirblocks(TestReadDirblocks):
 
716
    """Test the pyrex implementation of _read_dirblocks"""
 
717
 
 
718
    _test_needs_features = [CompiledDirstateHelpersFeature]
 
719
 
 
720
    def get_read_dirblocks(self):
 
721
        from bzrlib._dirstate_helpers_c import _read_dirblocks_c
 
722
        return _read_dirblocks_c
 
723
 
 
724
 
 
725
class TestUsingCompiledIfAvailable(tests.TestCase):
 
726
    """Check that any compiled functions that are available are the default.
 
727
 
 
728
    It is possible to have typos, etc in the import line, such that
 
729
    _dirstate_helpers_c is actually available, but the compiled functions are
 
730
    not being used.
 
731
    """
 
732
 
 
733
    def test_bisect_dirblock(self):
 
734
        if CompiledDirstateHelpersFeature.available():
 
735
            from bzrlib._dirstate_helpers_c import bisect_dirblock_c
 
736
            self.assertIs(bisect_dirblock_c, dirstate.bisect_dirblock)
 
737
        else:
 
738
            from bzrlib._dirstate_helpers_py import bisect_dirblock_py
 
739
            self.assertIs(bisect_dirblock_py, dirstate.bisect_dirblock)
 
740
 
 
741
    def test__bisect_path_left(self):
 
742
        if CompiledDirstateHelpersFeature.available():
 
743
            from bzrlib._dirstate_helpers_c import _bisect_path_left_c
 
744
            self.assertIs(_bisect_path_left_c, dirstate._bisect_path_left)
 
745
        else:
 
746
            from bzrlib._dirstate_helpers_py import _bisect_path_left_py
 
747
            self.assertIs(_bisect_path_left_py, dirstate._bisect_path_left)
 
748
 
 
749
    def test__bisect_path_right(self):
 
750
        if CompiledDirstateHelpersFeature.available():
 
751
            from bzrlib._dirstate_helpers_c import _bisect_path_right_c
 
752
            self.assertIs(_bisect_path_right_c, dirstate._bisect_path_right)
 
753
        else:
 
754
            from bzrlib._dirstate_helpers_py import _bisect_path_right_py
 
755
            self.assertIs(_bisect_path_right_py, dirstate._bisect_path_right)
 
756
 
 
757
    def test_cmp_by_dirs(self):
 
758
        if CompiledDirstateHelpersFeature.available():
 
759
            from bzrlib._dirstate_helpers_c import cmp_by_dirs_c
 
760
            self.assertIs(cmp_by_dirs_c, dirstate.cmp_by_dirs)
 
761
        else:
 
762
            from bzrlib._dirstate_helpers_py import cmp_by_dirs_py
 
763
            self.assertIs(cmp_by_dirs_py, dirstate.cmp_by_dirs)
 
764
 
 
765
    def test__read_dirblocks(self):
 
766
        if CompiledDirstateHelpersFeature.available():
 
767
            from bzrlib._dirstate_helpers_c import _read_dirblocks_c
 
768
            self.assertIs(_read_dirblocks_c, dirstate._read_dirblocks)
 
769
        else:
 
770
            from bzrlib._dirstate_helpers_py import _read_dirblocks_py
 
771
            self.assertIs(_read_dirblocks_py, dirstate._read_dirblocks)
 
772
 
 
773
    def test_update_entry(self):
 
774
        if CompiledDirstateHelpersFeature.available():
 
775
            from bzrlib._dirstate_helpers_c import update_entry
 
776
            self.assertIs(update_entry, dirstate.update_entry)
 
777
        else:
 
778
            from bzrlib.dirstate import py_update_entry
 
779
            self.assertIs(py_update_entry, dirstate.py_update_entry)
 
780
 
 
781
    def test_process_entry(self):
 
782
        if CompiledDirstateHelpersFeature.available():
 
783
            from bzrlib._dirstate_helpers_c import ProcessEntryC
 
784
            self.assertIs(ProcessEntryC, dirstate._process_entry)
 
785
        else:
 
786
            from bzrlib.dirstate import ProcessEntryPython
 
787
            self.assertIs(ProcessEntryPython, dirstate._process_entry)
 
788
 
 
789
 
 
790
class TestUpdateEntry(test_dirstate.TestCaseWithDirState):
 
791
    """Test the DirState.update_entry functions"""
 
792
 
 
793
    def get_state_with_a(self):
 
794
        """Create a DirState tracking a single object named 'a'"""
 
795
        state = test_dirstate.InstrumentedDirState.initialize('dirstate')
 
796
        self.addCleanup(state.unlock)
 
797
        state.add('a', 'a-id', 'file', None, '')
 
798
        entry = state._get_entry(0, path_utf8='a')
 
799
        self.set_update_entry()
 
800
        return state, entry
 
801
 
 
802
    def set_update_entry(self):
 
803
        self.update_entry = dirstate.py_update_entry
 
804
 
 
805
    def test_observed_sha1_cachable(self):
 
806
        state, entry = self.get_state_with_a()
 
807
        atime = time.time() - 10
 
808
        self.build_tree(['a'])
 
809
        statvalue = os.lstat('a')
 
810
        statvalue = test_dirstate._FakeStat(statvalue.st_size, atime, atime,
 
811
            statvalue.st_dev, statvalue.st_ino, statvalue.st_mode)
 
812
        state._observed_sha1(entry, "foo", statvalue)
 
813
        self.assertEqual('foo', entry[1][0][1])
 
814
        packed_stat = dirstate.pack_stat(statvalue)
 
815
        self.assertEqual(packed_stat, entry[1][0][4])
 
816
 
 
817
    def test_observed_sha1_not_cachable(self):
 
818
        state, entry = self.get_state_with_a()
 
819
        oldval = entry[1][0][1]
 
820
        oldstat = entry[1][0][4]
 
821
        self.build_tree(['a'])
 
822
        statvalue = os.lstat('a')
 
823
        state._observed_sha1(entry, "foo", statvalue)
 
824
        self.assertEqual(oldval, entry[1][0][1])
 
825
        self.assertEqual(oldstat, entry[1][0][4])
 
826
 
 
827
    def test_update_entry(self):
 
828
        state, _ = self.get_state_with_a()
 
829
        tree = self.make_branch_and_tree('tree')
 
830
        tree.lock_write()
 
831
        empty_revid = tree.commit('empty')
 
832
        self.build_tree(['tree/a'])
 
833
        tree.add(['a'], ['a-id'])
 
834
        with_a_id = tree.commit('with_a')
 
835
        self.addCleanup(tree.unlock)
 
836
        state.set_parent_trees(
 
837
            [(empty_revid, tree.branch.repository.revision_tree(empty_revid))],
 
838
            [])
 
839
        entry = state._get_entry(0, path_utf8='a')
 
840
        self.build_tree(['a'])
 
841
        # Add one where we don't provide the stat or sha already
 
842
        self.assertEqual(('', 'a', 'a-id'), entry[0])
 
843
        self.assertEqual(('f', '', 0, False, dirstate.DirState.NULLSTAT),
 
844
                         entry[1][0])
 
845
        # Flush the buffers to disk
 
846
        state.save()
 
847
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
848
                         state._dirblock_state)
 
849
 
 
850
        stat_value = os.lstat('a')
 
851
        packed_stat = dirstate.pack_stat(stat_value)
 
852
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
853
                                          stat_value=stat_value)
 
854
        self.assertEqual(None, link_or_sha1)
 
855
 
 
856
        # The dirblock entry should not have cached the file's sha1 (too new)
 
857
        self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
 
858
                         entry[1][0])
 
859
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
860
                         state._dirblock_state)
 
861
        mode = stat_value.st_mode
 
862
        self.assertEqual([('is_exec', mode, False)], state._log)
 
863
 
 
864
        state.save()
 
865
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
866
                         state._dirblock_state)
 
867
 
 
868
        # If we do it again right away, we don't know if the file has changed
 
869
        # so we will re-read the file. Roll the clock back so the file is
 
870
        # guaranteed to look too new.
 
871
        state.adjust_time(-10)
 
872
        del state._log[:]
 
873
 
 
874
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
875
                                          stat_value=stat_value)
 
876
        self.assertEqual([('is_exec', mode, False)], state._log)
 
877
        self.assertEqual(None, link_or_sha1)
 
878
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
879
                         state._dirblock_state)
 
880
        self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
 
881
                         entry[1][0])
 
882
        state.save()
 
883
 
 
884
        # If it is cachable (the clock has moved forward) but new it still
 
885
        # won't calculate the sha or cache it.
 
886
        state.adjust_time(+20)
 
887
        del state._log[:]
 
888
        link_or_sha1 = dirstate.update_entry(state, entry, abspath='a',
 
889
                                          stat_value=stat_value)
 
890
        self.assertEqual(None, link_or_sha1)
 
891
        self.assertEqual([('is_exec', mode, False)], state._log)
 
892
        self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
 
893
                         entry[1][0])
 
894
 
 
895
        # If the file is no longer new, and the clock has been moved forward
 
896
        # sufficiently, it will cache the sha.
 
897
        del state._log[:]
 
898
        state.set_parent_trees(
 
899
            [(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
 
900
            [])
 
901
        entry = state._get_entry(0, path_utf8='a')
 
902
 
 
903
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
904
                                          stat_value=stat_value)
 
905
        self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
 
906
                         link_or_sha1)
 
907
        self.assertEqual([('is_exec', mode, False), ('sha1', 'a')],
 
908
                          state._log)
 
909
        self.assertEqual(('f', link_or_sha1, 14, False, packed_stat),
 
910
                         entry[1][0])
 
911
 
 
912
        # Subsequent calls will just return the cached value
 
913
        del state._log[:]
 
914
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
915
                                          stat_value=stat_value)
 
916
        self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
 
917
                         link_or_sha1)
 
918
        self.assertEqual([], state._log)
 
919
        self.assertEqual(('f', link_or_sha1, 14, False, packed_stat),
 
920
                         entry[1][0])
 
921
 
 
922
    def test_update_entry_symlink(self):
 
923
        """Update entry should read symlinks."""
 
924
        self.requireFeature(SymlinkFeature)
 
925
        state, entry = self.get_state_with_a()
 
926
        state.save()
 
927
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
928
                         state._dirblock_state)
 
929
        os.symlink('target', 'a')
 
930
 
 
931
        state.adjust_time(-10) # Make the symlink look new
 
932
        stat_value = os.lstat('a')
 
933
        packed_stat = dirstate.pack_stat(stat_value)
 
934
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
935
                                          stat_value=stat_value)
 
936
        self.assertEqual('target', link_or_sha1)
 
937
        self.assertEqual([('read_link', 'a', '')], state._log)
 
938
        # Dirblock is not updated (the link is too new)
 
939
        self.assertEqual([('l', '', 6, False, dirstate.DirState.NULLSTAT)],
 
940
                         entry[1])
 
941
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
942
                         state._dirblock_state)
 
943
 
 
944
        # Because the stat_value looks new, we should re-read the target
 
945
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
946
                                          stat_value=stat_value)
 
947
        self.assertEqual('target', link_or_sha1)
 
948
        self.assertEqual([('read_link', 'a', ''),
 
949
                          ('read_link', 'a', ''),
 
950
                         ], state._log)
 
951
        self.assertEqual([('l', '', 6, False, dirstate.DirState.NULLSTAT)],
 
952
                         entry[1])
 
953
        state.adjust_time(+20) # Skip into the future, all files look old
 
954
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
955
                                          stat_value=stat_value)
 
956
        self.assertEqual('target', link_or_sha1)
 
957
        # We need to re-read the link because only now can we cache it
 
958
        self.assertEqual([('read_link', 'a', ''),
 
959
                          ('read_link', 'a', ''),
 
960
                          ('read_link', 'a', ''),
 
961
                         ], state._log)
 
962
        self.assertEqual([('l', 'target', 6, False, packed_stat)],
 
963
                         entry[1])
 
964
 
 
965
        # Another call won't re-read the link
 
966
        self.assertEqual([('read_link', 'a', ''),
 
967
                          ('read_link', 'a', ''),
 
968
                          ('read_link', 'a', ''),
 
969
                         ], state._log)
 
970
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
971
                                          stat_value=stat_value)
 
972
        self.assertEqual('target', link_or_sha1)
 
973
        self.assertEqual([('l', 'target', 6, False, packed_stat)],
 
974
                         entry[1])
 
975
 
 
976
    def do_update_entry(self, state, entry, abspath):
 
977
        stat_value = os.lstat(abspath)
 
978
        return self.update_entry(state, entry, abspath, stat_value)
 
979
 
 
980
    def test_update_entry_dir(self):
 
981
        state, entry = self.get_state_with_a()
 
982
        self.build_tree(['a/'])
 
983
        self.assertIs(None, self.do_update_entry(state, entry, 'a'))
 
984
 
 
985
    def test_update_entry_dir_unchanged(self):
 
986
        state, entry = self.get_state_with_a()
 
987
        self.build_tree(['a/'])
 
988
        state.adjust_time(+20)
 
989
        self.assertIs(None, self.do_update_entry(state, entry, 'a'))
 
990
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
991
                         state._dirblock_state)
 
992
        state.save()
 
993
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
994
                         state._dirblock_state)
 
995
        self.assertIs(None, self.do_update_entry(state, entry, 'a'))
 
996
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
997
                         state._dirblock_state)
 
998
 
 
999
    def test_update_entry_file_unchanged(self):
 
1000
        state, _ = self.get_state_with_a()
 
1001
        tree = self.make_branch_and_tree('tree')
 
1002
        tree.lock_write()
 
1003
        self.build_tree(['tree/a'])
 
1004
        tree.add(['a'], ['a-id'])
 
1005
        with_a_id = tree.commit('witha')
 
1006
        self.addCleanup(tree.unlock)
 
1007
        state.set_parent_trees(
 
1008
            [(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
 
1009
            [])
 
1010
        entry = state._get_entry(0, path_utf8='a')
 
1011
        self.build_tree(['a'])
 
1012
        sha1sum = 'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
 
1013
        state.adjust_time(+20)
 
1014
        self.assertEqual(sha1sum, self.do_update_entry(state, entry, 'a'))
 
1015
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
1016
                         state._dirblock_state)
 
1017
        state.save()
 
1018
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1019
                         state._dirblock_state)
 
1020
        self.assertEqual(sha1sum, self.do_update_entry(state, entry, 'a'))
 
1021
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1022
                         state._dirblock_state)
 
1023
 
 
1024
    def test_update_entry_tree_reference(self):
 
1025
        self.set_update_entry()
 
1026
        state = test_dirstate.InstrumentedDirState.initialize('dirstate')
 
1027
        self.addCleanup(state.unlock)
 
1028
        state.add('r', 'r-id', 'tree-reference', None, '')
 
1029
        self.build_tree(['r/'])
 
1030
        entry = state._get_entry(0, path_utf8='r')
 
1031
        self.do_update_entry(state, entry, 'r')
 
1032
        entry = state._get_entry(0, path_utf8='r')
 
1033
        self.assertEqual('t', entry[1][0][0])
 
1034
 
 
1035
    def create_and_test_file(self, state, entry):
 
1036
        """Create a file at 'a' and verify the state finds it during update.
 
1037
 
 
1038
        The state should already be versioning *something* at 'a'. This makes
 
1039
        sure that state.update_entry recognizes it as a file.
 
1040
        """
 
1041
        self.build_tree(['a'])
 
1042
        stat_value = os.lstat('a')
 
1043
        packed_stat = dirstate.pack_stat(stat_value)
 
1044
 
 
1045
        link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
 
1046
        self.assertEqual(None, link_or_sha1)
 
1047
        self.assertEqual([('f', '', 14, False, dirstate.DirState.NULLSTAT)],
 
1048
                         entry[1])
 
1049
        return packed_stat
 
1050
 
 
1051
    def create_and_test_dir(self, state, entry):
 
1052
        """Create a directory at 'a' and verify the state finds it.
 
1053
 
 
1054
        The state should already be versioning *something* at 'a'. This makes
 
1055
        sure that state.update_entry recognizes it as a directory.
 
1056
        """
 
1057
        self.build_tree(['a/'])
 
1058
        stat_value = os.lstat('a')
 
1059
        packed_stat = dirstate.pack_stat(stat_value)
 
1060
 
 
1061
        link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
 
1062
        self.assertIs(None, link_or_sha1)
 
1063
        self.assertEqual([('d', '', 0, False, packed_stat)], entry[1])
 
1064
 
 
1065
        return packed_stat
 
1066
 
 
1067
    def create_and_test_symlink(self, state, entry):
 
1068
        """Create a symlink at 'a' and verify the state finds it.
 
1069
 
 
1070
        The state should already be versioning *something* at 'a'. This makes
 
1071
        sure that state.update_entry recognizes it as a symlink.
 
1072
 
 
1073
        This should not be called if this platform does not have symlink
 
1074
        support.
 
1075
        """
 
1076
        # caller should care about skipping test on platforms without symlinks
 
1077
        os.symlink('path/to/foo', 'a')
 
1078
 
 
1079
        stat_value = os.lstat('a')
 
1080
        packed_stat = dirstate.pack_stat(stat_value)
 
1081
 
 
1082
        link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
 
1083
        self.assertEqual('path/to/foo', link_or_sha1)
 
1084
        self.assertEqual([('l', 'path/to/foo', 11, False, packed_stat)],
 
1085
                         entry[1])
 
1086
        return packed_stat
 
1087
 
 
1088
    def test_update_file_to_dir(self):
 
1089
        """If a file changes to a directory we return None for the sha.
 
1090
        We also update the inventory record.
 
1091
        """
 
1092
        state, entry = self.get_state_with_a()
 
1093
        # The file sha1 won't be cached unless the file is old
 
1094
        state.adjust_time(+10)
 
1095
        self.create_and_test_file(state, entry)
 
1096
        os.remove('a')
 
1097
        self.create_and_test_dir(state, entry)
 
1098
 
 
1099
    def test_update_file_to_symlink(self):
 
1100
        """File becomes a symlink"""
 
1101
        self.requireFeature(SymlinkFeature)
 
1102
        state, entry = self.get_state_with_a()
 
1103
        # The file sha1 won't be cached unless the file is old
 
1104
        state.adjust_time(+10)
 
1105
        self.create_and_test_file(state, entry)
 
1106
        os.remove('a')
 
1107
        self.create_and_test_symlink(state, entry)
 
1108
 
 
1109
    def test_update_dir_to_file(self):
 
1110
        """Directory becoming a file updates the entry."""
 
1111
        state, entry = self.get_state_with_a()
 
1112
        # The file sha1 won't be cached unless the file is old
 
1113
        state.adjust_time(+10)
 
1114
        self.create_and_test_dir(state, entry)
 
1115
        os.rmdir('a')
 
1116
        self.create_and_test_file(state, entry)
 
1117
 
 
1118
    def test_update_dir_to_symlink(self):
 
1119
        """Directory becomes a symlink"""
 
1120
        self.requireFeature(SymlinkFeature)
 
1121
        state, entry = self.get_state_with_a()
 
1122
        # The symlink target won't be cached if it isn't old
 
1123
        state.adjust_time(+10)
 
1124
        self.create_and_test_dir(state, entry)
 
1125
        os.rmdir('a')
 
1126
        self.create_and_test_symlink(state, entry)
 
1127
 
 
1128
    def test_update_symlink_to_file(self):
 
1129
        """Symlink becomes a file"""
 
1130
        self.requireFeature(SymlinkFeature)
 
1131
        state, entry = self.get_state_with_a()
 
1132
        # The symlink and file info won't be cached unless old
 
1133
        state.adjust_time(+10)
 
1134
        self.create_and_test_symlink(state, entry)
 
1135
        os.remove('a')
 
1136
        self.create_and_test_file(state, entry)
 
1137
 
 
1138
    def test_update_symlink_to_dir(self):
 
1139
        """Symlink becomes a directory"""
 
1140
        self.requireFeature(SymlinkFeature)
 
1141
        state, entry = self.get_state_with_a()
 
1142
        # The symlink target won't be cached if it isn't old
 
1143
        state.adjust_time(+10)
 
1144
        self.create_and_test_symlink(state, entry)
 
1145
        os.remove('a')
 
1146
        self.create_and_test_dir(state, entry)
 
1147
 
 
1148
    def test__is_executable_win32(self):
 
1149
        state, entry = self.get_state_with_a()
 
1150
        self.build_tree(['a'])
 
1151
 
 
1152
        # Make sure we are using the win32 implementation of _is_executable
 
1153
        state._is_executable = state._is_executable_win32
 
1154
 
 
1155
        # The file on disk is not executable, but we are marking it as though
 
1156
        # it is. With _is_executable_win32 we ignore what is on disk.
 
1157
        entry[1][0] = ('f', '', 0, True, dirstate.DirState.NULLSTAT)
 
1158
 
 
1159
        stat_value = os.lstat('a')
 
1160
        packed_stat = dirstate.pack_stat(stat_value)
 
1161
 
 
1162
        state.adjust_time(-10) # Make sure everything is new
 
1163
        self.update_entry(state, entry, abspath='a', stat_value=stat_value)
 
1164
 
 
1165
        # The row is updated, but the executable bit stays set.
 
1166
        self.assertEqual([('f', '', 14, True, dirstate.DirState.NULLSTAT)],
 
1167
                         entry[1])
 
1168
 
 
1169
        # Make the disk object look old enough to cache (but it won't cache the sha
 
1170
        # as it is a new file).
 
1171
        state.adjust_time(+20)
 
1172
        digest = 'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
 
1173
        self.update_entry(state, entry, abspath='a', stat_value=stat_value)
 
1174
        self.assertEqual([('f', '', 14, True, dirstate.DirState.NULLSTAT)],
 
1175
            entry[1])
 
1176
 
 
1177
    def _prepare_tree(self):
 
1178
        # Create a tree
 
1179
        text = 'Hello World\n'
 
1180
        tree = self.make_branch_and_tree('tree')
 
1181
        self.build_tree_contents([('tree/a file', text)])
 
1182
        tree.add('a file', 'a-file-id')
 
1183
        # Note: dirstate does not sha prior to the first commit
 
1184
        # so commit now in order for the test to work
 
1185
        tree.commit('first')
 
1186
        return tree, text
 
1187
 
 
1188
    def test_sha1provider_sha1_used(self):
 
1189
        tree, text = self._prepare_tree()
 
1190
        state = dirstate.DirState.from_tree(tree, 'dirstate',
 
1191
            UppercaseSHA1Provider())
 
1192
        self.addCleanup(state.unlock)
 
1193
        expected_sha = osutils.sha_string(text.upper() + "foo")
 
1194
        entry = state._get_entry(0, path_utf8='a file')
 
1195
        state._sha_cutoff_time()
 
1196
        state._cutoff_time += 10
 
1197
        sha1 = dirstate.update_entry(state, entry, 'tree/a file',
 
1198
            os.lstat('tree/a file'))
 
1199
        self.assertEqual(expected_sha, sha1)
 
1200
 
 
1201
    def test_sha1provider_stat_and_sha1_used(self):
 
1202
        tree, text = self._prepare_tree()
 
1203
        tree.lock_write()
 
1204
        self.addCleanup(tree.unlock)
 
1205
        state = tree._current_dirstate()
 
1206
        state._sha1_provider = UppercaseSHA1Provider()
 
1207
        # If we used the standard provider, it would look like nothing has
 
1208
        # changed
 
1209
        file_ids_changed = [change[0] for change 
 
1210
                in tree.iter_changes(tree.basis_tree())]
 
1211
        self.assertEqual(['a-file-id'], file_ids_changed)
 
1212
 
 
1213
 
 
1214
class UppercaseSHA1Provider(dirstate.SHA1Provider):
 
1215
    """A custom SHA1Provider."""
 
1216
 
 
1217
    def sha1(self, abspath):
 
1218
        return self.stat_and_sha1(abspath)[1]
 
1219
 
 
1220
    def stat_and_sha1(self, abspath):
 
1221
        file_obj = file(abspath, 'rb')
 
1222
        try:
 
1223
            statvalue = os.fstat(file_obj.fileno())
 
1224
            text = ''.join(file_obj.readlines())
 
1225
            sha1 = osutils.sha_string(text.upper() + "foo")
 
1226
        finally:
 
1227
            file_obj.close()
 
1228
        return statvalue, sha1
 
1229
 
 
1230
 
 
1231
class TestCompiledUpdateEntry(TestUpdateEntry):
 
1232
    """Test the pyrex implementation of _read_dirblocks"""
 
1233
 
 
1234
    _test_needs_features = [CompiledDirstateHelpersFeature]
 
1235
 
 
1236
    def set_update_entry(self):
 
1237
        from bzrlib._dirstate_helpers_c import update_entry
 
1238
        self.update_entry = update_entry
 
1239
 
 
1240
 
 
1241
class TestProcessEntryPython(test_dirstate.TestCaseWithDirState):
 
1242
 
 
1243
    def setUp(self):
 
1244
        super(TestProcessEntryPython, self).setUp()
 
1245
        self.setup_process_entry()
 
1246
 
 
1247
    def setup_process_entry(self):
 
1248
        from bzrlib import dirstate
 
1249
        orig = dirstate._process_entry
 
1250
        def cleanup():
 
1251
            dirstate._process_entry = orig
 
1252
        self.addCleanup(cleanup)
 
1253
        dirstate._process_entry = dirstate.ProcessEntryPython
 
1254
 
 
1255
    def assertChangedFileIds(self, expected, tree):
 
1256
        tree.lock_read()
 
1257
        try:
 
1258
            file_ids = [info[0] for info
 
1259
                        in tree.iter_changes(tree.basis_tree())]
 
1260
        finally:
 
1261
            tree.unlock()
 
1262
        self.assertEqual(sorted(expected), sorted(file_ids))
 
1263
 
 
1264
    def test_simple_changes(self):
 
1265
        tree = self.make_branch_and_tree('tree')
 
1266
        self.build_tree(['tree/file'])
 
1267
        tree.add(['file'], ['file-id'])
 
1268
        self.assertChangedFileIds([tree.get_root_id(), 'file-id'], tree)
 
1269
        tree.commit('one')
 
1270
        self.assertChangedFileIds([], tree)
 
1271
 
 
1272
    def test_sha1provider_stat_and_sha1_used(self):
 
1273
        tree = self.make_branch_and_tree('tree')
 
1274
        self.build_tree(['tree/file'])
 
1275
        tree.add(['file'], ['file-id'])
 
1276
        tree.commit('one')
 
1277
        tree.lock_write()
 
1278
        self.addCleanup(tree.unlock)
 
1279
        state = tree._current_dirstate()
 
1280
        state._sha1_provider = UppercaseSHA1Provider()
 
1281
        self.assertChangedFileIds(['file-id'], tree)
 
1282
 
 
1283
 
 
1284
class TestProcessEntryC(TestProcessEntryPython):
 
1285
    _test_needs_features = [CompiledDirstateHelpersFeature]
 
1286
 
 
1287
    def setup_process_entry(self):
 
1288
        from bzrlib import _dirstate_helpers_c
 
1289
        orig = dirstate._process_entry
 
1290
        def cleanup():
 
1291
            dirstate._process_entry = orig
 
1292
        self.addCleanup(cleanup)
 
1293
        dirstate._process_entry = _dirstate_helpers_c.ProcessEntryC
 
1294