~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: John Ferlito
  • Date: 2009-09-02 04:31:45 UTC
  • mto: (4665.7.1 serve-init)
  • mto: This revision was merged to the branch mainline in revision 4913.
  • Revision ID: johnf@inodes.org-20090902043145-gxdsfw03ilcwbyn5
Add a debian init script for bzr --serve

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
 
21
21
from bzrlib import (
22
22
    branch,
23
 
    bzrdir,
24
 
    controldir,
25
23
    errors,
26
24
    foreign,
27
25
    lockable_files,
28
26
    lockdir,
29
 
    repository,
30
 
    revision,
31
 
    tests,
32
27
    trace,
33
28
    )
34
 
 
35
 
from bzrlib.repofmt import groupcompress_repo
 
29
from bzrlib.bzrdir import (
 
30
    BzrDir,
 
31
    BzrDirFormat,
 
32
    BzrDirMeta1,
 
33
    BzrDirMetaFormat1,
 
34
    format_registry,
 
35
    )
 
36
from bzrlib.inventory import Inventory
 
37
from bzrlib.revision import (
 
38
    NULL_REVISION,
 
39
    Revision,
 
40
    )
 
41
from bzrlib.tests import (
 
42
    TestCase,
 
43
    TestCaseWithTransport,
 
44
    )
36
45
 
37
46
# This is the dummy foreign revision control system, used 
38
47
# mainly here in the testsuite to test the foreign VCS infrastructure.
78
87
        self.mapping_registry = DummyForeignVcsMappingRegistry()
79
88
        self.mapping_registry.register("v1", DummyForeignVcsMapping(self),
80
89
                                       "Version 1")
81
 
        self.abbreviation = "dummy"
82
90
 
83
91
    def show_foreign_revid(self, foreign_revid):
84
92
        return { "dummy ding": "%s/%s\\%s" % foreign_revid }
85
93
 
86
 
    def serialize_foreign_revid(self, foreign_revid):
87
 
        return "%s|%s|%s" % foreign_revid
88
 
 
89
94
 
90
95
class DummyForeignVcsBranch(branch.BzrBranch6,foreign.ForeignBranch):
91
96
    """A Dummy VCS Branch."""
94
99
        self._format = _format
95
100
        self._base = a_bzrdir.transport.base
96
101
        self._ignore_fallbacks = False
97
 
        self.bzrdir = a_bzrdir
98
 
        foreign.ForeignBranch.__init__(self,
 
102
        foreign.ForeignBranch.__init__(self, 
99
103
            DummyForeignVcsMapping(DummyForeignVcs()))
100
 
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir,
 
104
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir, 
101
105
            *args, **kwargs)
102
106
 
103
 
    def _get_checkout_format(self):
104
 
        """Return the most suitable metadir for a checkout of this branch.
105
 
        Weaves are used if this branch's repository uses weaves.
106
 
        """
107
 
        return self.bzrdir.checkout_metadir()
108
 
 
109
 
    def import_last_revision_info_and_tags(self, source, revno, revid,
110
 
                                           lossy=False):
111
 
        interbranch = InterToDummyVcsBranch(source, self)
112
 
        if lossy:
113
 
            result = interbranch.lossy_push(revid)
114
 
            revid = result.revidmap[revid]
115
 
        else:
116
 
            interbranch.push(revid)
117
 
        return (revno, revid)
118
 
 
119
 
 
120
 
class DummyForeignCommitBuilder(repository.RootCommitBuilder):
121
 
 
122
 
    def _generate_revision_if_needed(self):
123
 
        mapping = DummyForeignVcsMapping(DummyForeignVcs())
124
 
        if self._lossy:
125
 
            self._new_revision_id = mapping.revision_id_foreign_to_bzr(
126
 
                (str(self._timestamp), str(self._timezone), "UNKNOWN"))
127
 
            self.random_revid = False
128
 
        elif self._new_revision_id is not None:
129
 
            self.random_revid = False
130
 
        else:
131
 
            self._new_revision_id = self._gen_revision_id()
132
 
            self.random_revid = True
133
 
 
134
 
 
135
 
class DummyForeignVcsRepository(groupcompress_repo.CHKInventoryRepository,
136
 
    foreign.ForeignRepository):
137
 
    """Dummy foreign vcs repository."""
138
 
 
139
 
 
140
 
class DummyForeignVcsRepositoryFormat(groupcompress_repo.RepositoryFormat2a):
141
 
 
142
 
    repository_class = DummyForeignVcsRepository
143
 
    _commit_builder_class = DummyForeignCommitBuilder
144
 
 
145
 
    def get_format_string(self):
146
 
        return "Dummy Foreign Vcs Repository"
147
 
 
148
 
    def get_format_description(self):
149
 
        return "Dummy Foreign Vcs Repository"
150
 
 
151
107
 
152
108
class InterToDummyVcsBranch(branch.GenericInterBranch,
153
109
                            foreign.InterToForeignBranch):
156
112
    def is_compatible(source, target):
157
113
        return isinstance(target, DummyForeignVcsBranch)
158
114
 
159
 
    def push(self, overwrite=False, stop_revision=None):
160
 
        raise errors.NoRoundtrippingSupport(self.source, self.target)
161
 
 
162
115
    def lossy_push(self, stop_revision=None):
163
116
        result = branch.BranchPushResult()
164
117
        result.source_branch = self.source
168
121
        try:
169
122
            # This just handles simple cases, but that's good enough for tests
170
123
            my_history = self.target.revision_history()
171
 
            if stop_revision is None:
172
 
                stop_revision = self.source.last_revision()
173
 
            their_history = list(self.source.repository.iter_reverse_revision_history(stop_revision))
174
 
            their_history.reverse()
 
124
            their_history = self.source.revision_history()
175
125
            if their_history[:min(len(my_history), len(their_history))] != my_history:
176
126
                raise errors.DivergedBranches(self.target, self.source)
177
127
            todo = their_history[len(my_history):]
186
136
                    (str(rev.timestamp), str(rev.timezone), 
187
137
                        str(self.target.revno())))
188
138
                parent_revno, parent_revid= self.target.last_revision_info()
189
 
                if parent_revid == revision.NULL_REVISION:
 
139
                if parent_revid == NULL_REVISION:
190
140
                    parent_revids = []
191
141
                else:
192
142
                    parent_revids = [parent_revid]
227
177
        super(DummyForeignVcsBranchFormat, self).__init__()
228
178
        self._matchingbzrdir = DummyForeignVcsDirFormat()
229
179
 
230
 
    def open(self, a_bzrdir, name=None, _found=False, ignore_fallbacks=False,
231
 
            found_repository=None):
 
180
    def open(self, a_bzrdir, _found=False):
232
181
        if not _found:
233
182
            raise NotImplementedError
234
183
        try:
235
 
            transport = a_bzrdir.get_branch_transport(None, name=name)
 
184
            transport = a_bzrdir.get_branch_transport(None)
236
185
            control_files = lockable_files.LockableFiles(transport, 'lock',
237
186
                                                         lockdir.LockDir)
238
 
            if found_repository is None:
239
 
                found_repository = a_bzrdir.find_repository()
240
187
            return DummyForeignVcsBranch(_format=self,
241
188
                              _control_files=control_files,
242
189
                              a_bzrdir=a_bzrdir,
243
 
                              _repository=found_repository)
 
190
                              _repository=a_bzrdir.find_repository())
244
191
        except errors.NoSuchFile:
245
192
            raise errors.NotBranchError(path=transport.base)
246
193
 
247
194
 
248
 
class DummyForeignVcsDirFormat(bzrdir.BzrDirMetaFormat1):
 
195
class DummyForeignVcsDirFormat(BzrDirMetaFormat1):
249
196
    """BzrDirFormat for the dummy foreign VCS."""
250
197
 
251
198
    @classmethod
263
210
    def get_branch_format(self):
264
211
        return DummyForeignVcsBranchFormat()
265
212
 
266
 
    @property
267
 
    def repository_format(self):
268
 
        return DummyForeignVcsRepositoryFormat()
 
213
    @classmethod
 
214
    def probe_transport(klass, transport):
 
215
        """Return the .bzrdir style format present in a directory."""
 
216
        if not transport.has('.dummy'):
 
217
            raise errors.NotBranchError(path=transport.base)
 
218
        return klass()
269
219
 
270
220
    def initialize_on_transport(self, transport):
271
221
        """Initialize a new bzrdir in the base directory of a Transport."""
289
239
        return DummyForeignVcsDir(transport, self)
290
240
 
291
241
 
292
 
class DummyForeignVcsDir(bzrdir.BzrDirMeta1):
 
242
class DummyForeignVcsDir(BzrDirMeta1):
293
243
 
294
244
    def __init__(self, _transport, _format):
295
245
        self._format = _format
299
249
        self._control_files = lockable_files.LockableFiles(self.transport,
300
250
            "lock", lockable_files.TransportLock)
301
251
 
302
 
    def create_workingtree(self):
303
 
        # dirstate requires a ".bzr" entry to exist
304
 
        self.root_transport.put_bytes(".bzr", "foo")
305
 
        return super(DummyForeignVcsDir, self).create_workingtree()
306
 
 
307
 
    def open_branch(self, name=None, unsupported=False, ignore_fallbacks=True):
308
 
        if name is not None:
309
 
            raise errors.NoColocatedBranchSupport(self)
 
252
    def open_branch(self, ignore_fallbacks=True):
310
253
        return self._format.get_branch_format().open(self, _found=True)
311
254
 
312
255
    def cloning_metadir(self, stacked=False):
313
256
        """Produce a metadir suitable for cloning with."""
314
 
        return bzrdir.format_registry.make_bzrdir("default")
315
 
 
316
 
    def checkout_metadir(self):
317
 
        return self.cloning_metadir()
 
257
        return format_registry.make_bzrdir("default")
318
258
 
319
259
    def sprout(self, url, revision_id=None, force_new_repo=False,
320
260
               recurse='down', possible_transports=None,
328
268
                hardlink=hardlink, stacked=stacked, source_branch=source_branch)
329
269
 
330
270
 
331
 
def register_dummy_foreign_for_test(testcase):
332
 
    controldir.ControlDirFormat.register_prober(DummyForeignProber)
333
 
    testcase.addCleanup(controldir.ControlDirFormat.unregister_prober,
334
 
        DummyForeignProber)
335
 
    repository.format_registry.register(DummyForeignVcsRepositoryFormat())
336
 
    testcase.addCleanup(repository.format_registry.remove,
337
 
            DummyForeignVcsRepositoryFormat())
338
 
    branch.format_registry.register(DummyForeignVcsBranchFormat())
339
 
    testcase.addCleanup(branch.format_registry.remove,
340
 
            DummyForeignVcsBranchFormat())
341
 
    # We need to register the optimiser to make the dummy appears really
342
 
    # different from a regular bzr repository.
343
 
    branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
344
 
    testcase.addCleanup(branch.InterBranch.unregister_optimiser,
345
 
                        InterToDummyVcsBranch)
346
 
 
347
 
 
348
 
class DummyForeignProber(controldir.Prober):
349
 
 
350
 
    @classmethod
351
 
    def probe_transport(klass, transport):
352
 
        """Return the .bzrdir style format present in a directory."""
353
 
        if not transport.has('.dummy'):
354
 
            raise errors.NotBranchError(path=transport.base)
355
 
        return DummyForeignVcsDirFormat()
356
 
 
357
 
    @classmethod
358
 
    def known_formats(cls):
359
 
        return set([DummyForeignVcsDirFormat()])
360
 
 
361
 
 
362
 
class ForeignVcsRegistryTests(tests.TestCase):
 
271
class ForeignVcsRegistryTests(TestCase):
363
272
    """Tests for the ForeignVcsRegistry class."""
364
273
 
365
274
    def test_parse_revision_id_no_dash(self):
376
285
        reg = foreign.ForeignVcsRegistry()
377
286
        vcs = DummyForeignVcs()
378
287
        reg.register("dummy", vcs, "Dummy VCS")
379
 
        self.assertEquals((
380
 
            ("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
381
 
            reg.parse_revision_id("dummy-v1:some-foreign-revid"))
382
 
 
383
 
 
384
 
class ForeignRevisionTests(tests.TestCase):
 
288
        self.assertEquals((("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
 
289
                          reg.parse_revision_id("dummy-v1:some-foreign-revid"))
 
290
 
 
291
 
 
292
class ForeignRevisionTests(TestCase):
385
293
    """Tests for the ForeignRevision class."""
386
294
 
387
295
    def test_create(self):
393
301
        self.assertEquals(mapp, rev.mapping)
394
302
 
395
303
 
396
 
class WorkingTreeFileUpdateTests(tests.TestCaseWithTransport):
 
304
class WorkingTreeFileUpdateTests(TestCaseWithTransport):
397
305
    """Tests for update_workingtree_fileids()."""
398
306
 
399
307
    def test_update_workingtree(self):
417
325
            wt.unlock()
418
326
 
419
327
 
420
 
class DummyForeignVcsTests(tests.TestCaseWithTransport):
 
328
class DummyForeignVcsTests(TestCaseWithTransport):
421
329
    """Very basic test for DummyForeignVcs."""
422
330
 
423
331
    def setUp(self):
 
332
        BzrDirFormat.register_control_format(DummyForeignVcsDirFormat)
 
333
        branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
 
334
        self.addCleanup(self.unregister)
424
335
        super(DummyForeignVcsTests, self).setUp()
425
 
        register_dummy_foreign_for_test(self)
 
336
 
 
337
    def unregister(self):
 
338
        try:
 
339
            BzrDirFormat.unregister_control_format(DummyForeignVcsDirFormat)
 
340
        except ValueError:
 
341
            pass
 
342
        branch.InterBranch.unregister_optimiser(InterToDummyVcsBranch)
426
343
 
427
344
    def test_create(self):
428
345
        """Test we can create dummies."""
429
346
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
430
 
        dir = bzrdir.BzrDir.open("d")
 
347
        dir = BzrDir.open("d")
431
348
        self.assertEquals("A Dummy VCS Dir", dir._format.get_format_string())
432
349
        dir.open_repository()
433
350
        dir.open_branch()
436
353
    def test_sprout(self):
437
354
        """Test we can clone dummies and that the format is not preserved."""
438
355
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
439
 
        dir = bzrdir.BzrDir.open("d")
 
356
        dir = BzrDir.open("d")
440
357
        newdir = dir.sprout("e")
441
 
        self.assertNotEquals("A Dummy VCS Dir",
442
 
                             newdir._format.get_format_string())
443
 
 
444
 
    def test_push_not_supported(self):
445
 
        source_tree = self.make_branch_and_tree("source")
446
 
        target_tree = self.make_branch_and_tree("target", 
447
 
            format=DummyForeignVcsDirFormat())
448
 
        self.assertRaises(errors.NoRoundtrippingSupport, 
449
 
            source_tree.branch.push, target_tree.branch)
 
358
        self.assertNotEquals("A Dummy VCS Dir", newdir._format.get_format_string())
450
359
 
451
360
    def test_lossy_push_empty(self):
452
361
        source_tree = self.make_branch_and_tree("source")
453
362
        target_tree = self.make_branch_and_tree("target", 
454
363
            format=DummyForeignVcsDirFormat())
455
364
        pushresult = source_tree.branch.lossy_push(target_tree.branch)
456
 
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
457
 
        self.assertEquals(revision.NULL_REVISION, pushresult.new_revid)
 
365
        self.assertEquals(NULL_REVISION, pushresult.old_revid)
 
366
        self.assertEquals(NULL_REVISION, pushresult.new_revid)
458
367
        self.assertEquals({}, pushresult.revidmap)
459
368
 
460
369
    def test_lossy_push_simple(self):
469
378
            pushresult = source_tree.branch.lossy_push(target_tree.branch)
470
379
        finally:
471
380
            target_tree.branch.unlock()
472
 
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
 
381
        self.assertEquals(NULL_REVISION, pushresult.old_revid)
473
382
        self.assertEquals({revid1:target_tree.branch.last_revision()}, 
474
383
                           pushresult.revidmap)
475
384
        self.assertEquals(pushresult.revidmap[revid1], pushresult.new_revid)