~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_repository.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2006-12-20 18:52:55 UTC
  • mfrom: (2204.2.1 bzr.dev)
  • Revision ID: pqm@pqm.ubuntu.com-20061220185255-86cd0a40a9c2e76e
(Wouter van Heyst) Mention the revisionspec topic in the revision option help (#31633).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the Repository facility that are not interface tests.
 
18
 
 
19
For interface tests see tests/repository_implementations/*.py.
 
20
 
 
21
For concrete class tests see this file, and for storage formats tests
 
22
also see this file.
 
23
"""
 
24
 
 
25
from stat import S_ISDIR
 
26
from StringIO import StringIO
 
27
 
 
28
import bzrlib
 
29
import bzrlib.bzrdir as bzrdir
 
30
import bzrlib.errors as errors
 
31
from bzrlib.errors import (NotBranchError,
 
32
                           NoSuchFile,
 
33
                           UnknownFormatError,
 
34
                           UnsupportedFormatError,
 
35
                           )
 
36
import bzrlib.repository as repository
 
37
from bzrlib.tests import TestCase, TestCaseWithTransport
 
38
from bzrlib.transport import get_transport
 
39
from bzrlib.transport.memory import MemoryServer
 
40
from bzrlib import upgrade, workingtree
 
41
 
 
42
 
 
43
class TestDefaultFormat(TestCase):
 
44
 
 
45
    def test_get_set_default_format(self):
 
46
        private_default = repository._default_format.__class__
 
47
        old_format = repository.RepositoryFormat.get_default_format()
 
48
        self.assertTrue(isinstance(old_format, private_default))
 
49
        repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
 
50
        # creating a repository should now create an instrumented dir.
 
51
        try:
 
52
            # the default branch format is used by the meta dir format
 
53
            # which is not the default bzrdir format at this point
 
54
            dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
 
55
            result = dir.create_repository()
 
56
            self.assertEqual(result, 'A bzr repository dir')
 
57
        finally:
 
58
            repository.RepositoryFormat.set_default_format(old_format)
 
59
        self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
 
60
 
 
61
 
 
62
class SampleRepositoryFormat(repository.RepositoryFormat):
 
63
    """A sample format
 
64
 
 
65
    this format is initializable, unsupported to aid in testing the 
 
66
    open and open(unsupported=True) routines.
 
67
    """
 
68
 
 
69
    def get_format_string(self):
 
70
        """See RepositoryFormat.get_format_string()."""
 
71
        return "Sample .bzr repository format."
 
72
 
 
73
    def initialize(self, a_bzrdir, shared=False):
 
74
        """Initialize a repository in a BzrDir"""
 
75
        t = a_bzrdir.get_repository_transport(self)
 
76
        t.put_bytes('format', self.get_format_string())
 
77
        return 'A bzr repository dir'
 
78
 
 
79
    def is_supported(self):
 
80
        return False
 
81
 
 
82
    def open(self, a_bzrdir, _found=False):
 
83
        return "opened repository."
 
84
 
 
85
 
 
86
class TestRepositoryFormat(TestCaseWithTransport):
 
87
    """Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
 
88
 
 
89
    def test_find_format(self):
 
90
        # is the right format object found for a repository?
 
91
        # create a branch with a few known format objects.
 
92
        # this is not quite the same as 
 
93
        self.build_tree(["foo/", "bar/"])
 
94
        def check_format(format, url):
 
95
            dir = format._matchingbzrdir.initialize(url)
 
96
            format.initialize(dir)
 
97
            t = get_transport(url)
 
98
            found_format = repository.RepositoryFormat.find_format(dir)
 
99
            self.failUnless(isinstance(found_format, format.__class__))
 
100
        check_format(repository.RepositoryFormat7(), "bar")
 
101
        
 
102
    def test_find_format_no_repository(self):
 
103
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
104
        self.assertRaises(errors.NoRepositoryPresent,
 
105
                          repository.RepositoryFormat.find_format,
 
106
                          dir)
 
107
 
 
108
    def test_find_format_unknown_format(self):
 
109
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
110
        SampleRepositoryFormat().initialize(dir)
 
111
        self.assertRaises(UnknownFormatError,
 
112
                          repository.RepositoryFormat.find_format,
 
113
                          dir)
 
114
 
 
115
    def test_register_unregister_format(self):
 
116
        format = SampleRepositoryFormat()
 
117
        # make a control dir
 
118
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
119
        # make a repo
 
120
        format.initialize(dir)
 
121
        # register a format for it.
 
122
        repository.RepositoryFormat.register_format(format)
 
123
        # which repository.Open will refuse (not supported)
 
124
        self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
 
125
        # but open(unsupported) will work
 
126
        self.assertEqual(format.open(dir), "opened repository.")
 
127
        # unregister the format
 
128
        repository.RepositoryFormat.unregister_format(format)
 
129
 
 
130
 
 
131
class TestFormat6(TestCaseWithTransport):
 
132
 
 
133
    def test_no_ancestry_weave(self):
 
134
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
135
        repo = repository.RepositoryFormat6().initialize(control)
 
136
        # We no longer need to create the ancestry.weave file
 
137
        # since it is *never* used.
 
138
        self.assertRaises(NoSuchFile,
 
139
                          control.transport.get,
 
140
                          'ancestry.weave')
 
141
 
 
142
 
 
143
class TestFormat7(TestCaseWithTransport):
 
144
    
 
145
    def test_disk_layout(self):
 
146
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
147
        repo = repository.RepositoryFormat7().initialize(control)
 
148
        # in case of side effects of locking.
 
149
        repo.lock_write()
 
150
        repo.unlock()
 
151
        # we want:
 
152
        # format 'Bazaar-NG Repository format 7'
 
153
        # lock ''
 
154
        # inventory.weave == empty_weave
 
155
        # empty revision-store directory
 
156
        # empty weaves directory
 
157
        t = control.get_repository_transport(None)
 
158
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
159
                             t.get('format').read())
 
160
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
161
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
162
        self.assertEqualDiff('# bzr weave file v5\n'
 
163
                             'w\n'
 
164
                             'W\n',
 
165
                             t.get('inventory.weave').read())
 
166
 
 
167
    def test_shared_disk_layout(self):
 
168
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
169
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
170
        # we want:
 
171
        # format 'Bazaar-NG Repository format 7'
 
172
        # inventory.weave == empty_weave
 
173
        # empty revision-store directory
 
174
        # empty weaves directory
 
175
        # a 'shared-storage' marker file.
 
176
        # lock is not present when unlocked
 
177
        t = control.get_repository_transport(None)
 
178
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
179
                             t.get('format').read())
 
180
        self.assertEqualDiff('', t.get('shared-storage').read())
 
181
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
182
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
183
        self.assertEqualDiff('# bzr weave file v5\n'
 
184
                             'w\n'
 
185
                             'W\n',
 
186
                             t.get('inventory.weave').read())
 
187
        self.assertFalse(t.has('branch-lock'))
 
188
 
 
189
    def test_creates_lockdir(self):
 
190
        """Make sure it appears to be controlled by a LockDir existence"""
 
191
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
192
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
193
        t = control.get_repository_transport(None)
 
194
        # TODO: Should check there is a 'lock' toplevel directory, 
 
195
        # regardless of contents
 
196
        self.assertFalse(t.has('lock/held/info'))
 
197
        repo.lock_write()
 
198
        try:
 
199
            self.assertTrue(t.has('lock/held/info'))
 
200
        finally:
 
201
            # unlock so we don't get a warning about failing to do so
 
202
            repo.unlock()
 
203
 
 
204
    def test_uses_lockdir(self):
 
205
        """repo format 7 actually locks on lockdir"""
 
206
        base_url = self.get_url()
 
207
        control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
 
208
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
209
        t = control.get_repository_transport(None)
 
210
        repo.lock_write()
 
211
        repo.unlock()
 
212
        del repo
 
213
        # make sure the same lock is created by opening it
 
214
        repo = repository.Repository.open(base_url)
 
215
        repo.lock_write()
 
216
        self.assertTrue(t.has('lock/held/info'))
 
217
        repo.unlock()
 
218
        self.assertFalse(t.has('lock/held/info'))
 
219
 
 
220
    def test_shared_no_tree_disk_layout(self):
 
221
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
222
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
223
        repo.set_make_working_trees(False)
 
224
        # we want:
 
225
        # format 'Bazaar-NG Repository format 7'
 
226
        # lock ''
 
227
        # inventory.weave == empty_weave
 
228
        # empty revision-store directory
 
229
        # empty weaves directory
 
230
        # a 'shared-storage' marker file.
 
231
        t = control.get_repository_transport(None)
 
232
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
233
                             t.get('format').read())
 
234
        ## self.assertEqualDiff('', t.get('lock').read())
 
235
        self.assertEqualDiff('', t.get('shared-storage').read())
 
236
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
237
        repo.set_make_working_trees(True)
 
238
        self.assertFalse(t.has('no-working-trees'))
 
239
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
240
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
241
        self.assertEqualDiff('# bzr weave file v5\n'
 
242
                             'w\n'
 
243
                             'W\n',
 
244
                             t.get('inventory.weave').read())
 
245
 
 
246
 
 
247
class TestFormatKnit1(TestCaseWithTransport):
 
248
    
 
249
    def test_disk_layout(self):
 
250
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
251
        repo = repository.RepositoryFormatKnit1().initialize(control)
 
252
        # in case of side effects of locking.
 
253
        repo.lock_write()
 
254
        repo.unlock()
 
255
        # we want:
 
256
        # format 'Bazaar-NG Knit Repository Format 1'
 
257
        # lock: is a directory
 
258
        # inventory.weave == empty_weave
 
259
        # empty revision-store directory
 
260
        # empty weaves directory
 
261
        t = control.get_repository_transport(None)
 
262
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
263
                             t.get('format').read())
 
264
        # XXX: no locks left when unlocked at the moment
 
265
        # self.assertEqualDiff('', t.get('lock').read())
 
266
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
267
        self.check_knits(t)
 
268
 
 
269
    def assertHasKnit(self, t, knit_name):
 
270
        """Assert that knit_name exists on t."""
 
271
        self.assertEqualDiff('# bzr knit index 8\n',
 
272
                             t.get(knit_name + '.kndx').read())
 
273
        # no default content
 
274
        self.assertTrue(t.has(knit_name + '.knit'))
 
275
 
 
276
    def check_knits(self, t):
 
277
        """check knit content for a repository."""
 
278
        self.assertHasKnit(t, 'inventory')
 
279
        self.assertHasKnit(t, 'revisions')
 
280
        self.assertHasKnit(t, 'signatures')
 
281
 
 
282
    def test_shared_disk_layout(self):
 
283
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
284
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
 
285
        # we want:
 
286
        # format 'Bazaar-NG Knit Repository Format 1'
 
287
        # lock: is a directory
 
288
        # inventory.weave == empty_weave
 
289
        # empty revision-store directory
 
290
        # empty weaves directory
 
291
        # a 'shared-storage' marker file.
 
292
        t = control.get_repository_transport(None)
 
293
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
294
                             t.get('format').read())
 
295
        # XXX: no locks left when unlocked at the moment
 
296
        # self.assertEqualDiff('', t.get('lock').read())
 
297
        self.assertEqualDiff('', t.get('shared-storage').read())
 
298
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
299
        self.check_knits(t)
 
300
 
 
301
    def test_shared_no_tree_disk_layout(self):
 
302
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
303
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
 
304
        repo.set_make_working_trees(False)
 
305
        # we want:
 
306
        # format 'Bazaar-NG Knit Repository Format 1'
 
307
        # lock ''
 
308
        # inventory.weave == empty_weave
 
309
        # empty revision-store directory
 
310
        # empty weaves directory
 
311
        # a 'shared-storage' marker file.
 
312
        t = control.get_repository_transport(None)
 
313
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
314
                             t.get('format').read())
 
315
        # XXX: no locks left when unlocked at the moment
 
316
        # self.assertEqualDiff('', t.get('lock').read())
 
317
        self.assertEqualDiff('', t.get('shared-storage').read())
 
318
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
319
        repo.set_make_working_trees(True)
 
320
        self.assertFalse(t.has('no-working-trees'))
 
321
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
322
        self.check_knits(t)
 
323
 
 
324
 
 
325
class InterString(repository.InterRepository):
 
326
    """An inter-repository optimised code path for strings.
 
327
 
 
328
    This is for use during testing where we use strings as repositories
 
329
    so that none of the default regsitered inter-repository classes will
 
330
    match.
 
331
    """
 
332
 
 
333
    @staticmethod
 
334
    def is_compatible(repo_source, repo_target):
 
335
        """InterString is compatible with strings-as-repos."""
 
336
        return isinstance(repo_source, str) and isinstance(repo_target, str)
 
337
 
 
338
 
 
339
class TestInterRepository(TestCaseWithTransport):
 
340
 
 
341
    def test_get_default_inter_repository(self):
 
342
        # test that the InterRepository.get(repo_a, repo_b) probes
 
343
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
 
344
        # true and returns a default inter_repo otherwise.
 
345
        # This also tests that the default registered optimised interrepository
 
346
        # classes do not barf inappropriately when a surprising repository type
 
347
        # is handed to them.
 
348
        dummy_a = "Repository 1."
 
349
        dummy_b = "Repository 2."
 
350
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
351
 
 
352
    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
 
353
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
 
354
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
 
355
        self.assertEqual(repository.InterRepository,
 
356
                         inter_repo.__class__)
 
357
        self.assertEqual(repo_a, inter_repo.source)
 
358
        self.assertEqual(repo_b, inter_repo.target)
 
359
 
 
360
    def test_register_inter_repository_class(self):
 
361
        # test that a optimised code path provider - a
 
362
        # InterRepository subclass can be registered and unregistered
 
363
        # and that it is correctly selected when given a repository
 
364
        # pair that it returns true on for the is_compatible static method
 
365
        # check
 
366
        dummy_a = "Repository 1."
 
367
        dummy_b = "Repository 2."
 
368
        repository.InterRepository.register_optimiser(InterString)
 
369
        try:
 
370
            # we should get the default for something InterString returns False
 
371
            # to
 
372
            self.assertFalse(InterString.is_compatible(dummy_a, None))
 
373
            self.assertGetsDefaultInterRepository(dummy_a, None)
 
374
            # and we should get an InterString for a pair it 'likes'
 
375
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
 
376
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
 
377
            self.assertEqual(InterString, inter_repo.__class__)
 
378
            self.assertEqual(dummy_a, inter_repo.source)
 
379
            self.assertEqual(dummy_b, inter_repo.target)
 
380
        finally:
 
381
            repository.InterRepository.unregister_optimiser(InterString)
 
382
        # now we should get the default InterRepository object again.
 
383
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
384
 
 
385
 
 
386
class TestInterWeaveRepo(TestCaseWithTransport):
 
387
 
 
388
    def test_is_compatible_and_registered(self):
 
389
        # InterWeaveRepo is compatible when either side
 
390
        # is a format 5/6/7 branch
 
391
        formats = [repository.RepositoryFormat5(),
 
392
                   repository.RepositoryFormat6(),
 
393
                   repository.RepositoryFormat7()]
 
394
        incompatible_formats = [repository.RepositoryFormat4(),
 
395
                                repository.RepositoryFormatKnit1(),
 
396
                                ]
 
397
        repo_a = self.make_repository('a')
 
398
        repo_b = self.make_repository('b')
 
399
        is_compatible = repository.InterWeaveRepo.is_compatible
 
400
        for source in incompatible_formats:
 
401
            # force incompatible left then right
 
402
            repo_a._format = source
 
403
            repo_b._format = formats[0]
 
404
            self.assertFalse(is_compatible(repo_a, repo_b))
 
405
            self.assertFalse(is_compatible(repo_b, repo_a))
 
406
        for source in formats:
 
407
            repo_a._format = source
 
408
            for target in formats:
 
409
                repo_b._format = target
 
410
                self.assertTrue(is_compatible(repo_a, repo_b))
 
411
        self.assertEqual(repository.InterWeaveRepo,
 
412
                         repository.InterRepository.get(repo_a,
 
413
                                                        repo_b).__class__)
 
414
 
 
415
 
 
416
class TestRepositoryConverter(TestCaseWithTransport):
 
417
 
 
418
    def test_convert_empty(self):
 
419
        t = get_transport(self.get_url('.'))
 
420
        t.mkdir('repository')
 
421
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
 
422
        repo = repository.RepositoryFormat7().initialize(repo_dir)
 
423
        target_format = repository.RepositoryFormatKnit1()
 
424
        converter = repository.CopyConverter(target_format)
 
425
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
 
426
        try:
 
427
            converter.convert(repo, pb)
 
428
        finally:
 
429
            pb.finished()
 
430
        repo = repo_dir.open_repository()
 
431
        self.assertTrue(isinstance(target_format, repo._format.__class__))
 
432
 
 
433
 
 
434
class TestMisc(TestCase):
 
435
    
 
436
    def test_unescape_xml(self):
 
437
        """We get some kind of error when malformed entities are passed"""
 
438
        self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;') 
 
439
 
 
440
 
 
441
class TestRepositoryFormatKnit2(TestCaseWithTransport):
 
442
 
 
443
    def test_convert(self):
 
444
        """Ensure the upgrade adds weaves for roots"""
 
445
        format = bzrdir.BzrDirMetaFormat1()
 
446
        format.repository_format = repository.RepositoryFormatKnit1()
 
447
        tree = self.make_branch_and_tree('.', format)
 
448
        tree.commit("Dull commit", rev_id="dull")
 
449
        revision_tree = tree.branch.repository.revision_tree('dull')
 
450
        self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
 
451
            revision_tree.inventory.root.file_id)
 
452
        format = bzrdir.BzrDirMetaFormat1()
 
453
        format.repository_format = repository.RepositoryFormatKnit2()
 
454
        upgrade.Convert('.', format)
 
455
        tree = workingtree.WorkingTree.open('.')
 
456
        revision_tree = tree.branch.repository.revision_tree('dull')
 
457
        revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
 
458
        tree.commit("Another dull commit", rev_id='dull2')
 
459
        revision_tree = tree.branch.repository.revision_tree('dull2')
 
460
        self.assertEqual('dull', revision_tree.inventory.root.revision)