~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_repository.py

  • Committer: Martin Pool
  • Date: 2005-07-07 07:57:12 UTC
  • mto: This revision was merged to the branch mainline in revision 852.
  • Revision ID: mbp@sourcefrog.net-20050707075712-4784aa908809b905
- preliminary merge conflict detection

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Tests for the Repository facility that are not interface tests.
18
 
 
19
 
For interface tests see tests/repository_implementations/*.py.
20
 
 
21
 
For concrete class tests see this file, and for storage formats tests
22
 
also see this file.
23
 
"""
24
 
 
25
 
from stat import S_ISDIR
26
 
from StringIO import StringIO
27
 
 
28
 
import bzrlib
29
 
import bzrlib.bzrdir as bzrdir
30
 
import bzrlib.errors as errors
31
 
from bzrlib.errors import (NotBranchError,
32
 
                           NoSuchFile,
33
 
                           UnknownFormatError,
34
 
                           UnsupportedFormatError,
35
 
                           )
36
 
import bzrlib.repository as repository
37
 
from bzrlib.tests import TestCase, TestCaseWithTransport
38
 
from bzrlib.transport import get_transport
39
 
from bzrlib.transport.memory import MemoryServer
40
 
from bzrlib import upgrade, workingtree
41
 
 
42
 
 
43
 
class TestDefaultFormat(TestCase):
44
 
 
45
 
    def test_get_set_default_format(self):
46
 
        private_default = repository._default_format.__class__
47
 
        old_format = repository.RepositoryFormat.get_default_format()
48
 
        self.assertTrue(isinstance(old_format, private_default))
49
 
        repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
50
 
        # creating a repository should now create an instrumented dir.
51
 
        try:
52
 
            # the default branch format is used by the meta dir format
53
 
            # which is not the default bzrdir format at this point
54
 
            dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
55
 
            result = dir.create_repository()
56
 
            self.assertEqual(result, 'A bzr repository dir')
57
 
        finally:
58
 
            repository.RepositoryFormat.set_default_format(old_format)
59
 
        self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
60
 
 
61
 
 
62
 
class SampleRepositoryFormat(repository.RepositoryFormat):
63
 
    """A sample format
64
 
 
65
 
    this format is initializable, unsupported to aid in testing the 
66
 
    open and open(unsupported=True) routines.
67
 
    """
68
 
 
69
 
    def get_format_string(self):
70
 
        """See RepositoryFormat.get_format_string()."""
71
 
        return "Sample .bzr repository format."
72
 
 
73
 
    def initialize(self, a_bzrdir, shared=False):
74
 
        """Initialize a repository in a BzrDir"""
75
 
        t = a_bzrdir.get_repository_transport(self)
76
 
        t.put_bytes('format', self.get_format_string())
77
 
        return 'A bzr repository dir'
78
 
 
79
 
    def is_supported(self):
80
 
        return False
81
 
 
82
 
    def open(self, a_bzrdir, _found=False):
83
 
        return "opened repository."
84
 
 
85
 
 
86
 
class TestRepositoryFormat(TestCaseWithTransport):
87
 
    """Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
88
 
 
89
 
    def test_find_format(self):
90
 
        # is the right format object found for a repository?
91
 
        # create a branch with a few known format objects.
92
 
        # this is not quite the same as 
93
 
        self.build_tree(["foo/", "bar/"])
94
 
        def check_format(format, url):
95
 
            dir = format._matchingbzrdir.initialize(url)
96
 
            format.initialize(dir)
97
 
            t = get_transport(url)
98
 
            found_format = repository.RepositoryFormat.find_format(dir)
99
 
            self.failUnless(isinstance(found_format, format.__class__))
100
 
        check_format(repository.RepositoryFormat7(), "bar")
101
 
        
102
 
    def test_find_format_no_repository(self):
103
 
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
104
 
        self.assertRaises(errors.NoRepositoryPresent,
105
 
                          repository.RepositoryFormat.find_format,
106
 
                          dir)
107
 
 
108
 
    def test_find_format_unknown_format(self):
109
 
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
110
 
        SampleRepositoryFormat().initialize(dir)
111
 
        self.assertRaises(UnknownFormatError,
112
 
                          repository.RepositoryFormat.find_format,
113
 
                          dir)
114
 
 
115
 
    def test_register_unregister_format(self):
116
 
        format = SampleRepositoryFormat()
117
 
        # make a control dir
118
 
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
119
 
        # make a repo
120
 
        format.initialize(dir)
121
 
        # register a format for it.
122
 
        repository.RepositoryFormat.register_format(format)
123
 
        # which repository.Open will refuse (not supported)
124
 
        self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
125
 
        # but open(unsupported) will work
126
 
        self.assertEqual(format.open(dir), "opened repository.")
127
 
        # unregister the format
128
 
        repository.RepositoryFormat.unregister_format(format)
129
 
 
130
 
 
131
 
class TestFormat6(TestCaseWithTransport):
132
 
 
133
 
    def test_no_ancestry_weave(self):
134
 
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
135
 
        repo = repository.RepositoryFormat6().initialize(control)
136
 
        # We no longer need to create the ancestry.weave file
137
 
        # since it is *never* used.
138
 
        self.assertRaises(NoSuchFile,
139
 
                          control.transport.get,
140
 
                          'ancestry.weave')
141
 
 
142
 
 
143
 
class TestFormat7(TestCaseWithTransport):
144
 
    
145
 
    def test_disk_layout(self):
146
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
147
 
        repo = repository.RepositoryFormat7().initialize(control)
148
 
        # in case of side effects of locking.
149
 
        repo.lock_write()
150
 
        repo.unlock()
151
 
        # we want:
152
 
        # format 'Bazaar-NG Repository format 7'
153
 
        # lock ''
154
 
        # inventory.weave == empty_weave
155
 
        # empty revision-store directory
156
 
        # empty weaves directory
157
 
        t = control.get_repository_transport(None)
158
 
        self.assertEqualDiff('Bazaar-NG Repository format 7',
159
 
                             t.get('format').read())
160
 
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
161
 
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
162
 
        self.assertEqualDiff('# bzr weave file v5\n'
163
 
                             'w\n'
164
 
                             'W\n',
165
 
                             t.get('inventory.weave').read())
166
 
 
167
 
    def test_shared_disk_layout(self):
168
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
169
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
170
 
        # we want:
171
 
        # format 'Bazaar-NG Repository format 7'
172
 
        # inventory.weave == empty_weave
173
 
        # empty revision-store directory
174
 
        # empty weaves directory
175
 
        # a 'shared-storage' marker file.
176
 
        # lock is not present when unlocked
177
 
        t = control.get_repository_transport(None)
178
 
        self.assertEqualDiff('Bazaar-NG Repository format 7',
179
 
                             t.get('format').read())
180
 
        self.assertEqualDiff('', t.get('shared-storage').read())
181
 
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
182
 
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
183
 
        self.assertEqualDiff('# bzr weave file v5\n'
184
 
                             'w\n'
185
 
                             'W\n',
186
 
                             t.get('inventory.weave').read())
187
 
        self.assertFalse(t.has('branch-lock'))
188
 
 
189
 
    def test_creates_lockdir(self):
190
 
        """Make sure it appears to be controlled by a LockDir existence"""
191
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
192
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
193
 
        t = control.get_repository_transport(None)
194
 
        # TODO: Should check there is a 'lock' toplevel directory, 
195
 
        # regardless of contents
196
 
        self.assertFalse(t.has('lock/held/info'))
197
 
        repo.lock_write()
198
 
        try:
199
 
            self.assertTrue(t.has('lock/held/info'))
200
 
        finally:
201
 
            # unlock so we don't get a warning about failing to do so
202
 
            repo.unlock()
203
 
 
204
 
    def test_uses_lockdir(self):
205
 
        """repo format 7 actually locks on lockdir"""
206
 
        base_url = self.get_url()
207
 
        control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
208
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
209
 
        t = control.get_repository_transport(None)
210
 
        repo.lock_write()
211
 
        repo.unlock()
212
 
        del repo
213
 
        # make sure the same lock is created by opening it
214
 
        repo = repository.Repository.open(base_url)
215
 
        repo.lock_write()
216
 
        self.assertTrue(t.has('lock/held/info'))
217
 
        repo.unlock()
218
 
        self.assertFalse(t.has('lock/held/info'))
219
 
 
220
 
    def test_shared_no_tree_disk_layout(self):
221
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
222
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
223
 
        repo.set_make_working_trees(False)
224
 
        # we want:
225
 
        # format 'Bazaar-NG Repository format 7'
226
 
        # lock ''
227
 
        # inventory.weave == empty_weave
228
 
        # empty revision-store directory
229
 
        # empty weaves directory
230
 
        # a 'shared-storage' marker file.
231
 
        t = control.get_repository_transport(None)
232
 
        self.assertEqualDiff('Bazaar-NG Repository format 7',
233
 
                             t.get('format').read())
234
 
        ## self.assertEqualDiff('', t.get('lock').read())
235
 
        self.assertEqualDiff('', t.get('shared-storage').read())
236
 
        self.assertEqualDiff('', t.get('no-working-trees').read())
237
 
        repo.set_make_working_trees(True)
238
 
        self.assertFalse(t.has('no-working-trees'))
239
 
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
240
 
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
241
 
        self.assertEqualDiff('# bzr weave file v5\n'
242
 
                             'w\n'
243
 
                             'W\n',
244
 
                             t.get('inventory.weave').read())
245
 
 
246
 
 
247
 
class TestFormatKnit1(TestCaseWithTransport):
248
 
    
249
 
    def test_disk_layout(self):
250
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
251
 
        repo = repository.RepositoryFormatKnit1().initialize(control)
252
 
        # in case of side effects of locking.
253
 
        repo.lock_write()
254
 
        repo.unlock()
255
 
        # we want:
256
 
        # format 'Bazaar-NG Knit Repository Format 1'
257
 
        # lock: is a directory
258
 
        # inventory.weave == empty_weave
259
 
        # empty revision-store directory
260
 
        # empty weaves directory
261
 
        t = control.get_repository_transport(None)
262
 
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
263
 
                             t.get('format').read())
264
 
        # XXX: no locks left when unlocked at the moment
265
 
        # self.assertEqualDiff('', t.get('lock').read())
266
 
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
267
 
        self.check_knits(t)
268
 
 
269
 
    def assertHasKnit(self, t, knit_name):
270
 
        """Assert that knit_name exists on t."""
271
 
        self.assertEqualDiff('# bzr knit index 8\n',
272
 
                             t.get(knit_name + '.kndx').read())
273
 
        # no default content
274
 
        self.assertTrue(t.has(knit_name + '.knit'))
275
 
 
276
 
    def check_knits(self, t):
277
 
        """check knit content for a repository."""
278
 
        self.assertHasKnit(t, 'inventory')
279
 
        self.assertHasKnit(t, 'revisions')
280
 
        self.assertHasKnit(t, 'signatures')
281
 
 
282
 
    def test_shared_disk_layout(self):
283
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
284
 
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
285
 
        # we want:
286
 
        # format 'Bazaar-NG Knit Repository Format 1'
287
 
        # lock: is a directory
288
 
        # inventory.weave == empty_weave
289
 
        # empty revision-store directory
290
 
        # empty weaves directory
291
 
        # a 'shared-storage' marker file.
292
 
        t = control.get_repository_transport(None)
293
 
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
294
 
                             t.get('format').read())
295
 
        # XXX: no locks left when unlocked at the moment
296
 
        # self.assertEqualDiff('', t.get('lock').read())
297
 
        self.assertEqualDiff('', t.get('shared-storage').read())
298
 
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
299
 
        self.check_knits(t)
300
 
 
301
 
    def test_shared_no_tree_disk_layout(self):
302
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
303
 
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
304
 
        repo.set_make_working_trees(False)
305
 
        # we want:
306
 
        # format 'Bazaar-NG Knit Repository Format 1'
307
 
        # lock ''
308
 
        # inventory.weave == empty_weave
309
 
        # empty revision-store directory
310
 
        # empty weaves directory
311
 
        # a 'shared-storage' marker file.
312
 
        t = control.get_repository_transport(None)
313
 
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
314
 
                             t.get('format').read())
315
 
        # XXX: no locks left when unlocked at the moment
316
 
        # self.assertEqualDiff('', t.get('lock').read())
317
 
        self.assertEqualDiff('', t.get('shared-storage').read())
318
 
        self.assertEqualDiff('', t.get('no-working-trees').read())
319
 
        repo.set_make_working_trees(True)
320
 
        self.assertFalse(t.has('no-working-trees'))
321
 
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
322
 
        self.check_knits(t)
323
 
 
324
 
 
325
 
class InterString(repository.InterRepository):
326
 
    """An inter-repository optimised code path for strings.
327
 
 
328
 
    This is for use during testing where we use strings as repositories
329
 
    so that none of the default regsitered inter-repository classes will
330
 
    match.
331
 
    """
332
 
 
333
 
    @staticmethod
334
 
    def is_compatible(repo_source, repo_target):
335
 
        """InterString is compatible with strings-as-repos."""
336
 
        return isinstance(repo_source, str) and isinstance(repo_target, str)
337
 
 
338
 
 
339
 
class TestInterRepository(TestCaseWithTransport):
340
 
 
341
 
    def test_get_default_inter_repository(self):
342
 
        # test that the InterRepository.get(repo_a, repo_b) probes
343
 
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
344
 
        # true and returns a default inter_repo otherwise.
345
 
        # This also tests that the default registered optimised interrepository
346
 
        # classes do not barf inappropriately when a surprising repository type
347
 
        # is handed to them.
348
 
        dummy_a = "Repository 1."
349
 
        dummy_b = "Repository 2."
350
 
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
351
 
 
352
 
    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
353
 
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
354
 
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
355
 
        self.assertEqual(repository.InterRepository,
356
 
                         inter_repo.__class__)
357
 
        self.assertEqual(repo_a, inter_repo.source)
358
 
        self.assertEqual(repo_b, inter_repo.target)
359
 
 
360
 
    def test_register_inter_repository_class(self):
361
 
        # test that a optimised code path provider - a
362
 
        # InterRepository subclass can be registered and unregistered
363
 
        # and that it is correctly selected when given a repository
364
 
        # pair that it returns true on for the is_compatible static method
365
 
        # check
366
 
        dummy_a = "Repository 1."
367
 
        dummy_b = "Repository 2."
368
 
        repository.InterRepository.register_optimiser(InterString)
369
 
        try:
370
 
            # we should get the default for something InterString returns False
371
 
            # to
372
 
            self.assertFalse(InterString.is_compatible(dummy_a, None))
373
 
            self.assertGetsDefaultInterRepository(dummy_a, None)
374
 
            # and we should get an InterString for a pair it 'likes'
375
 
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
376
 
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
377
 
            self.assertEqual(InterString, inter_repo.__class__)
378
 
            self.assertEqual(dummy_a, inter_repo.source)
379
 
            self.assertEqual(dummy_b, inter_repo.target)
380
 
        finally:
381
 
            repository.InterRepository.unregister_optimiser(InterString)
382
 
        # now we should get the default InterRepository object again.
383
 
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
384
 
 
385
 
 
386
 
class TestInterWeaveRepo(TestCaseWithTransport):
387
 
 
388
 
    def test_is_compatible_and_registered(self):
389
 
        # InterWeaveRepo is compatible when either side
390
 
        # is a format 5/6/7 branch
391
 
        formats = [repository.RepositoryFormat5(),
392
 
                   repository.RepositoryFormat6(),
393
 
                   repository.RepositoryFormat7()]
394
 
        incompatible_formats = [repository.RepositoryFormat4(),
395
 
                                repository.RepositoryFormatKnit1(),
396
 
                                ]
397
 
        repo_a = self.make_repository('a')
398
 
        repo_b = self.make_repository('b')
399
 
        is_compatible = repository.InterWeaveRepo.is_compatible
400
 
        for source in incompatible_formats:
401
 
            # force incompatible left then right
402
 
            repo_a._format = source
403
 
            repo_b._format = formats[0]
404
 
            self.assertFalse(is_compatible(repo_a, repo_b))
405
 
            self.assertFalse(is_compatible(repo_b, repo_a))
406
 
        for source in formats:
407
 
            repo_a._format = source
408
 
            for target in formats:
409
 
                repo_b._format = target
410
 
                self.assertTrue(is_compatible(repo_a, repo_b))
411
 
        self.assertEqual(repository.InterWeaveRepo,
412
 
                         repository.InterRepository.get(repo_a,
413
 
                                                        repo_b).__class__)
414
 
 
415
 
 
416
 
class TestRepositoryConverter(TestCaseWithTransport):
417
 
 
418
 
    def test_convert_empty(self):
419
 
        t = get_transport(self.get_url('.'))
420
 
        t.mkdir('repository')
421
 
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
422
 
        repo = repository.RepositoryFormat7().initialize(repo_dir)
423
 
        target_format = repository.RepositoryFormatKnit1()
424
 
        converter = repository.CopyConverter(target_format)
425
 
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
426
 
        try:
427
 
            converter.convert(repo, pb)
428
 
        finally:
429
 
            pb.finished()
430
 
        repo = repo_dir.open_repository()
431
 
        self.assertTrue(isinstance(target_format, repo._format.__class__))
432
 
 
433
 
 
434
 
class TestMisc(TestCase):
435
 
    
436
 
    def test_unescape_xml(self):
437
 
        """We get some kind of error when malformed entities are passed"""
438
 
        self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;') 
439
 
 
440
 
 
441
 
class TestRepositoryFormatKnit2(TestCaseWithTransport):
442
 
 
443
 
    def test_convert(self):
444
 
        """Ensure the upgrade adds weaves for roots"""
445
 
        format = bzrdir.BzrDirMetaFormat1()
446
 
        format.repository_format = repository.RepositoryFormatKnit1()
447
 
        tree = self.make_branch_and_tree('.', format)
448
 
        tree.commit("Dull commit", rev_id="dull")
449
 
        revision_tree = tree.branch.repository.revision_tree('dull')
450
 
        self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
451
 
            revision_tree.inventory.root.file_id)
452
 
        format = bzrdir.BzrDirMetaFormat1()
453
 
        format.repository_format = repository.RepositoryFormatKnit2()
454
 
        upgrade.Convert('.', format)
455
 
        tree = workingtree.WorkingTree.open('.')
456
 
        revision_tree = tree.branch.repository.revision_tree('dull')
457
 
        revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
458
 
        tree.commit("Another dull commit", rev_id='dull2')
459
 
        revision_tree = tree.branch.repository.revision_tree('dull2')
460
 
        self.assertEqual('dull', revision_tree.inventory.root.revision)