~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: Jelmer Vernooij
  • Date: 2011-04-09 19:25:42 UTC
  • mto: (5777.5.1 inventoryworkingtree)
  • mto: This revision was merged to the branch mainline in revision 5781.
  • Revision ID: jelmer@samba.org-20110409192542-8bbedp36s7nj928e
Split InventoryTree out of Tree.

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
    foreign,
27
27
    lockable_files,
28
28
    lockdir,
29
 
    repository,
30
29
    revision,
31
30
    tests,
32
31
    trace,
33
 
    vf_repository,
34
32
    )
35
33
 
36
 
from bzrlib.repofmt import groupcompress_repo
37
 
 
38
34
# This is the dummy foreign revision control system, used 
39
35
# mainly here in the testsuite to test the foreign VCS infrastructure.
40
36
# It is basically standard Bazaar with some minor modifications to 
96
92
        self._base = a_bzrdir.transport.base
97
93
        self._ignore_fallbacks = False
98
94
        self.bzrdir = a_bzrdir
99
 
        foreign.ForeignBranch.__init__(self,
 
95
        foreign.ForeignBranch.__init__(self, 
100
96
            DummyForeignVcsMapping(DummyForeignVcs()))
101
 
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir,
 
97
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir, 
102
98
            *args, **kwargs)
103
99
 
104
 
    def _get_checkout_format(self, lightweight=False):
105
 
        """Return the most suitable metadir for a checkout of this branch.
106
 
        Weaves are used if this branch's repository uses weaves.
107
 
        """
108
 
        return self.bzrdir.checkout_metadir()
109
 
 
110
 
    def import_last_revision_info_and_tags(self, source, revno, revid,
111
 
                                           lossy=False):
112
 
        interbranch = InterToDummyVcsBranch(source, self)
113
 
        result = interbranch.push(stop_revision=revid, lossy=True)
114
 
        if lossy:
115
 
            revid = result.revidmap[revid]
116
 
        return (revno, revid)
117
 
 
118
 
 
119
 
class DummyForeignCommitBuilder(vf_repository.VersionedFileRootCommitBuilder):
120
 
 
121
 
    def _generate_revision_if_needed(self):
122
 
        mapping = DummyForeignVcsMapping(DummyForeignVcs())
123
 
        if self._lossy:
124
 
            self._new_revision_id = mapping.revision_id_foreign_to_bzr(
125
 
                (str(self._timestamp), str(self._timezone), "UNKNOWN"))
126
 
            self.random_revid = False
127
 
        elif self._new_revision_id is not None:
128
 
            self.random_revid = False
129
 
        else:
130
 
            self._new_revision_id = self._gen_revision_id()
131
 
            self.random_revid = True
132
 
 
133
 
 
134
 
class DummyForeignVcsRepository(groupcompress_repo.CHKInventoryRepository,
135
 
    foreign.ForeignRepository):
136
 
    """Dummy foreign vcs repository."""
137
 
 
138
 
 
139
 
class DummyForeignVcsRepositoryFormat(groupcompress_repo.RepositoryFormat2a):
140
 
 
141
 
    repository_class = DummyForeignVcsRepository
142
 
    _commit_builder_class = DummyForeignCommitBuilder
143
 
 
144
 
    def get_format_string(self):
145
 
        return "Dummy Foreign Vcs Repository"
146
 
 
147
 
    def get_format_description(self):
148
 
        return "Dummy Foreign Vcs Repository"
149
 
 
150
 
 
151
 
def branch_history(graph, revid):
152
 
    ret = list(graph.iter_lefthand_ancestry(revid,
153
 
        (revision.NULL_REVISION,)))
154
 
    ret.reverse()
155
 
    return ret
156
 
 
157
 
 
158
 
class InterToDummyVcsBranch(branch.GenericInterBranch):
 
100
 
 
101
class InterToDummyVcsBranch(branch.GenericInterBranch,
 
102
                            foreign.InterToForeignBranch):
159
103
 
160
104
    @staticmethod
161
105
    def is_compatible(source, target):
162
106
        return isinstance(target, DummyForeignVcsBranch)
163
107
 
164
 
    def push(self, overwrite=False, stop_revision=None, lossy=False):
165
 
        if not lossy:
166
 
            raise errors.NoRoundtrippingSupport(self.source, self.target)
 
108
    def push(self, overwrite=False, stop_revision=None):
 
109
        raise errors.NoRoundtrippingSupport(self.source, self.target)
 
110
 
 
111
    def lossy_push(self, stop_revision=None):
167
112
        result = branch.BranchPushResult()
168
113
        result.source_branch = self.source
169
114
        result.target_branch = self.target
170
115
        result.old_revno, result.old_revid = self.target.last_revision_info()
171
116
        self.source.lock_read()
172
117
        try:
173
 
            graph = self.source.repository.get_graph()
174
118
            # This just handles simple cases, but that's good enough for tests
175
 
            my_history = branch_history(self.target.repository.get_graph(),
176
 
                result.old_revid)
177
 
            if stop_revision is None:
178
 
                stop_revision = self.source.last_revision()
179
 
            their_history = branch_history(graph, stop_revision)
 
119
            my_history = self.target.revision_history()
 
120
            their_history = self.source.revision_history()
180
121
            if their_history[:min(len(my_history), len(their_history))] != my_history:
181
122
                raise errors.DivergedBranches(self.target, self.source)
182
123
            todo = their_history[len(my_history):]
188
129
                    return (tree.get_file(file_id), None)
189
130
                tree.get_file_with_stat = get_file_with_stat
190
131
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
191
 
                    (str(rev.timestamp), str(rev.timezone),
 
132
                    (str(rev.timestamp), str(rev.timezone), 
192
133
                        str(self.target.revno())))
193
134
                parent_revno, parent_revid= self.target.last_revision_info()
194
135
                if parent_revid == revision.NULL_REVISION:
228
169
    def get_format_string(self):
229
170
        return "Branch for Testing"
230
171
 
231
 
    @property
232
 
    def _matchingbzrdir(self):
233
 
        return DummyForeignVcsDirFormat()
 
172
    def __init__(self):
 
173
        super(DummyForeignVcsBranchFormat, self).__init__()
 
174
        self._matchingbzrdir = DummyForeignVcsDirFormat()
234
175
 
235
 
    def open(self, a_bzrdir, name=None, _found=False, ignore_fallbacks=False,
236
 
            found_repository=None):
 
176
    def open(self, a_bzrdir, name=None, _found=False, found_repository=None):
237
177
        if not _found:
238
178
            raise NotImplementedError
239
179
        try:
268
208
    def get_branch_format(self):
269
209
        return DummyForeignVcsBranchFormat()
270
210
 
271
 
    @property
272
 
    def repository_format(self):
273
 
        return DummyForeignVcsRepositoryFormat()
274
 
 
275
211
    def initialize_on_transport(self, transport):
276
212
        """Initialize a new bzrdir in the base directory of a Transport."""
277
213
        # Since we don't have a .bzr directory, inherit the
304
240
        self._control_files = lockable_files.LockableFiles(self.transport,
305
241
            "lock", lockable_files.TransportLock)
306
242
 
307
 
    def create_workingtree(self):
308
 
        # dirstate requires a ".bzr" entry to exist
309
 
        self.root_transport.put_bytes(".bzr", "foo")
310
 
        return super(DummyForeignVcsDir, self).create_workingtree()
311
 
 
312
243
    def open_branch(self, name=None, unsupported=False, ignore_fallbacks=True):
313
244
        if name is not None:
314
245
            raise errors.NoColocatedBranchSupport(self)
318
249
        """Produce a metadir suitable for cloning with."""
319
250
        return bzrdir.format_registry.make_bzrdir("default")
320
251
 
321
 
    def checkout_metadir(self):
322
 
        return self.cloning_metadir()
323
 
 
324
252
    def sprout(self, url, revision_id=None, force_new_repo=False,
325
253
               recurse='down', possible_transports=None,
326
254
               accelerator_tree=None, hardlink=False, stacked=False,
337
265
    controldir.ControlDirFormat.register_prober(DummyForeignProber)
338
266
    testcase.addCleanup(controldir.ControlDirFormat.unregister_prober,
339
267
        DummyForeignProber)
340
 
    repository.format_registry.register(DummyForeignVcsRepositoryFormat())
341
 
    testcase.addCleanup(repository.format_registry.remove,
342
 
            DummyForeignVcsRepositoryFormat())
343
 
    branch.format_registry.register(DummyForeignVcsBranchFormat())
344
 
    testcase.addCleanup(branch.format_registry.remove,
345
 
            DummyForeignVcsBranchFormat())
346
268
    # We need to register the optimiser to make the dummy appear really
347
269
    # different from a regular bzr repository.
348
270
    branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
381
303
        reg = foreign.ForeignVcsRegistry()
382
304
        vcs = DummyForeignVcs()
383
305
        reg.register("dummy", vcs, "Dummy VCS")
384
 
        self.assertEquals((
385
 
            ("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
386
 
            reg.parse_revision_id("dummy-v1:some-foreign-revid"))
 
306
        self.assertEquals((("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
 
307
                          reg.parse_revision_id("dummy-v1:some-foreign-revid"))
387
308
 
388
309
 
389
310
class ForeignRevisionTests(tests.TestCase):
457
378
        source_tree = self.make_branch_and_tree("source")
458
379
        target_tree = self.make_branch_and_tree("target", 
459
380
            format=DummyForeignVcsDirFormat())
460
 
        pushresult = source_tree.branch.push(target_tree.branch, lossy=True)
 
381
        pushresult = source_tree.branch.lossy_push(target_tree.branch)
461
382
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
462
383
        self.assertEquals(revision.NULL_REVISION, pushresult.new_revid)
463
384
        self.assertEquals({}, pushresult.revidmap)
471
392
            format=DummyForeignVcsDirFormat())
472
393
        target_tree.branch.lock_write()
473
394
        try:
474
 
            pushresult = source_tree.branch.push(
475
 
                target_tree.branch, lossy=True)
 
395
            pushresult = source_tree.branch.lossy_push(target_tree.branch)
476
396
        finally:
477
397
            target_tree.branch.unlock()
478
398
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)