~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_repository.py

  • Committer: Martin Pool
  • Date: 2006-11-03 01:52:12 UTC
  • mto: This revision was merged to the branch mainline in revision 2119.
  • Revision ID: mbp@sourcefrog.net-20061103015212-1e5f881c2152d79f
Review comments

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the Repository facility that are not interface tests.
 
18
 
 
19
For interface tests see tests/repository_implementations/*.py.
 
20
 
 
21
For concrete class tests see this file, and for storage formats tests
 
22
also see this file.
 
23
"""
 
24
 
 
25
from stat import S_ISDIR
 
26
from StringIO import StringIO
 
27
 
 
28
import bzrlib
 
29
import bzrlib.bzrdir as bzrdir
 
30
import bzrlib.errors as errors
 
31
from bzrlib.errors import (NotBranchError,
 
32
                           NoSuchFile,
 
33
                           UnknownFormatError,
 
34
                           UnsupportedFormatError,
 
35
                           )
 
36
import bzrlib.repository as repository
 
37
from bzrlib.tests import TestCase, TestCaseWithTransport
 
38
from bzrlib.transport import get_transport
 
39
from bzrlib.transport.http import HttpServer
 
40
from bzrlib.transport.memory import MemoryServer
 
41
from bzrlib import upgrade, workingtree
 
42
 
 
43
 
 
44
class TestDefaultFormat(TestCase):
 
45
 
 
46
    def test_get_set_default_format(self):
 
47
        private_default = repository._default_format.__class__
 
48
        old_format = repository.RepositoryFormat.get_default_format()
 
49
        self.assertTrue(isinstance(old_format, private_default))
 
50
        repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
 
51
        # creating a repository should now create an instrumented dir.
 
52
        try:
 
53
            # the default branch format is used by the meta dir format
 
54
            # which is not the default bzrdir format at this point
 
55
            dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
 
56
            result = dir.create_repository()
 
57
            self.assertEqual(result, 'A bzr repository dir')
 
58
        finally:
 
59
            repository.RepositoryFormat.set_default_format(old_format)
 
60
        self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
 
61
 
 
62
 
 
63
class SampleRepositoryFormat(repository.RepositoryFormat):
 
64
    """A sample format
 
65
 
 
66
    this format is initializable, unsupported to aid in testing the 
 
67
    open and open(unsupported=True) routines.
 
68
    """
 
69
 
 
70
    def get_format_string(self):
 
71
        """See RepositoryFormat.get_format_string()."""
 
72
        return "Sample .bzr repository format."
 
73
 
 
74
    def initialize(self, a_bzrdir, shared=False):
 
75
        """Initialize a repository in a BzrDir"""
 
76
        t = a_bzrdir.get_repository_transport(self)
 
77
        t.put_bytes('format', self.get_format_string())
 
78
        return 'A bzr repository dir'
 
79
 
 
80
    def is_supported(self):
 
81
        return False
 
82
 
 
83
    def open(self, a_bzrdir, _found=False):
 
84
        return "opened repository."
 
85
 
 
86
 
 
87
class TestRepositoryFormat(TestCaseWithTransport):
 
88
    """Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
 
89
 
 
90
    def test_find_format(self):
 
91
        # is the right format object found for a repository?
 
92
        # create a branch with a few known format objects.
 
93
        # this is not quite the same as 
 
94
        self.build_tree(["foo/", "bar/"])
 
95
        def check_format(format, url):
 
96
            dir = format._matchingbzrdir.initialize(url)
 
97
            format.initialize(dir)
 
98
            t = get_transport(url)
 
99
            found_format = repository.RepositoryFormat.find_format(dir)
 
100
            self.failUnless(isinstance(found_format, format.__class__))
 
101
        check_format(repository.RepositoryFormat7(), "bar")
 
102
        
 
103
    def test_find_format_no_repository(self):
 
104
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
105
        self.assertRaises(errors.NoRepositoryPresent,
 
106
                          repository.RepositoryFormat.find_format,
 
107
                          dir)
 
108
 
 
109
    def test_find_format_unknown_format(self):
 
110
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
111
        SampleRepositoryFormat().initialize(dir)
 
112
        self.assertRaises(UnknownFormatError,
 
113
                          repository.RepositoryFormat.find_format,
 
114
                          dir)
 
115
 
 
116
    def test_register_unregister_format(self):
 
117
        format = SampleRepositoryFormat()
 
118
        # make a control dir
 
119
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
120
        # make a repo
 
121
        format.initialize(dir)
 
122
        # register a format for it.
 
123
        repository.RepositoryFormat.register_format(format)
 
124
        # which repository.Open will refuse (not supported)
 
125
        self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
 
126
        # but open(unsupported) will work
 
127
        self.assertEqual(format.open(dir), "opened repository.")
 
128
        # unregister the format
 
129
        repository.RepositoryFormat.unregister_format(format)
 
130
 
 
131
 
 
132
class TestFormat6(TestCaseWithTransport):
 
133
 
 
134
    def test_no_ancestry_weave(self):
 
135
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
136
        repo = repository.RepositoryFormat6().initialize(control)
 
137
        # We no longer need to create the ancestry.weave file
 
138
        # since it is *never* used.
 
139
        self.assertRaises(NoSuchFile,
 
140
                          control.transport.get,
 
141
                          'ancestry.weave')
 
142
 
 
143
 
 
144
class TestFormat7(TestCaseWithTransport):
 
145
    
 
146
    def test_disk_layout(self):
 
147
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
148
        repo = repository.RepositoryFormat7().initialize(control)
 
149
        # in case of side effects of locking.
 
150
        repo.lock_write()
 
151
        repo.unlock()
 
152
        # we want:
 
153
        # format 'Bazaar-NG Repository format 7'
 
154
        # lock ''
 
155
        # inventory.weave == empty_weave
 
156
        # empty revision-store directory
 
157
        # empty weaves directory
 
158
        t = control.get_repository_transport(None)
 
159
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
160
                             t.get('format').read())
 
161
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
162
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
163
        self.assertEqualDiff('# bzr weave file v5\n'
 
164
                             'w\n'
 
165
                             'W\n',
 
166
                             t.get('inventory.weave').read())
 
167
 
 
168
    def test_shared_disk_layout(self):
 
169
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
170
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
171
        # we want:
 
172
        # format 'Bazaar-NG Repository format 7'
 
173
        # inventory.weave == empty_weave
 
174
        # empty revision-store directory
 
175
        # empty weaves directory
 
176
        # a 'shared-storage' marker file.
 
177
        # lock is not present when unlocked
 
178
        t = control.get_repository_transport(None)
 
179
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
180
                             t.get('format').read())
 
181
        self.assertEqualDiff('', t.get('shared-storage').read())
 
182
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
183
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
184
        self.assertEqualDiff('# bzr weave file v5\n'
 
185
                             'w\n'
 
186
                             'W\n',
 
187
                             t.get('inventory.weave').read())
 
188
        self.assertFalse(t.has('branch-lock'))
 
189
 
 
190
    def test_creates_lockdir(self):
 
191
        """Make sure it appears to be controlled by a LockDir existence"""
 
192
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
193
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
194
        t = control.get_repository_transport(None)
 
195
        # TODO: Should check there is a 'lock' toplevel directory, 
 
196
        # regardless of contents
 
197
        self.assertFalse(t.has('lock/held/info'))
 
198
        repo.lock_write()
 
199
        try:
 
200
            self.assertTrue(t.has('lock/held/info'))
 
201
        finally:
 
202
            # unlock so we don't get a warning about failing to do so
 
203
            repo.unlock()
 
204
 
 
205
    def test_uses_lockdir(self):
 
206
        """repo format 7 actually locks on lockdir"""
 
207
        base_url = self.get_url()
 
208
        control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
 
209
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
210
        t = control.get_repository_transport(None)
 
211
        repo.lock_write()
 
212
        repo.unlock()
 
213
        del repo
 
214
        # make sure the same lock is created by opening it
 
215
        repo = repository.Repository.open(base_url)
 
216
        repo.lock_write()
 
217
        self.assertTrue(t.has('lock/held/info'))
 
218
        repo.unlock()
 
219
        self.assertFalse(t.has('lock/held/info'))
 
220
 
 
221
    def test_shared_no_tree_disk_layout(self):
 
222
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
223
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
224
        repo.set_make_working_trees(False)
 
225
        # we want:
 
226
        # format 'Bazaar-NG Repository format 7'
 
227
        # lock ''
 
228
        # inventory.weave == empty_weave
 
229
        # empty revision-store directory
 
230
        # empty weaves directory
 
231
        # a 'shared-storage' marker file.
 
232
        t = control.get_repository_transport(None)
 
233
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
234
                             t.get('format').read())
 
235
        ## self.assertEqualDiff('', t.get('lock').read())
 
236
        self.assertEqualDiff('', t.get('shared-storage').read())
 
237
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
238
        repo.set_make_working_trees(True)
 
239
        self.assertFalse(t.has('no-working-trees'))
 
240
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
241
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
242
        self.assertEqualDiff('# bzr weave file v5\n'
 
243
                             'w\n'
 
244
                             'W\n',
 
245
                             t.get('inventory.weave').read())
 
246
 
 
247
 
 
248
class TestFormatKnit1(TestCaseWithTransport):
 
249
    
 
250
    def test_disk_layout(self):
 
251
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
252
        repo = repository.RepositoryFormatKnit1().initialize(control)
 
253
        # in case of side effects of locking.
 
254
        repo.lock_write()
 
255
        repo.unlock()
 
256
        # we want:
 
257
        # format 'Bazaar-NG Knit Repository Format 1'
 
258
        # lock: is a directory
 
259
        # inventory.weave == empty_weave
 
260
        # empty revision-store directory
 
261
        # empty weaves directory
 
262
        t = control.get_repository_transport(None)
 
263
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
264
                             t.get('format').read())
 
265
        # XXX: no locks left when unlocked at the moment
 
266
        # self.assertEqualDiff('', t.get('lock').read())
 
267
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
268
        self.check_knits(t)
 
269
 
 
270
    def assertHasKnit(self, t, knit_name):
 
271
        """Assert that knit_name exists on t."""
 
272
        self.assertEqualDiff('# bzr knit index 8\n',
 
273
                             t.get(knit_name + '.kndx').read())
 
274
        # no default content
 
275
        self.assertTrue(t.has(knit_name + '.knit'))
 
276
 
 
277
    def check_knits(self, t):
 
278
        """check knit content for a repository."""
 
279
        self.assertHasKnit(t, 'inventory')
 
280
        self.assertHasKnit(t, 'revisions')
 
281
        self.assertHasKnit(t, 'signatures')
 
282
 
 
283
    def test_shared_disk_layout(self):
 
284
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
285
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
 
286
        # we want:
 
287
        # format 'Bazaar-NG Knit Repository Format 1'
 
288
        # lock: is a directory
 
289
        # inventory.weave == empty_weave
 
290
        # empty revision-store directory
 
291
        # empty weaves directory
 
292
        # a 'shared-storage' marker file.
 
293
        t = control.get_repository_transport(None)
 
294
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
295
                             t.get('format').read())
 
296
        # XXX: no locks left when unlocked at the moment
 
297
        # self.assertEqualDiff('', t.get('lock').read())
 
298
        self.assertEqualDiff('', t.get('shared-storage').read())
 
299
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
300
        self.check_knits(t)
 
301
 
 
302
    def test_shared_no_tree_disk_layout(self):
 
303
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
304
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
 
305
        repo.set_make_working_trees(False)
 
306
        # we want:
 
307
        # format 'Bazaar-NG Knit Repository Format 1'
 
308
        # lock ''
 
309
        # inventory.weave == empty_weave
 
310
        # empty revision-store directory
 
311
        # empty weaves directory
 
312
        # a 'shared-storage' marker file.
 
313
        t = control.get_repository_transport(None)
 
314
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
315
                             t.get('format').read())
 
316
        # XXX: no locks left when unlocked at the moment
 
317
        # self.assertEqualDiff('', t.get('lock').read())
 
318
        self.assertEqualDiff('', t.get('shared-storage').read())
 
319
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
320
        repo.set_make_working_trees(True)
 
321
        self.assertFalse(t.has('no-working-trees'))
 
322
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
323
        self.check_knits(t)
 
324
 
 
325
 
 
326
class InterString(repository.InterRepository):
 
327
    """An inter-repository optimised code path for strings.
 
328
 
 
329
    This is for use during testing where we use strings as repositories
 
330
    so that none of the default regsitered inter-repository classes will
 
331
    match.
 
332
    """
 
333
 
 
334
    @staticmethod
 
335
    def is_compatible(repo_source, repo_target):
 
336
        """InterString is compatible with strings-as-repos."""
 
337
        return isinstance(repo_source, str) and isinstance(repo_target, str)
 
338
 
 
339
 
 
340
class TestInterRepository(TestCaseWithTransport):
 
341
 
 
342
    def test_get_default_inter_repository(self):
 
343
        # test that the InterRepository.get(repo_a, repo_b) probes
 
344
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
 
345
        # true and returns a default inter_repo otherwise.
 
346
        # This also tests that the default registered optimised interrepository
 
347
        # classes do not barf inappropriately when a surprising repository type
 
348
        # is handed to them.
 
349
        dummy_a = "Repository 1."
 
350
        dummy_b = "Repository 2."
 
351
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
352
 
 
353
    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
 
354
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
 
355
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
 
356
        self.assertEqual(repository.InterRepository,
 
357
                         inter_repo.__class__)
 
358
        self.assertEqual(repo_a, inter_repo.source)
 
359
        self.assertEqual(repo_b, inter_repo.target)
 
360
 
 
361
    def test_register_inter_repository_class(self):
 
362
        # test that a optimised code path provider - a
 
363
        # InterRepository subclass can be registered and unregistered
 
364
        # and that it is correctly selected when given a repository
 
365
        # pair that it returns true on for the is_compatible static method
 
366
        # check
 
367
        dummy_a = "Repository 1."
 
368
        dummy_b = "Repository 2."
 
369
        repository.InterRepository.register_optimiser(InterString)
 
370
        try:
 
371
            # we should get the default for something InterString returns False
 
372
            # to
 
373
            self.assertFalse(InterString.is_compatible(dummy_a, None))
 
374
            self.assertGetsDefaultInterRepository(dummy_a, None)
 
375
            # and we should get an InterString for a pair it 'likes'
 
376
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
 
377
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
 
378
            self.assertEqual(InterString, inter_repo.__class__)
 
379
            self.assertEqual(dummy_a, inter_repo.source)
 
380
            self.assertEqual(dummy_b, inter_repo.target)
 
381
        finally:
 
382
            repository.InterRepository.unregister_optimiser(InterString)
 
383
        # now we should get the default InterRepository object again.
 
384
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
385
 
 
386
 
 
387
class TestInterWeaveRepo(TestCaseWithTransport):
 
388
 
 
389
    def test_is_compatible_and_registered(self):
 
390
        # InterWeaveRepo is compatible when either side
 
391
        # is a format 5/6/7 branch
 
392
        formats = [repository.RepositoryFormat5(),
 
393
                   repository.RepositoryFormat6(),
 
394
                   repository.RepositoryFormat7()]
 
395
        incompatible_formats = [repository.RepositoryFormat4(),
 
396
                                repository.RepositoryFormatKnit1(),
 
397
                                ]
 
398
        repo_a = self.make_repository('a')
 
399
        repo_b = self.make_repository('b')
 
400
        is_compatible = repository.InterWeaveRepo.is_compatible
 
401
        for source in incompatible_formats:
 
402
            # force incompatible left then right
 
403
            repo_a._format = source
 
404
            repo_b._format = formats[0]
 
405
            self.assertFalse(is_compatible(repo_a, repo_b))
 
406
            self.assertFalse(is_compatible(repo_b, repo_a))
 
407
        for source in formats:
 
408
            repo_a._format = source
 
409
            for target in formats:
 
410
                repo_b._format = target
 
411
                self.assertTrue(is_compatible(repo_a, repo_b))
 
412
        self.assertEqual(repository.InterWeaveRepo,
 
413
                         repository.InterRepository.get(repo_a,
 
414
                                                        repo_b).__class__)
 
415
 
 
416
 
 
417
class TestRepositoryConverter(TestCaseWithTransport):
 
418
 
 
419
    def test_convert_empty(self):
 
420
        t = get_transport(self.get_url('.'))
 
421
        t.mkdir('repository')
 
422
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
 
423
        repo = repository.RepositoryFormat7().initialize(repo_dir)
 
424
        target_format = repository.RepositoryFormatKnit1()
 
425
        converter = repository.CopyConverter(target_format)
 
426
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
 
427
        try:
 
428
            converter.convert(repo, pb)
 
429
        finally:
 
430
            pb.finished()
 
431
        repo = repo_dir.open_repository()
 
432
        self.assertTrue(isinstance(target_format, repo._format.__class__))
 
433
 
 
434
 
 
435
class TestMisc(TestCase):
 
436
    
 
437
    def test_unescape_xml(self):
 
438
        """We get some kind of error when malformed entities are passed"""
 
439
        self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;') 
 
440
 
 
441
 
 
442
class TestRepositoryFormatKnit2(TestCaseWithTransport):
 
443
 
 
444
    def test_convert(self):
 
445
        """Ensure the upgrade adds weaves for roots"""
 
446
        format = bzrdir.BzrDirMetaFormat1()
 
447
        format.repository_format = repository.RepositoryFormatKnit1()
 
448
        tree = self.make_branch_and_tree('.', format)
 
449
        tree.commit("Dull commit", rev_id="dull")
 
450
        revision_tree = tree.branch.repository.revision_tree('dull')
 
451
        self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
 
452
            revision_tree.inventory.root.file_id)
 
453
        format = bzrdir.BzrDirMetaFormat1()
 
454
        format.repository_format = repository.RepositoryFormatKnit2()
 
455
        upgrade.Convert('.', format)
 
456
        tree = workingtree.WorkingTree.open('.')
 
457
        revision_tree = tree.branch.repository.revision_tree('dull')
 
458
        revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
 
459
        tree.commit("Another dull commit", rev_id='dull2')
 
460
        revision_tree = tree.branch.repository.revision_tree('dull2')
 
461
        self.assertEqual('dull', revision_tree.inventory.root.revision)