~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: Andrew Bennetts
  • Date: 2010-10-13 00:26:41 UTC
  • mto: This revision was merged to the branch mainline in revision 5498.
  • Revision ID: andrew.bennetts@canonical.com-20101013002641-9tlh9k89mlj1666m
Keep docs-plain working.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
    foreign,
27
27
    lockable_files,
28
28
    lockdir,
29
 
    repository,
30
29
    revision,
31
30
    tests,
32
31
    trace,
33
 
    vf_repository,
34
32
    )
35
33
 
36
 
from bzrlib.repofmt import groupcompress_repo
37
 
 
38
34
# This is the dummy foreign revision control system, used 
39
35
# mainly here in the testsuite to test the foreign VCS infrastructure.
40
36
# It is basically standard Bazaar with some minor modifications to 
96
92
        self._base = a_bzrdir.transport.base
97
93
        self._ignore_fallbacks = False
98
94
        self.bzrdir = a_bzrdir
99
 
        foreign.ForeignBranch.__init__(self,
 
95
        foreign.ForeignBranch.__init__(self, 
100
96
            DummyForeignVcsMapping(DummyForeignVcs()))
101
 
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir,
 
97
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir, 
102
98
            *args, **kwargs)
103
99
 
104
 
    def _get_checkout_format(self):
105
 
        """Return the most suitable metadir for a checkout of this branch.
106
 
        Weaves are used if this branch's repository uses weaves.
107
 
        """
108
 
        return self.bzrdir.checkout_metadir()
109
 
 
110
 
    def import_last_revision_info_and_tags(self, source, revno, revid,
111
 
                                           lossy=False):
112
 
        interbranch = InterToDummyVcsBranch(source, self)
113
 
        result = interbranch.push(stop_revision=revid, lossy=True)
114
 
        if lossy:
115
 
            revid = result.revidmap[revid]
116
 
        return (revno, revid)
117
 
 
118
 
 
119
 
class DummyForeignCommitBuilder(vf_repository.VersionedFileRootCommitBuilder):
120
 
 
121
 
    def _generate_revision_if_needed(self):
122
 
        mapping = DummyForeignVcsMapping(DummyForeignVcs())
123
 
        if self._lossy:
124
 
            self._new_revision_id = mapping.revision_id_foreign_to_bzr(
125
 
                (str(self._timestamp), str(self._timezone), "UNKNOWN"))
126
 
            self.random_revid = False
127
 
        elif self._new_revision_id is not None:
128
 
            self.random_revid = False
129
 
        else:
130
 
            self._new_revision_id = self._gen_revision_id()
131
 
            self.random_revid = True
132
 
 
133
 
 
134
 
class DummyForeignVcsRepository(groupcompress_repo.CHKInventoryRepository,
135
 
    foreign.ForeignRepository):
136
 
    """Dummy foreign vcs repository."""
137
 
 
138
 
 
139
 
class DummyForeignVcsRepositoryFormat(groupcompress_repo.RepositoryFormat2a):
140
 
 
141
 
    repository_class = DummyForeignVcsRepository
142
 
    _commit_builder_class = DummyForeignCommitBuilder
143
 
 
144
 
    def get_format_string(self):
145
 
        return "Dummy Foreign Vcs Repository"
146
 
 
147
 
    def get_format_description(self):
148
 
        return "Dummy Foreign Vcs Repository"
149
 
 
150
 
 
151
 
class InterToDummyVcsBranch(branch.GenericInterBranch):
 
100
 
 
101
class InterToDummyVcsBranch(branch.GenericInterBranch,
 
102
                            foreign.InterToForeignBranch):
152
103
 
153
104
    @staticmethod
154
105
    def is_compatible(source, target):
155
106
        return isinstance(target, DummyForeignVcsBranch)
156
107
 
157
 
    def push(self, overwrite=False, stop_revision=None, lossy=False):
158
 
        if not lossy:
159
 
            raise errors.NoRoundtrippingSupport(self.source, self.target)
 
108
    def push(self, overwrite=False, stop_revision=None):
 
109
        raise errors.NoRoundtrippingSupport(self.source, self.target)
 
110
 
 
111
    def lossy_push(self, stop_revision=None):
160
112
        result = branch.BranchPushResult()
161
113
        result.source_branch = self.source
162
114
        result.target_branch = self.target
163
115
        result.old_revno, result.old_revid = self.target.last_revision_info()
164
116
        self.source.lock_read()
165
117
        try:
166
 
            graph = self.source.repository.get_graph()
167
118
            # This just handles simple cases, but that's good enough for tests
168
119
            my_history = self.target.revision_history()
169
 
            if stop_revision is None:
170
 
                stop_revision = self.source.last_revision()
171
 
            their_history = list(graph.iter_lefthand_ancestry(stop_revision,
172
 
                (revision.NULL_REVISION,)))
173
 
            their_history.reverse()
 
120
            their_history = self.source.revision_history()
174
121
            if their_history[:min(len(my_history), len(their_history))] != my_history:
175
122
                raise errors.DivergedBranches(self.target, self.source)
176
123
            todo = their_history[len(my_history):]
182
129
                    return (tree.get_file(file_id), None)
183
130
                tree.get_file_with_stat = get_file_with_stat
184
131
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
185
 
                    (str(rev.timestamp), str(rev.timezone),
 
132
                    (str(rev.timestamp), str(rev.timezone), 
186
133
                        str(self.target.revno())))
187
134
                parent_revno, parent_revid= self.target.last_revision_info()
188
135
                if parent_revid == revision.NULL_REVISION:
226
173
        super(DummyForeignVcsBranchFormat, self).__init__()
227
174
        self._matchingbzrdir = DummyForeignVcsDirFormat()
228
175
 
229
 
    def open(self, a_bzrdir, name=None, _found=False, ignore_fallbacks=False,
230
 
            found_repository=None):
 
176
    def open(self, a_bzrdir, name=None, _found=False):
231
177
        if not _found:
232
178
            raise NotImplementedError
233
179
        try:
234
180
            transport = a_bzrdir.get_branch_transport(None, name=name)
235
181
            control_files = lockable_files.LockableFiles(transport, 'lock',
236
182
                                                         lockdir.LockDir)
237
 
            if found_repository is None:
238
 
                found_repository = a_bzrdir.find_repository()
239
183
            return DummyForeignVcsBranch(_format=self,
240
184
                              _control_files=control_files,
241
185
                              a_bzrdir=a_bzrdir,
242
 
                              _repository=found_repository)
 
186
                              _repository=a_bzrdir.find_repository())
243
187
        except errors.NoSuchFile:
244
188
            raise errors.NotBranchError(path=transport.base)
245
189
 
262
206
    def get_branch_format(self):
263
207
        return DummyForeignVcsBranchFormat()
264
208
 
265
 
    @property
266
 
    def repository_format(self):
267
 
        return DummyForeignVcsRepositoryFormat()
268
 
 
269
209
    def initialize_on_transport(self, transport):
270
210
        """Initialize a new bzrdir in the base directory of a Transport."""
271
211
        # Since we don't have a .bzr directory, inherit the
298
238
        self._control_files = lockable_files.LockableFiles(self.transport,
299
239
            "lock", lockable_files.TransportLock)
300
240
 
301
 
    def create_workingtree(self):
302
 
        # dirstate requires a ".bzr" entry to exist
303
 
        self.root_transport.put_bytes(".bzr", "foo")
304
 
        return super(DummyForeignVcsDir, self).create_workingtree()
305
 
 
306
241
    def open_branch(self, name=None, unsupported=False, ignore_fallbacks=True):
307
242
        if name is not None:
308
243
            raise errors.NoColocatedBranchSupport(self)
312
247
        """Produce a metadir suitable for cloning with."""
313
248
        return bzrdir.format_registry.make_bzrdir("default")
314
249
 
315
 
    def checkout_metadir(self):
316
 
        return self.cloning_metadir()
317
 
 
318
250
    def sprout(self, url, revision_id=None, force_new_repo=False,
319
251
               recurse='down', possible_transports=None,
320
252
               accelerator_tree=None, hardlink=False, stacked=False,
328
260
 
329
261
 
330
262
def register_dummy_foreign_for_test(testcase):
 
263
    controldir.ControlDirFormat.register_format(DummyForeignVcsDirFormat)
 
264
    testcase.addCleanup(controldir.ControlDirFormat.unregister_format,
 
265
                        DummyForeignVcsDirFormat)
331
266
    controldir.ControlDirFormat.register_prober(DummyForeignProber)
332
267
    testcase.addCleanup(controldir.ControlDirFormat.unregister_prober,
333
268
        DummyForeignProber)
334
 
    repository.format_registry.register(DummyForeignVcsRepositoryFormat())
335
 
    testcase.addCleanup(repository.format_registry.remove,
336
 
            DummyForeignVcsRepositoryFormat())
337
 
    branch.format_registry.register(DummyForeignVcsBranchFormat())
338
 
    testcase.addCleanup(branch.format_registry.remove,
339
 
            DummyForeignVcsBranchFormat())
340
269
    # We need to register the optimiser to make the dummy appears really
341
270
    # different from a regular bzr repository.
342
271
    branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
353
282
            raise errors.NotBranchError(path=transport.base)
354
283
        return DummyForeignVcsDirFormat()
355
284
 
356
 
    @classmethod
357
 
    def known_formats(cls):
358
 
        return set([DummyForeignVcsDirFormat()])
359
 
 
360
285
 
361
286
class ForeignVcsRegistryTests(tests.TestCase):
362
287
    """Tests for the ForeignVcsRegistry class."""
375
300
        reg = foreign.ForeignVcsRegistry()
376
301
        vcs = DummyForeignVcs()
377
302
        reg.register("dummy", vcs, "Dummy VCS")
378
 
        self.assertEquals((
379
 
            ("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
380
 
            reg.parse_revision_id("dummy-v1:some-foreign-revid"))
 
303
        self.assertEquals((("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
 
304
                          reg.parse_revision_id("dummy-v1:some-foreign-revid"))
381
305
 
382
306
 
383
307
class ForeignRevisionTests(tests.TestCase):
451
375
        source_tree = self.make_branch_and_tree("source")
452
376
        target_tree = self.make_branch_and_tree("target", 
453
377
            format=DummyForeignVcsDirFormat())
454
 
        pushresult = source_tree.branch.push(target_tree.branch, lossy=True)
 
378
        pushresult = source_tree.branch.lossy_push(target_tree.branch)
455
379
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
456
380
        self.assertEquals(revision.NULL_REVISION, pushresult.new_revid)
457
381
        self.assertEquals({}, pushresult.revidmap)
465
389
            format=DummyForeignVcsDirFormat())
466
390
        target_tree.branch.lock_write()
467
391
        try:
468
 
            pushresult = source_tree.branch.push(target_tree.branch, lossy=True)
 
392
            pushresult = source_tree.branch.lossy_push(target_tree.branch)
469
393
        finally:
470
394
            target_tree.branch.unlock()
471
395
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)