~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_repository.py

  • Committer: Martin Pool
  • Date: 2005-04-29 03:32:40 UTC
  • Revision ID: mbp@sourcefrog.net-20050429033239-43b3a4781828f11d
todo

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Tests for the Repository facility that are not interface tests.
18
 
 
19
 
For interface tests see tests/repository_implementations/*.py.
20
 
 
21
 
For concrete class tests see this file, and for storage formats tests
22
 
also see this file.
23
 
"""
24
 
 
25
 
from stat import S_ISDIR
26
 
from StringIO import StringIO
27
 
 
28
 
import bzrlib
29
 
import bzrlib.bzrdir as bzrdir
30
 
import bzrlib.errors as errors
31
 
from bzrlib.errors import (NotBranchError,
32
 
                           NoSuchFile,
33
 
                           UnknownFormatError,
34
 
                           UnsupportedFormatError,
35
 
                           )
36
 
import bzrlib.repository as repository
37
 
from bzrlib.tests import TestCase, TestCaseWithTransport
38
 
from bzrlib.transport import get_transport
39
 
from bzrlib.transport.http import HttpServer
40
 
from bzrlib.transport.memory import MemoryServer
41
 
from bzrlib import upgrade, workingtree
42
 
 
43
 
 
44
 
class TestDefaultFormat(TestCase):
45
 
 
46
 
    def test_get_set_default_format(self):
47
 
        private_default = repository._default_format.__class__
48
 
        old_format = repository.RepositoryFormat.get_default_format()
49
 
        self.assertTrue(isinstance(old_format, private_default))
50
 
        repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
51
 
        # creating a repository should now create an instrumented dir.
52
 
        try:
53
 
            # the default branch format is used by the meta dir format
54
 
            # which is not the default bzrdir format at this point
55
 
            dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
56
 
            result = dir.create_repository()
57
 
            self.assertEqual(result, 'A bzr repository dir')
58
 
        finally:
59
 
            repository.RepositoryFormat.set_default_format(old_format)
60
 
        self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
61
 
 
62
 
 
63
 
class SampleRepositoryFormat(repository.RepositoryFormat):
64
 
    """A sample format
65
 
 
66
 
    this format is initializable, unsupported to aid in testing the 
67
 
    open and open(unsupported=True) routines.
68
 
    """
69
 
 
70
 
    def get_format_string(self):
71
 
        """See RepositoryFormat.get_format_string()."""
72
 
        return "Sample .bzr repository format."
73
 
 
74
 
    def initialize(self, a_bzrdir, shared=False):
75
 
        """Initialize a repository in a BzrDir"""
76
 
        t = a_bzrdir.get_repository_transport(self)
77
 
        t.put_bytes('format', self.get_format_string())
78
 
        return 'A bzr repository dir'
79
 
 
80
 
    def is_supported(self):
81
 
        return False
82
 
 
83
 
    def open(self, a_bzrdir, _found=False):
84
 
        return "opened repository."
85
 
 
86
 
 
87
 
class TestRepositoryFormat(TestCaseWithTransport):
88
 
    """Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
89
 
 
90
 
    def test_find_format(self):
91
 
        # is the right format object found for a repository?
92
 
        # create a branch with a few known format objects.
93
 
        # this is not quite the same as 
94
 
        self.build_tree(["foo/", "bar/"])
95
 
        def check_format(format, url):
96
 
            dir = format._matchingbzrdir.initialize(url)
97
 
            format.initialize(dir)
98
 
            t = get_transport(url)
99
 
            found_format = repository.RepositoryFormat.find_format(dir)
100
 
            self.failUnless(isinstance(found_format, format.__class__))
101
 
        check_format(repository.RepositoryFormat7(), "bar")
102
 
        
103
 
    def test_find_format_no_repository(self):
104
 
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
105
 
        self.assertRaises(errors.NoRepositoryPresent,
106
 
                          repository.RepositoryFormat.find_format,
107
 
                          dir)
108
 
 
109
 
    def test_find_format_unknown_format(self):
110
 
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
111
 
        SampleRepositoryFormat().initialize(dir)
112
 
        self.assertRaises(UnknownFormatError,
113
 
                          repository.RepositoryFormat.find_format,
114
 
                          dir)
115
 
 
116
 
    def test_register_unregister_format(self):
117
 
        format = SampleRepositoryFormat()
118
 
        # make a control dir
119
 
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
120
 
        # make a repo
121
 
        format.initialize(dir)
122
 
        # register a format for it.
123
 
        repository.RepositoryFormat.register_format(format)
124
 
        # which repository.Open will refuse (not supported)
125
 
        self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
126
 
        # but open(unsupported) will work
127
 
        self.assertEqual(format.open(dir), "opened repository.")
128
 
        # unregister the format
129
 
        repository.RepositoryFormat.unregister_format(format)
130
 
 
131
 
 
132
 
class TestFormat6(TestCaseWithTransport):
133
 
 
134
 
    def test_no_ancestry_weave(self):
135
 
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
136
 
        repo = repository.RepositoryFormat6().initialize(control)
137
 
        # We no longer need to create the ancestry.weave file
138
 
        # since it is *never* used.
139
 
        self.assertRaises(NoSuchFile,
140
 
                          control.transport.get,
141
 
                          'ancestry.weave')
142
 
 
143
 
 
144
 
class TestFormat7(TestCaseWithTransport):
145
 
    
146
 
    def test_disk_layout(self):
147
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
148
 
        repo = repository.RepositoryFormat7().initialize(control)
149
 
        # in case of side effects of locking.
150
 
        repo.lock_write()
151
 
        repo.unlock()
152
 
        # we want:
153
 
        # format 'Bazaar-NG Repository format 7'
154
 
        # lock ''
155
 
        # inventory.weave == empty_weave
156
 
        # empty revision-store directory
157
 
        # empty weaves directory
158
 
        t = control.get_repository_transport(None)
159
 
        self.assertEqualDiff('Bazaar-NG Repository format 7',
160
 
                             t.get('format').read())
161
 
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
162
 
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
163
 
        self.assertEqualDiff('# bzr weave file v5\n'
164
 
                             'w\n'
165
 
                             'W\n',
166
 
                             t.get('inventory.weave').read())
167
 
 
168
 
    def test_shared_disk_layout(self):
169
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
170
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
171
 
        # we want:
172
 
        # format 'Bazaar-NG Repository format 7'
173
 
        # inventory.weave == empty_weave
174
 
        # empty revision-store directory
175
 
        # empty weaves directory
176
 
        # a 'shared-storage' marker file.
177
 
        # lock is not present when unlocked
178
 
        t = control.get_repository_transport(None)
179
 
        self.assertEqualDiff('Bazaar-NG Repository format 7',
180
 
                             t.get('format').read())
181
 
        self.assertEqualDiff('', t.get('shared-storage').read())
182
 
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
183
 
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
184
 
        self.assertEqualDiff('# bzr weave file v5\n'
185
 
                             'w\n'
186
 
                             'W\n',
187
 
                             t.get('inventory.weave').read())
188
 
        self.assertFalse(t.has('branch-lock'))
189
 
 
190
 
    def test_creates_lockdir(self):
191
 
        """Make sure it appears to be controlled by a LockDir existence"""
192
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
193
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
194
 
        t = control.get_repository_transport(None)
195
 
        # TODO: Should check there is a 'lock' toplevel directory, 
196
 
        # regardless of contents
197
 
        self.assertFalse(t.has('lock/held/info'))
198
 
        repo.lock_write()
199
 
        try:
200
 
            self.assertTrue(t.has('lock/held/info'))
201
 
        finally:
202
 
            # unlock so we don't get a warning about failing to do so
203
 
            repo.unlock()
204
 
 
205
 
    def test_uses_lockdir(self):
206
 
        """repo format 7 actually locks on lockdir"""
207
 
        base_url = self.get_url()
208
 
        control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
209
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
210
 
        t = control.get_repository_transport(None)
211
 
        repo.lock_write()
212
 
        repo.unlock()
213
 
        del repo
214
 
        # make sure the same lock is created by opening it
215
 
        repo = repository.Repository.open(base_url)
216
 
        repo.lock_write()
217
 
        self.assertTrue(t.has('lock/held/info'))
218
 
        repo.unlock()
219
 
        self.assertFalse(t.has('lock/held/info'))
220
 
 
221
 
    def test_shared_no_tree_disk_layout(self):
222
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
223
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
224
 
        repo.set_make_working_trees(False)
225
 
        # we want:
226
 
        # format 'Bazaar-NG Repository format 7'
227
 
        # lock ''
228
 
        # inventory.weave == empty_weave
229
 
        # empty revision-store directory
230
 
        # empty weaves directory
231
 
        # a 'shared-storage' marker file.
232
 
        t = control.get_repository_transport(None)
233
 
        self.assertEqualDiff('Bazaar-NG Repository format 7',
234
 
                             t.get('format').read())
235
 
        ## self.assertEqualDiff('', t.get('lock').read())
236
 
        self.assertEqualDiff('', t.get('shared-storage').read())
237
 
        self.assertEqualDiff('', t.get('no-working-trees').read())
238
 
        repo.set_make_working_trees(True)
239
 
        self.assertFalse(t.has('no-working-trees'))
240
 
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
241
 
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
242
 
        self.assertEqualDiff('# bzr weave file v5\n'
243
 
                             'w\n'
244
 
                             'W\n',
245
 
                             t.get('inventory.weave').read())
246
 
 
247
 
 
248
 
class TestFormatKnit1(TestCaseWithTransport):
249
 
    
250
 
    def test_disk_layout(self):
251
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
252
 
        repo = repository.RepositoryFormatKnit1().initialize(control)
253
 
        # in case of side effects of locking.
254
 
        repo.lock_write()
255
 
        repo.unlock()
256
 
        # we want:
257
 
        # format 'Bazaar-NG Knit Repository Format 1'
258
 
        # lock: is a directory
259
 
        # inventory.weave == empty_weave
260
 
        # empty revision-store directory
261
 
        # empty weaves directory
262
 
        t = control.get_repository_transport(None)
263
 
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
264
 
                             t.get('format').read())
265
 
        # XXX: no locks left when unlocked at the moment
266
 
        # self.assertEqualDiff('', t.get('lock').read())
267
 
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
268
 
        self.check_knits(t)
269
 
 
270
 
    def assertHasKnit(self, t, knit_name):
271
 
        """Assert that knit_name exists on t."""
272
 
        self.assertEqualDiff('# bzr knit index 8\n',
273
 
                             t.get(knit_name + '.kndx').read())
274
 
        # no default content
275
 
        self.assertTrue(t.has(knit_name + '.knit'))
276
 
 
277
 
    def check_knits(self, t):
278
 
        """check knit content for a repository."""
279
 
        self.assertHasKnit(t, 'inventory')
280
 
        self.assertHasKnit(t, 'revisions')
281
 
        self.assertHasKnit(t, 'signatures')
282
 
 
283
 
    def test_shared_disk_layout(self):
284
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
285
 
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
286
 
        # we want:
287
 
        # format 'Bazaar-NG Knit Repository Format 1'
288
 
        # lock: is a directory
289
 
        # inventory.weave == empty_weave
290
 
        # empty revision-store directory
291
 
        # empty weaves directory
292
 
        # a 'shared-storage' marker file.
293
 
        t = control.get_repository_transport(None)
294
 
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
295
 
                             t.get('format').read())
296
 
        # XXX: no locks left when unlocked at the moment
297
 
        # self.assertEqualDiff('', t.get('lock').read())
298
 
        self.assertEqualDiff('', t.get('shared-storage').read())
299
 
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
300
 
        self.check_knits(t)
301
 
 
302
 
    def test_shared_no_tree_disk_layout(self):
303
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
304
 
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
305
 
        repo.set_make_working_trees(False)
306
 
        # we want:
307
 
        # format 'Bazaar-NG Knit Repository Format 1'
308
 
        # lock ''
309
 
        # inventory.weave == empty_weave
310
 
        # empty revision-store directory
311
 
        # empty weaves directory
312
 
        # a 'shared-storage' marker file.
313
 
        t = control.get_repository_transport(None)
314
 
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
315
 
                             t.get('format').read())
316
 
        # XXX: no locks left when unlocked at the moment
317
 
        # self.assertEqualDiff('', t.get('lock').read())
318
 
        self.assertEqualDiff('', t.get('shared-storage').read())
319
 
        self.assertEqualDiff('', t.get('no-working-trees').read())
320
 
        repo.set_make_working_trees(True)
321
 
        self.assertFalse(t.has('no-working-trees'))
322
 
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
323
 
        self.check_knits(t)
324
 
 
325
 
 
326
 
class InterString(repository.InterRepository):
327
 
    """An inter-repository optimised code path for strings.
328
 
 
329
 
    This is for use during testing where we use strings as repositories
330
 
    so that none of the default regsitered inter-repository classes will
331
 
    match.
332
 
    """
333
 
 
334
 
    @staticmethod
335
 
    def is_compatible(repo_source, repo_target):
336
 
        """InterString is compatible with strings-as-repos."""
337
 
        return isinstance(repo_source, str) and isinstance(repo_target, str)
338
 
 
339
 
 
340
 
class TestInterRepository(TestCaseWithTransport):
341
 
 
342
 
    def test_get_default_inter_repository(self):
343
 
        # test that the InterRepository.get(repo_a, repo_b) probes
344
 
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
345
 
        # true and returns a default inter_repo otherwise.
346
 
        # This also tests that the default registered optimised interrepository
347
 
        # classes do not barf inappropriately when a surprising repository type
348
 
        # is handed to them.
349
 
        dummy_a = "Repository 1."
350
 
        dummy_b = "Repository 2."
351
 
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
352
 
 
353
 
    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
354
 
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
355
 
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
356
 
        self.assertEqual(repository.InterRepository,
357
 
                         inter_repo.__class__)
358
 
        self.assertEqual(repo_a, inter_repo.source)
359
 
        self.assertEqual(repo_b, inter_repo.target)
360
 
 
361
 
    def test_register_inter_repository_class(self):
362
 
        # test that a optimised code path provider - a
363
 
        # InterRepository subclass can be registered and unregistered
364
 
        # and that it is correctly selected when given a repository
365
 
        # pair that it returns true on for the is_compatible static method
366
 
        # check
367
 
        dummy_a = "Repository 1."
368
 
        dummy_b = "Repository 2."
369
 
        repository.InterRepository.register_optimiser(InterString)
370
 
        try:
371
 
            # we should get the default for something InterString returns False
372
 
            # to
373
 
            self.assertFalse(InterString.is_compatible(dummy_a, None))
374
 
            self.assertGetsDefaultInterRepository(dummy_a, None)
375
 
            # and we should get an InterString for a pair it 'likes'
376
 
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
377
 
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
378
 
            self.assertEqual(InterString, inter_repo.__class__)
379
 
            self.assertEqual(dummy_a, inter_repo.source)
380
 
            self.assertEqual(dummy_b, inter_repo.target)
381
 
        finally:
382
 
            repository.InterRepository.unregister_optimiser(InterString)
383
 
        # now we should get the default InterRepository object again.
384
 
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
385
 
 
386
 
 
387
 
class TestInterWeaveRepo(TestCaseWithTransport):
388
 
 
389
 
    def test_is_compatible_and_registered(self):
390
 
        # InterWeaveRepo is compatible when either side
391
 
        # is a format 5/6/7 branch
392
 
        formats = [repository.RepositoryFormat5(),
393
 
                   repository.RepositoryFormat6(),
394
 
                   repository.RepositoryFormat7()]
395
 
        incompatible_formats = [repository.RepositoryFormat4(),
396
 
                                repository.RepositoryFormatKnit1(),
397
 
                                ]
398
 
        repo_a = self.make_repository('a')
399
 
        repo_b = self.make_repository('b')
400
 
        is_compatible = repository.InterWeaveRepo.is_compatible
401
 
        for source in incompatible_formats:
402
 
            # force incompatible left then right
403
 
            repo_a._format = source
404
 
            repo_b._format = formats[0]
405
 
            self.assertFalse(is_compatible(repo_a, repo_b))
406
 
            self.assertFalse(is_compatible(repo_b, repo_a))
407
 
        for source in formats:
408
 
            repo_a._format = source
409
 
            for target in formats:
410
 
                repo_b._format = target
411
 
                self.assertTrue(is_compatible(repo_a, repo_b))
412
 
        self.assertEqual(repository.InterWeaveRepo,
413
 
                         repository.InterRepository.get(repo_a,
414
 
                                                        repo_b).__class__)
415
 
 
416
 
 
417
 
class TestRepositoryConverter(TestCaseWithTransport):
418
 
 
419
 
    def test_convert_empty(self):
420
 
        t = get_transport(self.get_url('.'))
421
 
        t.mkdir('repository')
422
 
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
423
 
        repo = repository.RepositoryFormat7().initialize(repo_dir)
424
 
        target_format = repository.RepositoryFormatKnit1()
425
 
        converter = repository.CopyConverter(target_format)
426
 
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
427
 
        try:
428
 
            converter.convert(repo, pb)
429
 
        finally:
430
 
            pb.finished()
431
 
        repo = repo_dir.open_repository()
432
 
        self.assertTrue(isinstance(target_format, repo._format.__class__))
433
 
 
434
 
 
435
 
class TestMisc(TestCase):
436
 
    
437
 
    def test_unescape_xml(self):
438
 
        """We get some kind of error when malformed entities are passed"""
439
 
        self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;') 
440
 
 
441
 
 
442
 
class TestRepositoryFormatKnit2(TestCaseWithTransport):
443
 
 
444
 
    def test_convert(self):
445
 
        """Ensure the upgrade adds weaves for roots"""
446
 
        format = bzrdir.BzrDirMetaFormat1()
447
 
        format.repository_format = repository.RepositoryFormatKnit1()
448
 
        tree = self.make_branch_and_tree('.', format)
449
 
        tree.commit("Dull commit", rev_id="dull")
450
 
        revision_tree = tree.branch.repository.revision_tree('dull')
451
 
        self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
452
 
            revision_tree.inventory.root.file_id)
453
 
        format = bzrdir.BzrDirMetaFormat1()
454
 
        format.repository_format = repository.RepositoryFormatKnit2()
455
 
        upgrade.Convert('.', format)
456
 
        tree = workingtree.WorkingTree.open('.')
457
 
        revision_tree = tree.branch.repository.revision_tree('dull')
458
 
        revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
459
 
        tree.commit("Another dull commit", rev_id='dull2')
460
 
        revision_tree = tree.branch.repository.revision_tree('dull2')
461
 
        self.assertEqual('dull', revision_tree.inventory.root.revision)