~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_repository.py

  • Committer: John Arbash Meinel
  • Date: 2006-07-09 15:47:02 UTC
  • mto: This revision was merged to the branch mainline in revision 1846.
  • Revision ID: john@arbash-meinel.com-20060709154702-304436a47a55e265
Renaming LockHelpers.py to lock_helpers.py

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the Repository facility that are not interface tests.
 
18
 
 
19
For interface tests see tests/repository_implementations/*.py.
 
20
 
 
21
For concrete class tests see this file, and for storage formats tests
 
22
also see this file.
 
23
"""
 
24
 
 
25
from stat import S_ISDIR
 
26
from StringIO import StringIO
 
27
 
 
28
import bzrlib
 
29
import bzrlib.bzrdir as bzrdir
 
30
import bzrlib.errors as errors
 
31
from bzrlib.errors import (NotBranchError,
 
32
                           NoSuchFile,
 
33
                           UnknownFormatError,
 
34
                           UnsupportedFormatError,
 
35
                           )
 
36
import bzrlib.repository as repository
 
37
from bzrlib.tests import TestCase, TestCaseWithTransport
 
38
from bzrlib.transport import get_transport
 
39
from bzrlib.transport.http import HttpServer
 
40
from bzrlib.transport.memory import MemoryServer
 
41
 
 
42
 
 
43
class TestDefaultFormat(TestCase):
 
44
 
 
45
    def test_get_set_default_format(self):
 
46
        old_format = repository.RepositoryFormat.get_default_format()
 
47
        self.assertTrue(isinstance(old_format, repository.RepositoryFormatKnit1))
 
48
        repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
 
49
        # creating a repository should now create an instrumented dir.
 
50
        try:
 
51
            # the default branch format is used by the meta dir format
 
52
            # which is not the default bzrdir format at this point
 
53
            dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
 
54
            result = dir.create_repository()
 
55
            self.assertEqual(result, 'A bzr repository dir')
 
56
        finally:
 
57
            repository.RepositoryFormat.set_default_format(old_format)
 
58
        self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
 
59
 
 
60
 
 
61
class SampleRepositoryFormat(repository.RepositoryFormat):
 
62
    """A sample format
 
63
 
 
64
    this format is initializable, unsupported to aid in testing the 
 
65
    open and open(unsupported=True) routines.
 
66
    """
 
67
 
 
68
    def get_format_string(self):
 
69
        """See RepositoryFormat.get_format_string()."""
 
70
        return "Sample .bzr repository format."
 
71
 
 
72
    def initialize(self, a_bzrdir, shared=False):
 
73
        """Initialize a repository in a BzrDir"""
 
74
        t = a_bzrdir.get_repository_transport(self)
 
75
        t.put('format', StringIO(self.get_format_string()))
 
76
        return 'A bzr repository dir'
 
77
 
 
78
    def is_supported(self):
 
79
        return False
 
80
 
 
81
    def open(self, a_bzrdir, _found=False):
 
82
        return "opened repository."
 
83
 
 
84
 
 
85
class TestRepositoryFormat(TestCaseWithTransport):
 
86
    """Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
 
87
 
 
88
    def test_find_format(self):
 
89
        # is the right format object found for a repository?
 
90
        # create a branch with a few known format objects.
 
91
        # this is not quite the same as 
 
92
        self.build_tree(["foo/", "bar/"])
 
93
        def check_format(format, url):
 
94
            dir = format._matchingbzrdir.initialize(url)
 
95
            format.initialize(dir)
 
96
            t = get_transport(url)
 
97
            found_format = repository.RepositoryFormat.find_format(dir)
 
98
            self.failUnless(isinstance(found_format, format.__class__))
 
99
        check_format(repository.RepositoryFormat7(), "bar")
 
100
        
 
101
    def test_find_format_no_repository(self):
 
102
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
103
        self.assertRaises(errors.NoRepositoryPresent,
 
104
                          repository.RepositoryFormat.find_format,
 
105
                          dir)
 
106
 
 
107
    def test_find_format_unknown_format(self):
 
108
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
109
        SampleRepositoryFormat().initialize(dir)
 
110
        self.assertRaises(UnknownFormatError,
 
111
                          repository.RepositoryFormat.find_format,
 
112
                          dir)
 
113
 
 
114
    def test_register_unregister_format(self):
 
115
        format = SampleRepositoryFormat()
 
116
        # make a control dir
 
117
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
118
        # make a repo
 
119
        format.initialize(dir)
 
120
        # register a format for it.
 
121
        repository.RepositoryFormat.register_format(format)
 
122
        # which repository.Open will refuse (not supported)
 
123
        self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
 
124
        # but open(unsupported) will work
 
125
        self.assertEqual(format.open(dir), "opened repository.")
 
126
        # unregister the format
 
127
        repository.RepositoryFormat.unregister_format(format)
 
128
 
 
129
 
 
130
class TestFormat6(TestCaseWithTransport):
 
131
 
 
132
    def test_no_ancestry_weave(self):
 
133
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
134
        repo = repository.RepositoryFormat6().initialize(control)
 
135
        # We no longer need to create the ancestry.weave file
 
136
        # since it is *never* used.
 
137
        self.assertRaises(NoSuchFile,
 
138
                          control.transport.get,
 
139
                          'ancestry.weave')
 
140
 
 
141
 
 
142
class TestFormat7(TestCaseWithTransport):
 
143
    
 
144
    def test_disk_layout(self):
 
145
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
146
        repo = repository.RepositoryFormat7().initialize(control)
 
147
        # in case of side effects of locking.
 
148
        repo.lock_write()
 
149
        repo.unlock()
 
150
        # we want:
 
151
        # format 'Bazaar-NG Repository format 7'
 
152
        # lock ''
 
153
        # inventory.weave == empty_weave
 
154
        # empty revision-store directory
 
155
        # empty weaves directory
 
156
        t = control.get_repository_transport(None)
 
157
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
158
                             t.get('format').read())
 
159
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
160
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
161
        self.assertEqualDiff('# bzr weave file v5\n'
 
162
                             'w\n'
 
163
                             'W\n',
 
164
                             t.get('inventory.weave').read())
 
165
 
 
166
    def test_shared_disk_layout(self):
 
167
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
168
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
169
        # we want:
 
170
        # format 'Bazaar-NG Repository format 7'
 
171
        # inventory.weave == empty_weave
 
172
        # empty revision-store directory
 
173
        # empty weaves directory
 
174
        # a 'shared-storage' marker file.
 
175
        # lock is not present when unlocked
 
176
        t = control.get_repository_transport(None)
 
177
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
178
                             t.get('format').read())
 
179
        self.assertEqualDiff('', t.get('shared-storage').read())
 
180
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
181
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
182
        self.assertEqualDiff('# bzr weave file v5\n'
 
183
                             'w\n'
 
184
                             'W\n',
 
185
                             t.get('inventory.weave').read())
 
186
        self.assertFalse(t.has('branch-lock'))
 
187
 
 
188
    def test_creates_lockdir(self):
 
189
        """Make sure it appears to be controlled by a LockDir existence"""
 
190
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
191
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
192
        t = control.get_repository_transport(None)
 
193
        # TODO: Should check there is a 'lock' toplevel directory, 
 
194
        # regardless of contents
 
195
        self.assertFalse(t.has('lock/held/info'))
 
196
        repo.lock_write()
 
197
        try:
 
198
            self.assertTrue(t.has('lock/held/info'))
 
199
        finally:
 
200
            # unlock so we don't get a warning about failing to do so
 
201
            repo.unlock()
 
202
 
 
203
    def test_uses_lockdir(self):
 
204
        """repo format 7 actually locks on lockdir"""
 
205
        base_url = self.get_url()
 
206
        control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
 
207
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
208
        t = control.get_repository_transport(None)
 
209
        repo.lock_write()
 
210
        repo.unlock()
 
211
        del repo
 
212
        # make sure the same lock is created by opening it
 
213
        repo = repository.Repository.open(base_url)
 
214
        repo.lock_write()
 
215
        self.assertTrue(t.has('lock/held/info'))
 
216
        repo.unlock()
 
217
        self.assertFalse(t.has('lock/held/info'))
 
218
 
 
219
    def test_shared_no_tree_disk_layout(self):
 
220
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
221
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
 
222
        repo.set_make_working_trees(False)
 
223
        # we want:
 
224
        # format 'Bazaar-NG Repository format 7'
 
225
        # lock ''
 
226
        # inventory.weave == empty_weave
 
227
        # empty revision-store directory
 
228
        # empty weaves directory
 
229
        # a 'shared-storage' marker file.
 
230
        t = control.get_repository_transport(None)
 
231
        self.assertEqualDiff('Bazaar-NG Repository format 7',
 
232
                             t.get('format').read())
 
233
        ## self.assertEqualDiff('', t.get('lock').read())
 
234
        self.assertEqualDiff('', t.get('shared-storage').read())
 
235
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
236
        repo.set_make_working_trees(True)
 
237
        self.assertFalse(t.has('no-working-trees'))
 
238
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
 
239
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
 
240
        self.assertEqualDiff('# bzr weave file v5\n'
 
241
                             'w\n'
 
242
                             'W\n',
 
243
                             t.get('inventory.weave').read())
 
244
 
 
245
 
 
246
class TestFormatKnit1(TestCaseWithTransport):
 
247
    
 
248
    def test_disk_layout(self):
 
249
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
250
        repo = repository.RepositoryFormatKnit1().initialize(control)
 
251
        # in case of side effects of locking.
 
252
        repo.lock_write()
 
253
        repo.unlock()
 
254
        # we want:
 
255
        # format 'Bazaar-NG Knit Repository Format 1'
 
256
        # lock: is a directory
 
257
        # inventory.weave == empty_weave
 
258
        # empty revision-store directory
 
259
        # empty weaves directory
 
260
        t = control.get_repository_transport(None)
 
261
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
262
                             t.get('format').read())
 
263
        # XXX: no locks left when unlocked at the moment
 
264
        # self.assertEqualDiff('', t.get('lock').read())
 
265
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
266
        self.check_knits(t)
 
267
 
 
268
    def assertHasKnit(self, t, knit_name):
 
269
        """Assert that knit_name exists on t."""
 
270
        self.assertEqualDiff('# bzr knit index 8\n',
 
271
                             t.get(knit_name + '.kndx').read())
 
272
        # no default content
 
273
        self.assertTrue(t.has(knit_name + '.knit'))
 
274
 
 
275
    def check_knits(self, t):
 
276
        """check knit content for a repository."""
 
277
        self.assertHasKnit(t, 'inventory')
 
278
        self.assertHasKnit(t, 'revisions')
 
279
        self.assertHasKnit(t, 'signatures')
 
280
 
 
281
    def test_shared_disk_layout(self):
 
282
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
283
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
 
284
        # we want:
 
285
        # format 'Bazaar-NG Knit Repository Format 1'
 
286
        # lock: is a directory
 
287
        # inventory.weave == empty_weave
 
288
        # empty revision-store directory
 
289
        # empty weaves directory
 
290
        # a 'shared-storage' marker file.
 
291
        t = control.get_repository_transport(None)
 
292
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
293
                             t.get('format').read())
 
294
        # XXX: no locks left when unlocked at the moment
 
295
        # self.assertEqualDiff('', t.get('lock').read())
 
296
        self.assertEqualDiff('', t.get('shared-storage').read())
 
297
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
298
        self.check_knits(t)
 
299
 
 
300
    def test_shared_no_tree_disk_layout(self):
 
301
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
302
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
 
303
        repo.set_make_working_trees(False)
 
304
        # we want:
 
305
        # format 'Bazaar-NG Knit Repository Format 1'
 
306
        # lock ''
 
307
        # inventory.weave == empty_weave
 
308
        # empty revision-store directory
 
309
        # empty weaves directory
 
310
        # a 'shared-storage' marker file.
 
311
        t = control.get_repository_transport(None)
 
312
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
 
313
                             t.get('format').read())
 
314
        # XXX: no locks left when unlocked at the moment
 
315
        # self.assertEqualDiff('', t.get('lock').read())
 
316
        self.assertEqualDiff('', t.get('shared-storage').read())
 
317
        self.assertEqualDiff('', t.get('no-working-trees').read())
 
318
        repo.set_make_working_trees(True)
 
319
        self.assertFalse(t.has('no-working-trees'))
 
320
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
 
321
        self.check_knits(t)
 
322
 
 
323
 
 
324
class InterString(repository.InterRepository):
 
325
    """An inter-repository optimised code path for strings.
 
326
 
 
327
    This is for use during testing where we use strings as repositories
 
328
    so that none of the default regsitered inter-repository classes will
 
329
    match.
 
330
    """
 
331
 
 
332
    @staticmethod
 
333
    def is_compatible(repo_source, repo_target):
 
334
        """InterString is compatible with strings-as-repos."""
 
335
        return isinstance(repo_source, str) and isinstance(repo_target, str)
 
336
 
 
337
 
 
338
class TestInterRepository(TestCaseWithTransport):
 
339
 
 
340
    def test_get_default_inter_repository(self):
 
341
        # test that the InterRepository.get(repo_a, repo_b) probes
 
342
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
 
343
        # true and returns a default inter_repo otherwise.
 
344
        # This also tests that the default registered optimised interrepository
 
345
        # classes do not barf inappropriately when a surprising repository type
 
346
        # is handed to them.
 
347
        dummy_a = "Repository 1."
 
348
        dummy_b = "Repository 2."
 
349
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
350
 
 
351
    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
 
352
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
 
353
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
 
354
        self.assertEqual(repository.InterRepository,
 
355
                         inter_repo.__class__)
 
356
        self.assertEqual(repo_a, inter_repo.source)
 
357
        self.assertEqual(repo_b, inter_repo.target)
 
358
 
 
359
    def test_register_inter_repository_class(self):
 
360
        # test that a optimised code path provider - a
 
361
        # InterRepository subclass can be registered and unregistered
 
362
        # and that it is correctly selected when given a repository
 
363
        # pair that it returns true on for the is_compatible static method
 
364
        # check
 
365
        dummy_a = "Repository 1."
 
366
        dummy_b = "Repository 2."
 
367
        repository.InterRepository.register_optimiser(InterString)
 
368
        try:
 
369
            # we should get the default for something InterString returns False
 
370
            # to
 
371
            self.assertFalse(InterString.is_compatible(dummy_a, None))
 
372
            self.assertGetsDefaultInterRepository(dummy_a, None)
 
373
            # and we should get an InterString for a pair it 'likes'
 
374
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
 
375
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
 
376
            self.assertEqual(InterString, inter_repo.__class__)
 
377
            self.assertEqual(dummy_a, inter_repo.source)
 
378
            self.assertEqual(dummy_b, inter_repo.target)
 
379
        finally:
 
380
            repository.InterRepository.unregister_optimiser(InterString)
 
381
        # now we should get the default InterRepository object again.
 
382
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
 
383
 
 
384
 
 
385
class TestInterWeaveRepo(TestCaseWithTransport):
 
386
 
 
387
    def test_is_compatible_and_registered(self):
 
388
        # InterWeaveRepo is compatible when either side
 
389
        # is a format 5/6/7 branch
 
390
        formats = [repository.RepositoryFormat5(),
 
391
                   repository.RepositoryFormat6(),
 
392
                   repository.RepositoryFormat7()]
 
393
        incompatible_formats = [repository.RepositoryFormat4(),
 
394
                                repository.RepositoryFormatKnit1(),
 
395
                                ]
 
396
        repo_a = self.make_repository('a')
 
397
        repo_b = self.make_repository('b')
 
398
        is_compatible = repository.InterWeaveRepo.is_compatible
 
399
        for source in incompatible_formats:
 
400
            # force incompatible left then right
 
401
            repo_a._format = source
 
402
            repo_b._format = formats[0]
 
403
            self.assertFalse(is_compatible(repo_a, repo_b))
 
404
            self.assertFalse(is_compatible(repo_b, repo_a))
 
405
        for source in formats:
 
406
            repo_a._format = source
 
407
            for target in formats:
 
408
                repo_b._format = target
 
409
                self.assertTrue(is_compatible(repo_a, repo_b))
 
410
        self.assertEqual(repository.InterWeaveRepo,
 
411
                         repository.InterRepository.get(repo_a,
 
412
                                                        repo_b).__class__)
 
413
 
 
414
 
 
415
class TestRepositoryConverter(TestCaseWithTransport):
 
416
 
 
417
    def test_convert_empty(self):
 
418
        t = get_transport(self.get_url('.'))
 
419
        t.mkdir('repository')
 
420
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
 
421
        repo = repository.RepositoryFormat7().initialize(repo_dir)
 
422
        target_format = repository.RepositoryFormatKnit1()
 
423
        converter = repository.CopyConverter(target_format)
 
424
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
 
425
        try:
 
426
            converter.convert(repo, pb)
 
427
        finally:
 
428
            pb.finished()
 
429
        repo = repo_dir.open_repository()
 
430
        self.assertTrue(isinstance(target_format, repo._format.__class__))