~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: Robert Collins
  • Date: 2009-12-16 22:29:31 UTC
  • mto: This revision was merged to the branch mainline in revision 4920.
  • Revision ID: robertc@robertcollins.net-20091216222931-wbbn5ey4mwmpatwd
Review feedback.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
21
21
from bzrlib import (
22
22
    branch,
23
23
    bzrdir,
24
 
    controldir,
25
24
    errors,
26
25
    foreign,
27
26
    lockable_files,
28
27
    lockdir,
29
 
    repository,
30
28
    revision,
31
29
    tests,
32
30
    trace,
33
 
    vf_repository,
34
31
    )
35
32
 
36
 
from bzrlib.repofmt import groupcompress_repo
37
 
 
38
33
# This is the dummy foreign revision control system, used 
39
34
# mainly here in the testsuite to test the foreign VCS infrastructure.
40
35
# It is basically standard Bazaar with some minor modifications to 
95
90
        self._format = _format
96
91
        self._base = a_bzrdir.transport.base
97
92
        self._ignore_fallbacks = False
98
 
        self.bzrdir = a_bzrdir
99
 
        foreign.ForeignBranch.__init__(self,
 
93
        foreign.ForeignBranch.__init__(self, 
100
94
            DummyForeignVcsMapping(DummyForeignVcs()))
101
 
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir,
 
95
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir, 
102
96
            *args, **kwargs)
103
97
 
104
 
    def _get_checkout_format(self):
105
 
        """Return the most suitable metadir for a checkout of this branch.
106
 
        Weaves are used if this branch's repository uses weaves.
107
 
        """
108
 
        return self.bzrdir.checkout_metadir()
109
 
 
110
 
    def import_last_revision_info_and_tags(self, source, revno, revid,
111
 
                                           lossy=False):
112
 
        interbranch = InterToDummyVcsBranch(source, self)
113
 
        result = interbranch.push(stop_revision=revid, lossy=True)
114
 
        if lossy:
115
 
            revid = result.revidmap[revid]
116
 
        return (revno, revid)
117
 
 
118
 
 
119
 
class DummyForeignCommitBuilder(vf_repository.VersionedFileRootCommitBuilder):
120
 
 
121
 
    def _generate_revision_if_needed(self):
122
 
        mapping = DummyForeignVcsMapping(DummyForeignVcs())
123
 
        if self._lossy:
124
 
            self._new_revision_id = mapping.revision_id_foreign_to_bzr(
125
 
                (str(self._timestamp), str(self._timezone), "UNKNOWN"))
126
 
            self.random_revid = False
127
 
        elif self._new_revision_id is not None:
128
 
            self.random_revid = False
129
 
        else:
130
 
            self._new_revision_id = self._gen_revision_id()
131
 
            self.random_revid = True
132
 
 
133
 
 
134
 
class DummyForeignVcsRepository(groupcompress_repo.CHKInventoryRepository,
135
 
    foreign.ForeignRepository):
136
 
    """Dummy foreign vcs repository."""
137
 
 
138
 
 
139
 
class DummyForeignVcsRepositoryFormat(groupcompress_repo.RepositoryFormat2a):
140
 
 
141
 
    repository_class = DummyForeignVcsRepository
142
 
    _commit_builder_class = DummyForeignCommitBuilder
143
 
 
144
 
    def get_format_string(self):
145
 
        return "Dummy Foreign Vcs Repository"
146
 
 
147
 
    def get_format_description(self):
148
 
        return "Dummy Foreign Vcs Repository"
149
 
 
150
 
 
151
 
class InterToDummyVcsBranch(branch.GenericInterBranch):
 
98
 
 
99
class InterToDummyVcsBranch(branch.GenericInterBranch,
 
100
                            foreign.InterToForeignBranch):
152
101
 
153
102
    @staticmethod
154
103
    def is_compatible(source, target):
155
104
        return isinstance(target, DummyForeignVcsBranch)
156
105
 
157
 
    def push(self, overwrite=False, stop_revision=None, lossy=False):
158
 
        if not lossy:
159
 
            raise errors.NoRoundtrippingSupport(self.source, self.target)
 
106
    def lossy_push(self, stop_revision=None):
160
107
        result = branch.BranchPushResult()
161
108
        result.source_branch = self.source
162
109
        result.target_branch = self.target
163
110
        result.old_revno, result.old_revid = self.target.last_revision_info()
164
111
        self.source.lock_read()
165
112
        try:
166
 
            graph = self.source.repository.get_graph()
167
113
            # This just handles simple cases, but that's good enough for tests
168
114
            my_history = self.target.revision_history()
169
 
            if stop_revision is None:
170
 
                stop_revision = self.source.last_revision()
171
 
            their_history = list(graph.iter_lefthand_ancestry(stop_revision,
172
 
                (revision.NULL_REVISION,)))
173
 
            their_history.reverse()
 
115
            their_history = self.source.revision_history()
174
116
            if their_history[:min(len(my_history), len(their_history))] != my_history:
175
117
                raise errors.DivergedBranches(self.target, self.source)
176
118
            todo = their_history[len(my_history):]
182
124
                    return (tree.get_file(file_id), None)
183
125
                tree.get_file_with_stat = get_file_with_stat
184
126
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
185
 
                    (str(rev.timestamp), str(rev.timezone),
 
127
                    (str(rev.timestamp), str(rev.timezone), 
186
128
                        str(self.target.revno())))
187
129
                parent_revno, parent_revid= self.target.last_revision_info()
188
130
                if parent_revid == revision.NULL_REVISION:
226
168
        super(DummyForeignVcsBranchFormat, self).__init__()
227
169
        self._matchingbzrdir = DummyForeignVcsDirFormat()
228
170
 
229
 
    def open(self, a_bzrdir, name=None, _found=False, ignore_fallbacks=False,
230
 
            found_repository=None):
 
171
    def open(self, a_bzrdir, _found=False):
231
172
        if not _found:
232
173
            raise NotImplementedError
233
174
        try:
234
 
            transport = a_bzrdir.get_branch_transport(None, name=name)
 
175
            transport = a_bzrdir.get_branch_transport(None)
235
176
            control_files = lockable_files.LockableFiles(transport, 'lock',
236
177
                                                         lockdir.LockDir)
237
 
            if found_repository is None:
238
 
                found_repository = a_bzrdir.find_repository()
239
178
            return DummyForeignVcsBranch(_format=self,
240
179
                              _control_files=control_files,
241
180
                              a_bzrdir=a_bzrdir,
242
 
                              _repository=found_repository)
 
181
                              _repository=a_bzrdir.find_repository())
243
182
        except errors.NoSuchFile:
244
183
            raise errors.NotBranchError(path=transport.base)
245
184
 
262
201
    def get_branch_format(self):
263
202
        return DummyForeignVcsBranchFormat()
264
203
 
265
 
    @property
266
 
    def repository_format(self):
267
 
        return DummyForeignVcsRepositoryFormat()
 
204
    @classmethod
 
205
    def probe_transport(klass, transport):
 
206
        """Return the .bzrdir style format present in a directory."""
 
207
        if not transport.has('.dummy'):
 
208
            raise errors.NotBranchError(path=transport.base)
 
209
        return klass()
268
210
 
269
211
    def initialize_on_transport(self, transport):
270
212
        """Initialize a new bzrdir in the base directory of a Transport."""
298
240
        self._control_files = lockable_files.LockableFiles(self.transport,
299
241
            "lock", lockable_files.TransportLock)
300
242
 
301
 
    def create_workingtree(self):
302
 
        # dirstate requires a ".bzr" entry to exist
303
 
        self.root_transport.put_bytes(".bzr", "foo")
304
 
        return super(DummyForeignVcsDir, self).create_workingtree()
305
 
 
306
 
    def open_branch(self, name=None, unsupported=False, ignore_fallbacks=True):
307
 
        if name is not None:
308
 
            raise errors.NoColocatedBranchSupport(self)
 
243
    def open_branch(self, ignore_fallbacks=True):
309
244
        return self._format.get_branch_format().open(self, _found=True)
310
245
 
311
246
    def cloning_metadir(self, stacked=False):
312
247
        """Produce a metadir suitable for cloning with."""
313
248
        return bzrdir.format_registry.make_bzrdir("default")
314
249
 
315
 
    def checkout_metadir(self):
316
 
        return self.cloning_metadir()
317
 
 
318
250
    def sprout(self, url, revision_id=None, force_new_repo=False,
319
251
               recurse='down', possible_transports=None,
320
252
               accelerator_tree=None, hardlink=False, stacked=False,
328
260
 
329
261
 
330
262
def register_dummy_foreign_for_test(testcase):
331
 
    controldir.ControlDirFormat.register_prober(DummyForeignProber)
332
 
    testcase.addCleanup(controldir.ControlDirFormat.unregister_prober,
333
 
        DummyForeignProber)
334
 
    repository.format_registry.register(DummyForeignVcsRepositoryFormat())
335
 
    testcase.addCleanup(repository.format_registry.remove,
336
 
            DummyForeignVcsRepositoryFormat())
337
 
    branch.format_registry.register(DummyForeignVcsBranchFormat())
338
 
    testcase.addCleanup(branch.format_registry.remove,
339
 
            DummyForeignVcsBranchFormat())
 
263
    bzrdir.BzrDirFormat.register_control_format(DummyForeignVcsDirFormat)
 
264
    testcase.addCleanup(bzrdir.BzrDirFormat.unregister_control_format,
 
265
                        DummyForeignVcsDirFormat)
340
266
    # We need to register the optimiser to make the dummy appear really
341
267
    # different from a regular bzr repository.
342
268
    branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
344
270
                        InterToDummyVcsBranch)
345
271
 
346
272
 
347
 
class DummyForeignProber(controldir.Prober):
348
 
 
349
 
    @classmethod
350
 
    def probe_transport(klass, transport):
351
 
        """Return the .bzrdir style format present in a directory."""
352
 
        if not transport.has('.dummy'):
353
 
            raise errors.NotBranchError(path=transport.base)
354
 
        return DummyForeignVcsDirFormat()
355
 
 
356
 
    @classmethod
357
 
    def known_formats(cls):
358
 
        return set([DummyForeignVcsDirFormat()])
359
 
 
360
 
 
361
273
class ForeignVcsRegistryTests(tests.TestCase):
362
274
    """Tests for the ForeignVcsRegistry class."""
363
275
 
375
287
        reg = foreign.ForeignVcsRegistry()
376
288
        vcs = DummyForeignVcs()
377
289
        reg.register("dummy", vcs, "Dummy VCS")
378
 
        self.assertEquals((
379
 
            ("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
380
 
            reg.parse_revision_id("dummy-v1:some-foreign-revid"))
 
290
        self.assertEquals((("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
 
291
                          reg.parse_revision_id("dummy-v1:some-foreign-revid"))
381
292
 
382
293
 
383
294
class ForeignRevisionTests(tests.TestCase):
440
351
        self.assertNotEquals("A Dummy VCS Dir",
441
352
                             newdir._format.get_format_string())
442
353
 
443
 
    def test_push_not_supported(self):
444
 
        source_tree = self.make_branch_and_tree("source")
445
 
        target_tree = self.make_branch_and_tree("target", 
446
 
            format=DummyForeignVcsDirFormat())
447
 
        self.assertRaises(errors.NoRoundtrippingSupport, 
448
 
            source_tree.branch.push, target_tree.branch)
449
 
 
450
354
    def test_lossy_push_empty(self):
451
355
        source_tree = self.make_branch_and_tree("source")
452
356
        target_tree = self.make_branch_and_tree("target", 
453
357
            format=DummyForeignVcsDirFormat())
454
 
        pushresult = source_tree.branch.push(target_tree.branch, lossy=True)
 
358
        pushresult = source_tree.branch.lossy_push(target_tree.branch)
455
359
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
456
360
        self.assertEquals(revision.NULL_REVISION, pushresult.new_revid)
457
361
        self.assertEquals({}, pushresult.revidmap)
465
369
            format=DummyForeignVcsDirFormat())
466
370
        target_tree.branch.lock_write()
467
371
        try:
468
 
            pushresult = source_tree.branch.push(target_tree.branch, lossy=True)
 
372
            pushresult = source_tree.branch.lossy_push(target_tree.branch)
469
373
        finally:
470
374
            target_tree.branch.unlock()
471
375
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)