~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_repository.py

  • Committer: Martin Pool
  • Date: 2005-09-06 02:26:28 UTC
  • Revision ID: mbp@sourcefrog.net-20050906022628-66d65f0feb4a9e80
- implement version 5 xml storage, and tests

  This stores files identified by the version that introduced the 
  text, and the version that introduced the name.  Entry kinds are
  given by the xml tag not an explicit kind field.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# (C) 2006 Canonical Ltd
2
 
 
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
 
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
 
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Tests for the Repository facility that are not interface tests.
18
 
 
19
 
For interface tests see tests/repository_implementations/*.py.
20
 
 
21
 
For concrete class tests see this file, and for storage formats tests
22
 
also see this file.
23
 
"""
24
 
 
25
 
from stat import *
26
 
from StringIO import StringIO
27
 
 
28
 
import bzrlib
29
 
import bzrlib.bzrdir as bzrdir
30
 
import bzrlib.errors as errors
31
 
from bzrlib.errors import (NotBranchError,
32
 
                           NoSuchFile,
33
 
                           UnknownFormatError,
34
 
                           UnsupportedFormatError,
35
 
                           )
36
 
import bzrlib.repository as repository
37
 
from bzrlib.tests import TestCase, TestCaseWithTransport
38
 
from bzrlib.transport import get_transport
39
 
from bzrlib.transport.http import HttpServer
40
 
from bzrlib.transport.memory import MemoryServer
41
 
 
42
 
 
43
 
class TestDefaultFormat(TestCase):
44
 
 
45
 
    def test_get_set_default_format(self):
46
 
        old_format = repository.RepositoryFormat.get_default_format()
47
 
        self.assertTrue(isinstance(old_format, repository.RepositoryFormatKnit1))
48
 
        repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
49
 
        # creating a repository should now create an instrumented dir.
50
 
        try:
51
 
            # the default branch format is used by the meta dir format
52
 
            # which is not the default bzrdir format at this point
53
 
            dir = bzrdir.BzrDirMetaFormat1().initialize('memory:/')
54
 
            result = dir.create_repository()
55
 
            self.assertEqual(result, 'A bzr repository dir')
56
 
        finally:
57
 
            repository.RepositoryFormat.set_default_format(old_format)
58
 
        self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
59
 
 
60
 
 
61
 
class SampleRepositoryFormat(repository.RepositoryFormat):
62
 
    """A sample format
63
 
 
64
 
    this format is initializable, unsupported to aid in testing the 
65
 
    open and open(unsupported=True) routines.
66
 
    """
67
 
 
68
 
    def get_format_string(self):
69
 
        """See RepositoryFormat.get_format_string()."""
70
 
        return "Sample .bzr repository format."
71
 
 
72
 
    def initialize(self, a_bzrdir, shared=False):
73
 
        """Initialize a repository in a BzrDir"""
74
 
        t = a_bzrdir.get_repository_transport(self)
75
 
        t.put('format', StringIO(self.get_format_string()))
76
 
        return 'A bzr repository dir'
77
 
 
78
 
    def is_supported(self):
79
 
        return False
80
 
 
81
 
    def open(self, a_bzrdir, _found=False):
82
 
        return "opened repository."
83
 
 
84
 
 
85
 
class TestRepositoryFormat(TestCaseWithTransport):
86
 
    """Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
87
 
 
88
 
    def test_find_format(self):
89
 
        # is the right format object found for a repository?
90
 
        # create a branch with a few known format objects.
91
 
        # this is not quite the same as 
92
 
        self.build_tree(["foo/", "bar/"])
93
 
        def check_format(format, url):
94
 
            dir = format._matchingbzrdir.initialize(url)
95
 
            format.initialize(dir)
96
 
            t = get_transport(url)
97
 
            found_format = repository.RepositoryFormat.find_format(dir)
98
 
            self.failUnless(isinstance(found_format, format.__class__))
99
 
        check_format(repository.RepositoryFormat7(), "bar")
100
 
        
101
 
    def test_find_format_no_repository(self):
102
 
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
103
 
        self.assertRaises(errors.NoRepositoryPresent,
104
 
                          repository.RepositoryFormat.find_format,
105
 
                          dir)
106
 
 
107
 
    def test_find_format_unknown_format(self):
108
 
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
109
 
        SampleRepositoryFormat().initialize(dir)
110
 
        self.assertRaises(UnknownFormatError,
111
 
                          repository.RepositoryFormat.find_format,
112
 
                          dir)
113
 
 
114
 
    def test_register_unregister_format(self):
115
 
        format = SampleRepositoryFormat()
116
 
        # make a control dir
117
 
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
118
 
        # make a repo
119
 
        format.initialize(dir)
120
 
        # register a format for it.
121
 
        repository.RepositoryFormat.register_format(format)
122
 
        # which repository.Open will refuse (not supported)
123
 
        self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
124
 
        # but open(unsupported) will work
125
 
        self.assertEqual(format.open(dir), "opened repository.")
126
 
        # unregister the format
127
 
        repository.RepositoryFormat.unregister_format(format)
128
 
 
129
 
 
130
 
class TestFormat6(TestCaseWithTransport):
131
 
 
132
 
    def test_no_ancestry_weave(self):
133
 
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
134
 
        repo = repository.RepositoryFormat6().initialize(control)
135
 
        # We no longer need to create the ancestry.weave file
136
 
        # since it is *never* used.
137
 
        self.assertRaises(NoSuchFile,
138
 
                          control.transport.get,
139
 
                          'ancestry.weave')
140
 
 
141
 
 
142
 
class TestFormat7(TestCaseWithTransport):
143
 
    
144
 
    def test_disk_layout(self):
145
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
146
 
        repo = repository.RepositoryFormat7().initialize(control)
147
 
        # in case of side effects of locking.
148
 
        repo.lock_write()
149
 
        repo.unlock()
150
 
        # we want:
151
 
        # format 'Bazaar-NG Repository format 7'
152
 
        # lock ''
153
 
        # inventory.weave == empty_weave
154
 
        # empty revision-store directory
155
 
        # empty weaves directory
156
 
        t = control.get_repository_transport(None)
157
 
        self.assertEqualDiff('Bazaar-NG Repository format 7',
158
 
                             t.get('format').read())
159
 
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
160
 
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
161
 
        self.assertEqualDiff('# bzr weave file v5\n'
162
 
                             'w\n'
163
 
                             'W\n',
164
 
                             t.get('inventory.weave').read())
165
 
 
166
 
    def test_shared_disk_layout(self):
167
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
168
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
169
 
        # we want:
170
 
        # format 'Bazaar-NG Repository format 7'
171
 
        # inventory.weave == empty_weave
172
 
        # empty revision-store directory
173
 
        # empty weaves directory
174
 
        # a 'shared-storage' marker file.
175
 
        # lock is not present when unlocked
176
 
        t = control.get_repository_transport(None)
177
 
        self.assertEqualDiff('Bazaar-NG Repository format 7',
178
 
                             t.get('format').read())
179
 
        self.assertEqualDiff('', t.get('shared-storage').read())
180
 
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
181
 
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
182
 
        self.assertEqualDiff('# bzr weave file v5\n'
183
 
                             'w\n'
184
 
                             'W\n',
185
 
                             t.get('inventory.weave').read())
186
 
        self.assertFalse(t.has('branch-lock'))
187
 
 
188
 
    def test_creates_lockdir(self):
189
 
        """Make sure it appears to be controlled by a LockDir existence"""
190
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
191
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
192
 
        t = control.get_repository_transport(None)
193
 
        # TODO: Should check there is a 'lock' toplevel directory, 
194
 
        # regardless of contents
195
 
        self.assertFalse(t.has('lock/held/info'))
196
 
        repo.lock_write()
197
 
        try:
198
 
            self.assertTrue(t.has('lock/held/info'))
199
 
        finally:
200
 
            # unlock so we don't get a warning about failing to do so
201
 
            repo.unlock()
202
 
 
203
 
    def test_uses_lockdir(self):
204
 
        """repo format 7 actually locks on lockdir"""
205
 
        base_url = self.get_url()
206
 
        control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
207
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
208
 
        t = control.get_repository_transport(None)
209
 
        repo.lock_write()
210
 
        repo.unlock()
211
 
        del repo
212
 
        # make sure the same lock is created by opening it
213
 
        repo = repository.Repository.open(base_url)
214
 
        repo.lock_write()
215
 
        self.assertTrue(t.has('lock/held/info'))
216
 
        repo.unlock()
217
 
        self.assertFalse(t.has('lock/held/info'))
218
 
 
219
 
    def test_shared_no_tree_disk_layout(self):
220
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
221
 
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
222
 
        repo.set_make_working_trees(False)
223
 
        # we want:
224
 
        # format 'Bazaar-NG Repository format 7'
225
 
        # lock ''
226
 
        # inventory.weave == empty_weave
227
 
        # empty revision-store directory
228
 
        # empty weaves directory
229
 
        # a 'shared-storage' marker file.
230
 
        t = control.get_repository_transport(None)
231
 
        self.assertEqualDiff('Bazaar-NG Repository format 7',
232
 
                             t.get('format').read())
233
 
        ## self.assertEqualDiff('', t.get('lock').read())
234
 
        self.assertEqualDiff('', t.get('shared-storage').read())
235
 
        self.assertEqualDiff('', t.get('no-working-trees').read())
236
 
        repo.set_make_working_trees(True)
237
 
        self.assertFalse(t.has('no-working-trees'))
238
 
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
239
 
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
240
 
        self.assertEqualDiff('# bzr weave file v5\n'
241
 
                             'w\n'
242
 
                             'W\n',
243
 
                             t.get('inventory.weave').read())
244
 
 
245
 
 
246
 
class TestFormatKnit1(TestCaseWithTransport):
247
 
    
248
 
    def test_disk_layout(self):
249
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
250
 
        repo = repository.RepositoryFormatKnit1().initialize(control)
251
 
        # in case of side effects of locking.
252
 
        repo.lock_write()
253
 
        repo.unlock()
254
 
        # we want:
255
 
        # format 'Bazaar-NG Knit Repository Format 1'
256
 
        # lock: is a directory
257
 
        # inventory.weave == empty_weave
258
 
        # empty revision-store directory
259
 
        # empty weaves directory
260
 
        t = control.get_repository_transport(None)
261
 
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
262
 
                             t.get('format').read())
263
 
        # XXX: no locks left when unlocked at the moment
264
 
        # self.assertEqualDiff('', t.get('lock').read())
265
 
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
266
 
        self.check_knits(t)
267
 
 
268
 
    def assertHasKnit(self, t, knit_name):
269
 
        """Assert that knit_name exists on t."""
270
 
        self.assertEqualDiff('# bzr knit index 8\n',
271
 
                             t.get(knit_name + '.kndx').read())
272
 
        # no default content
273
 
        self.assertTrue(t.has(knit_name + '.knit'))
274
 
 
275
 
    def check_knits(self, t):
276
 
        """check knit content for a repository."""
277
 
        self.assertHasKnit(t, 'inventory')
278
 
        self.assertHasKnit(t, 'revisions')
279
 
        self.assertHasKnit(t, 'signatures')
280
 
 
281
 
    def test_shared_disk_layout(self):
282
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
283
 
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
284
 
        # we want:
285
 
        # format 'Bazaar-NG Knit Repository Format 1'
286
 
        # lock: is a directory
287
 
        # inventory.weave == empty_weave
288
 
        # empty revision-store directory
289
 
        # empty weaves directory
290
 
        # a 'shared-storage' marker file.
291
 
        t = control.get_repository_transport(None)
292
 
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
293
 
                             t.get('format').read())
294
 
        # XXX: no locks left when unlocked at the moment
295
 
        # self.assertEqualDiff('', t.get('lock').read())
296
 
        self.assertEqualDiff('', t.get('shared-storage').read())
297
 
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
298
 
        self.check_knits(t)
299
 
 
300
 
    def test_shared_no_tree_disk_layout(self):
301
 
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
302
 
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
303
 
        repo.set_make_working_trees(False)
304
 
        # we want:
305
 
        # format 'Bazaar-NG Knit Repository Format 1'
306
 
        # lock ''
307
 
        # inventory.weave == empty_weave
308
 
        # empty revision-store directory
309
 
        # empty weaves directory
310
 
        # a 'shared-storage' marker file.
311
 
        t = control.get_repository_transport(None)
312
 
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
313
 
                             t.get('format').read())
314
 
        # XXX: no locks left when unlocked at the moment
315
 
        # self.assertEqualDiff('', t.get('lock').read())
316
 
        self.assertEqualDiff('', t.get('shared-storage').read())
317
 
        self.assertEqualDiff('', t.get('no-working-trees').read())
318
 
        repo.set_make_working_trees(True)
319
 
        self.assertFalse(t.has('no-working-trees'))
320
 
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
321
 
        self.check_knits(t)
322
 
 
323
 
 
324
 
class InterString(repository.InterRepository):
325
 
    """An inter-repository optimised code path for strings.
326
 
 
327
 
    This is for use during testing where we use strings as repositories
328
 
    so that none of the default regsitered inter-repository classes will
329
 
    match.
330
 
    """
331
 
 
332
 
    @staticmethod
333
 
    def is_compatible(repo_source, repo_target):
334
 
        """InterString is compatible with strings-as-repos."""
335
 
        return isinstance(repo_source, str) and isinstance(repo_target, str)
336
 
 
337
 
 
338
 
class TestInterRepository(TestCaseWithTransport):
339
 
 
340
 
    def test_get_default_inter_repository(self):
341
 
        # test that the InterRepository.get(repo_a, repo_b) probes
342
 
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
343
 
        # true and returns a default inter_repo otherwise.
344
 
        # This also tests that the default registered optimised interrepository
345
 
        # classes do not barf inappropriately when a surprising repository type
346
 
        # is handed to them.
347
 
        dummy_a = "Repository 1."
348
 
        dummy_b = "Repository 2."
349
 
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
350
 
 
351
 
    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
352
 
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
353
 
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
354
 
        self.assertEqual(repository.InterRepository,
355
 
                         inter_repo.__class__)
356
 
        self.assertEqual(repo_a, inter_repo.source)
357
 
        self.assertEqual(repo_b, inter_repo.target)
358
 
 
359
 
    def test_register_inter_repository_class(self):
360
 
        # test that a optimised code path provider - a
361
 
        # InterRepository subclass can be registered and unregistered
362
 
        # and that it is correctly selected when given a repository
363
 
        # pair that it returns true on for the is_compatible static method
364
 
        # check
365
 
        dummy_a = "Repository 1."
366
 
        dummy_b = "Repository 2."
367
 
        repository.InterRepository.register_optimiser(InterString)
368
 
        try:
369
 
            # we should get the default for something InterString returns False
370
 
            # to
371
 
            self.assertFalse(InterString.is_compatible(dummy_a, None))
372
 
            self.assertGetsDefaultInterRepository(dummy_a, None)
373
 
            # and we should get an InterString for a pair it 'likes'
374
 
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
375
 
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
376
 
            self.assertEqual(InterString, inter_repo.__class__)
377
 
            self.assertEqual(dummy_a, inter_repo.source)
378
 
            self.assertEqual(dummy_b, inter_repo.target)
379
 
        finally:
380
 
            repository.InterRepository.unregister_optimiser(InterString)
381
 
        # now we should get the default InterRepository object again.
382
 
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
383
 
 
384
 
 
385
 
class TestInterWeaveRepo(TestCaseWithTransport):
386
 
 
387
 
    def test_is_compatible_and_registered(self):
388
 
        # InterWeaveRepo is compatible when either side
389
 
        # is a format 5/6/7 branch
390
 
        formats = [repository.RepositoryFormat5(),
391
 
                   repository.RepositoryFormat6(),
392
 
                   repository.RepositoryFormat7()]
393
 
        incompatible_formats = [repository.RepositoryFormat4(),
394
 
                                repository.RepositoryFormatKnit1(),
395
 
                                ]
396
 
        repo_a = self.make_repository('a')
397
 
        repo_b = self.make_repository('b')
398
 
        is_compatible = repository.InterWeaveRepo.is_compatible
399
 
        for source in incompatible_formats:
400
 
            # force incompatible left then right
401
 
            repo_a._format = source
402
 
            repo_b._format = formats[0]
403
 
            self.assertFalse(is_compatible(repo_a, repo_b))
404
 
            self.assertFalse(is_compatible(repo_b, repo_a))
405
 
        for source in formats:
406
 
            repo_a._format = source
407
 
            for target in formats:
408
 
                repo_b._format = target
409
 
                self.assertTrue(is_compatible(repo_a, repo_b))
410
 
        self.assertEqual(repository.InterWeaveRepo,
411
 
                         repository.InterRepository.get(repo_a,
412
 
                                                        repo_b).__class__)
413
 
 
414
 
 
415
 
class TestRepositoryConverter(TestCaseWithTransport):
416
 
 
417
 
    def test_convert_empty(self):
418
 
        t = get_transport(self.get_url('.'))
419
 
        t.mkdir('repository')
420
 
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
421
 
        repo = repository.RepositoryFormat7().initialize(repo_dir)
422
 
        target_format = repository.RepositoryFormatKnit1()
423
 
        converter = repository.CopyConverter(target_format)
424
 
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
425
 
        try:
426
 
            converter.convert(repo, pb)
427
 
        finally:
428
 
            pb.finished()
429
 
        repo = repo_dir.open_repository()
430
 
        self.assertTrue(isinstance(target_format, repo._format.__class__))