~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: John Arbash Meinel
  • Date: 2008-07-09 21:42:24 UTC
  • mto: This revision was merged to the branch mainline in revision 3543.
  • Revision ID: john@arbash-meinel.com-20080709214224-r75k87r6a01pfc3h
Restore a real weave merge to 'bzr merge --weave'.

To do so efficiently, we only add the simple LCAs to the final weave
object, unless we run into complexities with the merge graph.
This gives the same effective result as adding all the texts,
with the advantage of not having to extract all of them.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2012, 2016 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
"""Tests for foreign VCS utility code."""
19
 
 
20
 
 
21
 
from bzrlib import (
22
 
    branch,
23
 
    bzrdir,
24
 
    controldir,
25
 
    errors,
26
 
    foreign,
27
 
    lockable_files,
28
 
    lockdir,
29
 
    repository,
30
 
    revision,
31
 
    tests,
32
 
    trace,
33
 
    vf_repository,
34
 
    )
35
 
 
36
 
from bzrlib.repofmt import groupcompress_repo
37
 
 
38
 
# This is the dummy foreign revision control system, used 
39
 
# mainly here in the testsuite to test the foreign VCS infrastructure.
40
 
# It is basically standard Bazaar with some minor modifications to 
41
 
# make it "foreign". 
42
 
43
 
# It has the following differences to "regular" Bazaar:
44
 
# - The control directory is named ".dummy", not ".bzr".
45
 
# - The revision ids are tuples, not strings.
46
 
# - Doesn't support more than one parent natively
47
 
 
48
 
 
49
 
class DummyForeignVcsMapping(foreign.VcsMapping):
50
 
    """A simple mapping for the dummy Foreign VCS, for use with testing."""
51
 
 
52
 
    def __eq__(self, other):
53
 
        return type(self) == type(other)
54
 
 
55
 
    def revision_id_bzr_to_foreign(self, bzr_revid):
56
 
        return tuple(bzr_revid[len("dummy-v1:"):].split("-")), self
57
 
 
58
 
    def revision_id_foreign_to_bzr(self, foreign_revid):
59
 
        return "dummy-v1:%s-%s-%s" % foreign_revid
60
 
 
61
 
 
62
 
class DummyForeignVcsMappingRegistry(foreign.VcsMappingRegistry):
63
 
 
64
 
    def revision_id_bzr_to_foreign(self, revid):
65
 
        if not revid.startswith("dummy-"):
66
 
            raise errors.InvalidRevisionId(revid, None)
67
 
        mapping_version = revid[len("dummy-"):len("dummy-vx")]
68
 
        mapping = self.get(mapping_version)
69
 
        return mapping.revision_id_bzr_to_foreign(revid)
70
 
 
71
 
 
72
 
class DummyForeignVcs(foreign.ForeignVcs):
73
 
    """A dummy Foreign VCS, for use with testing.
74
 
 
75
 
    It has revision ids that are a tuple with three strings.
76
 
    """
77
 
 
78
 
    def __init__(self):
79
 
        self.mapping_registry = DummyForeignVcsMappingRegistry()
80
 
        self.mapping_registry.register("v1", DummyForeignVcsMapping(self),
81
 
                                       "Version 1")
82
 
        self.abbreviation = "dummy"
83
 
 
84
 
    def show_foreign_revid(self, foreign_revid):
85
 
        return { "dummy ding": "%s/%s\\%s" % foreign_revid }
86
 
 
87
 
    def serialize_foreign_revid(self, foreign_revid):
88
 
        return "%s|%s|%s" % foreign_revid
89
 
 
90
 
 
91
 
class DummyForeignVcsBranch(branch.BzrBranch6,foreign.ForeignBranch):
92
 
    """A Dummy VCS Branch."""
93
 
 
94
 
    @property
95
 
    def user_transport(self):
96
 
        return self.bzrdir.user_transport
97
 
 
98
 
    def __init__(self, _format, _control_files, a_bzrdir, *args, **kwargs):
99
 
        self._format = _format
100
 
        self._base = a_bzrdir.transport.base
101
 
        self._ignore_fallbacks = False
102
 
        self.bzrdir = a_bzrdir
103
 
        foreign.ForeignBranch.__init__(self,
104
 
            DummyForeignVcsMapping(DummyForeignVcs()))
105
 
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir,
106
 
            *args, **kwargs)
107
 
 
108
 
    def _get_checkout_format(self, lightweight=False):
109
 
        """Return the most suitable metadir for a checkout of this branch.
110
 
        Weaves are used if this branch's repository uses weaves.
111
 
        """
112
 
        return self.bzrdir.checkout_metadir()
113
 
 
114
 
    def import_last_revision_info_and_tags(self, source, revno, revid,
115
 
                                           lossy=False):
116
 
        interbranch = InterToDummyVcsBranch(source, self)
117
 
        result = interbranch.push(stop_revision=revid, lossy=True)
118
 
        if lossy:
119
 
            revid = result.revidmap[revid]
120
 
        return (revno, revid)
121
 
 
122
 
 
123
 
class DummyForeignCommitBuilder(vf_repository.VersionedFileRootCommitBuilder):
124
 
 
125
 
    def _generate_revision_if_needed(self):
126
 
        mapping = DummyForeignVcsMapping(DummyForeignVcs())
127
 
        if self._lossy:
128
 
            self._new_revision_id = mapping.revision_id_foreign_to_bzr(
129
 
                (str(self._timestamp), str(self._timezone), "UNKNOWN"))
130
 
            self.random_revid = False
131
 
        elif self._new_revision_id is not None:
132
 
            self.random_revid = False
133
 
        else:
134
 
            self._new_revision_id = self._gen_revision_id()
135
 
            self.random_revid = True
136
 
 
137
 
 
138
 
class DummyForeignVcsRepository(groupcompress_repo.CHKInventoryRepository,
139
 
    foreign.ForeignRepository):
140
 
    """Dummy foreign vcs repository."""
141
 
 
142
 
 
143
 
class DummyForeignVcsRepositoryFormat(groupcompress_repo.RepositoryFormat2a):
144
 
 
145
 
    repository_class = DummyForeignVcsRepository
146
 
    _commit_builder_class = DummyForeignCommitBuilder
147
 
 
148
 
    @classmethod
149
 
    def get_format_string(cls):
150
 
        return "Dummy Foreign Vcs Repository"
151
 
 
152
 
    def get_format_description(self):
153
 
        return "Dummy Foreign Vcs Repository"
154
 
 
155
 
 
156
 
def branch_history(graph, revid):
157
 
    ret = list(graph.iter_lefthand_ancestry(revid,
158
 
        (revision.NULL_REVISION,)))
159
 
    ret.reverse()
160
 
    return ret
161
 
 
162
 
 
163
 
class InterToDummyVcsBranch(branch.GenericInterBranch):
164
 
 
165
 
    @staticmethod
166
 
    def is_compatible(source, target):
167
 
        return isinstance(target, DummyForeignVcsBranch)
168
 
 
169
 
    def push(self, overwrite=False, stop_revision=None, lossy=False):
170
 
        if not lossy:
171
 
            raise errors.NoRoundtrippingSupport(self.source, self.target)
172
 
        result = branch.BranchPushResult()
173
 
        result.source_branch = self.source
174
 
        result.target_branch = self.target
175
 
        result.old_revno, result.old_revid = self.target.last_revision_info()
176
 
        self.source.lock_read()
177
 
        try:
178
 
            graph = self.source.repository.get_graph()
179
 
            # This just handles simple cases, but that's good enough for tests
180
 
            my_history = branch_history(self.target.repository.get_graph(),
181
 
                result.old_revid)
182
 
            if stop_revision is None:
183
 
                stop_revision = self.source.last_revision()
184
 
            their_history = branch_history(graph, stop_revision)
185
 
            if their_history[:min(len(my_history), len(their_history))] != my_history:
186
 
                raise errors.DivergedBranches(self.target, self.source)
187
 
            todo = their_history[len(my_history):]
188
 
            revidmap = {}
189
 
            for revid in todo:
190
 
                rev = self.source.repository.get_revision(revid)
191
 
                tree = self.source.repository.revision_tree(revid)
192
 
                def get_file_with_stat(file_id, path=None):
193
 
                    return (tree.get_file(file_id), None)
194
 
                tree.get_file_with_stat = get_file_with_stat
195
 
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
196
 
                    (str(rev.timestamp), str(rev.timezone),
197
 
                        str(self.target.revno())))
198
 
                parent_revno, parent_revid= self.target.last_revision_info()
199
 
                if parent_revid == revision.NULL_REVISION:
200
 
                    parent_revids = []
201
 
                else:
202
 
                    parent_revids = [parent_revid]
203
 
                builder = self.target.get_commit_builder(parent_revids, 
204
 
                        self.target.get_config_stack(), rev.timestamp,
205
 
                        rev.timezone, rev.committer, rev.properties,
206
 
                        new_revid)
207
 
                try:
208
 
                    parent_tree = self.target.repository.revision_tree(
209
 
                        parent_revid)
210
 
                    for path, ie in tree.iter_entries_by_dir():
211
 
                        new_ie = ie.copy()
212
 
                        new_ie.revision = None
213
 
                        builder.record_entry_contents(new_ie, 
214
 
                            [parent_tree.root_inventory],
215
 
                            path, tree, 
216
 
                            (ie.kind, ie.text_size, ie.executable, ie.text_sha1))
217
 
                    builder.finish_inventory()
218
 
                except:
219
 
                    builder.abort()
220
 
                    raise
221
 
                revidmap[revid] = builder.commit(rev.message)
222
 
                self.target.set_last_revision_info(parent_revno+1, 
223
 
                    revidmap[revid])
224
 
                trace.mutter('lossily pushed revision %s -> %s', 
225
 
                    revid, revidmap[revid])
226
 
        finally:
227
 
            self.source.unlock()
228
 
        result.new_revno, result.new_revid = self.target.last_revision_info()
229
 
        result.revidmap = revidmap
230
 
        return result
231
 
 
232
 
 
233
 
class DummyForeignVcsBranchFormat(branch.BzrBranchFormat6):
234
 
 
235
 
    @classmethod
236
 
    def get_format_string(cls):
237
 
        return "Branch for Testing"
238
 
 
239
 
    @property
240
 
    def _matchingbzrdir(self):
241
 
        return DummyForeignVcsDirFormat()
242
 
 
243
 
    def open(self, a_bzrdir, name=None, _found=False, ignore_fallbacks=False,
244
 
            found_repository=None):
245
 
        if name is None:
246
 
            name = a_bzrdir._get_selected_branch()
247
 
        if not _found:
248
 
            raise NotImplementedError
249
 
        try:
250
 
            transport = a_bzrdir.get_branch_transport(None, name=name)
251
 
            control_files = lockable_files.LockableFiles(transport, 'lock',
252
 
                                                         lockdir.LockDir)
253
 
            if found_repository is None:
254
 
                found_repository = a_bzrdir.find_repository()
255
 
            return DummyForeignVcsBranch(_format=self,
256
 
                              _control_files=control_files,
257
 
                              a_bzrdir=a_bzrdir,
258
 
                              _repository=found_repository,
259
 
                              name=name)
260
 
        except errors.NoSuchFile:
261
 
            raise errors.NotBranchError(path=transport.base)
262
 
 
263
 
 
264
 
class DummyForeignVcsDirFormat(bzrdir.BzrDirMetaFormat1):
265
 
    """BzrDirFormat for the dummy foreign VCS."""
266
 
 
267
 
    @classmethod
268
 
    def get_format_string(cls):
269
 
        return "A Dummy VCS Dir"
270
 
 
271
 
    @classmethod
272
 
    def get_format_description(cls):
273
 
        return "A Dummy VCS Dir"
274
 
 
275
 
    @classmethod
276
 
    def is_supported(cls):
277
 
        return True
278
 
 
279
 
    def get_branch_format(self):
280
 
        return DummyForeignVcsBranchFormat()
281
 
 
282
 
    @property
283
 
    def repository_format(self):
284
 
        return DummyForeignVcsRepositoryFormat()
285
 
 
286
 
    def initialize_on_transport(self, transport):
287
 
        """Initialize a new bzrdir in the base directory of a Transport."""
288
 
        # Since we don't have a .bzr directory, inherit the
289
 
        # mode from the root directory
290
 
        temp_control = lockable_files.LockableFiles(transport,
291
 
                            '', lockable_files.TransportLock)
292
 
        temp_control._transport.mkdir('.dummy',
293
 
                                      # FIXME: RBC 20060121 don't peek under
294
 
                                      # the covers
295
 
                                      mode=temp_control._dir_mode)
296
 
        del temp_control
297
 
        bzrdir_transport = transport.clone('.dummy')
298
 
        # NB: no need to escape relative paths that are url safe.
299
 
        control_files = lockable_files.LockableFiles(bzrdir_transport,
300
 
            self._lock_file_name, self._lock_class)
301
 
        control_files.create_lock()
302
 
        return self.open(transport, _found=True)
303
 
 
304
 
    def _open(self, transport):
305
 
        return DummyForeignVcsDir(transport, self)
306
 
 
307
 
 
308
 
class DummyForeignVcsDir(bzrdir.BzrDirMeta1):
309
 
 
310
 
    def __init__(self, _transport, _format):
311
 
        self._format = _format
312
 
        self.transport = _transport.clone('.dummy')
313
 
        self.root_transport = _transport
314
 
        self._mode_check_done = False
315
 
        self._control_files = lockable_files.LockableFiles(self.transport,
316
 
            "lock", lockable_files.TransportLock)
317
 
 
318
 
    def create_workingtree(self):
319
 
        # dirstate requires a ".bzr" entry to exist
320
 
        self.root_transport.put_bytes(".bzr", "foo")
321
 
        return super(DummyForeignVcsDir, self).create_workingtree()
322
 
 
323
 
    def open_branch(self, name=None, unsupported=False, ignore_fallbacks=True,
324
 
                    possible_transports=None):
325
 
        if name is None:
326
 
            name = self._get_selected_branch()
327
 
        if name != "":
328
 
            raise errors.NoColocatedBranchSupport(self)
329
 
        return self._format.get_branch_format().open(self, _found=True)
330
 
 
331
 
    def cloning_metadir(self, stacked=False):
332
 
        """Produce a metadir suitable for cloning with."""
333
 
        return controldir.format_registry.make_bzrdir("default")
334
 
 
335
 
    def checkout_metadir(self):
336
 
        return self.cloning_metadir()
337
 
 
338
 
    def sprout(self, url, revision_id=None, force_new_repo=False,
339
 
               recurse='down', possible_transports=None,
340
 
               accelerator_tree=None, hardlink=False, stacked=False,
341
 
               source_branch=None):
342
 
        # dirstate doesn't cope with accelerator_trees well
343
 
        # that have a different control dir
344
 
        return super(DummyForeignVcsDir, self).sprout(
345
 
            url=url,
346
 
            revision_id=revision_id, force_new_repo=force_new_repo,
347
 
            recurse=recurse, possible_transports=possible_transports,
348
 
            hardlink=hardlink, stacked=stacked, source_branch=source_branch)
349
 
 
350
 
 
351
 
def register_dummy_foreign_for_test(testcase):
352
 
    controldir.ControlDirFormat.register_prober(DummyForeignProber)
353
 
    testcase.addCleanup(controldir.ControlDirFormat.unregister_prober,
354
 
                        DummyForeignProber)
355
 
    repository.format_registry.register(DummyForeignVcsRepositoryFormat())
356
 
    testcase.addCleanup(repository.format_registry.remove,
357
 
                        DummyForeignVcsRepositoryFormat())
358
 
    branch.format_registry.register(DummyForeignVcsBranchFormat())
359
 
    testcase.addCleanup(branch.format_registry.remove,
360
 
                        DummyForeignVcsBranchFormat())
361
 
    # We need to register the optimiser to make the dummy appears really
362
 
    # different from a regular bzr repository.
363
 
    branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
364
 
    testcase.addCleanup(branch.InterBranch.unregister_optimiser,
365
 
                        InterToDummyVcsBranch)
366
 
 
367
 
 
368
 
class DummyForeignProber(controldir.Prober):
369
 
 
370
 
    @classmethod
371
 
    def probe_transport(klass, transport):
372
 
        """Return the .bzrdir style format present in a directory."""
373
 
        if not transport.has('.dummy'):
374
 
            raise errors.NotBranchError(path=transport.base)
375
 
        return DummyForeignVcsDirFormat()
376
 
 
377
 
    @classmethod
378
 
    def known_formats(cls):
379
 
        return set([DummyForeignVcsDirFormat()])
380
 
 
381
 
 
382
 
class ForeignVcsRegistryTests(tests.TestCase):
383
 
    """Tests for the ForeignVcsRegistry class."""
384
 
 
385
 
    def test_parse_revision_id_no_dash(self):
386
 
        reg = foreign.ForeignVcsRegistry()
387
 
        self.assertRaises(errors.InvalidRevisionId,
388
 
                          reg.parse_revision_id, "invalid")
389
 
 
390
 
    def test_parse_revision_id_unknown_mapping(self):
391
 
        reg = foreign.ForeignVcsRegistry()
392
 
        self.assertRaises(errors.InvalidRevisionId,
393
 
                          reg.parse_revision_id, "unknown-foreignrevid")
394
 
 
395
 
    def test_parse_revision_id(self):
396
 
        reg = foreign.ForeignVcsRegistry()
397
 
        vcs = DummyForeignVcs()
398
 
        reg.register("dummy", vcs, "Dummy VCS")
399
 
        self.assertEqual((
400
 
            ("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
401
 
            reg.parse_revision_id("dummy-v1:some-foreign-revid"))
402
 
 
403
 
 
404
 
class ForeignRevisionTests(tests.TestCase):
405
 
    """Tests for the ForeignRevision class."""
406
 
 
407
 
    def test_create(self):
408
 
        mapp = DummyForeignVcsMapping(DummyForeignVcs())
409
 
        rev = foreign.ForeignRevision(("a", "foreign", "revid"),
410
 
                                      mapp, "roundtripped-revid")
411
 
        self.assertEqual("", rev.inventory_sha1)
412
 
        self.assertEqual(("a", "foreign", "revid"), rev.foreign_revid)
413
 
        self.assertEqual(mapp, rev.mapping)
414
 
 
415
 
 
416
 
class WorkingTreeFileUpdateTests(tests.TestCaseWithTransport):
417
 
    """Tests for update_workingtree_fileids()."""
418
 
 
419
 
    def test_update_workingtree(self):
420
 
        wt = self.make_branch_and_tree('br1')
421
 
        self.build_tree_contents([('br1/bla', 'original contents\n')])
422
 
        wt.add('bla', 'bla-a')
423
 
        wt.commit('bla-a')
424
 
        root_id = wt.get_root_id()
425
 
        target = wt.bzrdir.sprout('br2').open_workingtree()
426
 
        target.unversion(['bla-a'])
427
 
        target.add('bla', 'bla-b')
428
 
        target.commit('bla-b')
429
 
        target_basis = target.basis_tree()
430
 
        target_basis.lock_read()
431
 
        self.addCleanup(target_basis.unlock)
432
 
        foreign.update_workingtree_fileids(wt, target_basis)
433
 
        wt.lock_read()
434
 
        try:
435
 
            self.assertEqual(set([root_id, "bla-b"]), set(wt.all_file_ids()))
436
 
        finally:
437
 
            wt.unlock()
438
 
 
439
 
 
440
 
class DummyForeignVcsTests(tests.TestCaseWithTransport):
441
 
    """Very basic test for DummyForeignVcs."""
442
 
 
443
 
    def setUp(self):
444
 
        super(DummyForeignVcsTests, self).setUp()
445
 
        register_dummy_foreign_for_test(self)
446
 
 
447
 
    def test_create(self):
448
 
        """Test we can create dummies."""
449
 
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
450
 
        dir = controldir.ControlDir.open("d")
451
 
        self.assertEqual("A Dummy VCS Dir", dir._format.get_format_string())
452
 
        dir.open_repository()
453
 
        dir.open_branch()
454
 
        dir.open_workingtree()
455
 
 
456
 
    def test_sprout(self):
457
 
        """Test we can clone dummies and that the format is not preserved."""
458
 
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
459
 
        dir = controldir.ControlDir.open("d")
460
 
        newdir = dir.sprout("e")
461
 
        self.assertNotEqual("A Dummy VCS Dir",
462
 
                            newdir._format.get_format_string())
463
 
 
464
 
    def test_push_not_supported(self):
465
 
        source_tree = self.make_branch_and_tree("source")
466
 
        target_tree = self.make_branch_and_tree(
467
 
            "target", format=DummyForeignVcsDirFormat())
468
 
        self.assertRaises(errors.NoRoundtrippingSupport,
469
 
                          source_tree.branch.push, target_tree.branch)
470
 
 
471
 
    def test_lossy_push_empty(self):
472
 
        source_tree = self.make_branch_and_tree("source")
473
 
        target_tree = self.make_branch_and_tree(
474
 
            "target", format=DummyForeignVcsDirFormat())
475
 
        pushresult = source_tree.branch.push(target_tree.branch, lossy=True)
476
 
        self.assertEqual(revision.NULL_REVISION, pushresult.old_revid)
477
 
        self.assertEqual(revision.NULL_REVISION, pushresult.new_revid)
478
 
        self.assertEqual({}, pushresult.revidmap)
479
 
 
480
 
    def test_lossy_push_simple(self):
481
 
        source_tree = self.make_branch_and_tree("source")
482
 
        self.build_tree(['source/a', 'source/b'])
483
 
        source_tree.add(['a', 'b'])
484
 
        revid1 = source_tree.commit("msg")
485
 
        target_tree = self.make_branch_and_tree(
486
 
            "target", format=DummyForeignVcsDirFormat())
487
 
        target_tree.branch.lock_write()
488
 
        try:
489
 
            pushresult = source_tree.branch.push(
490
 
                target_tree.branch, lossy=True)
491
 
        finally:
492
 
            target_tree.branch.unlock()
493
 
        self.assertEqual(revision.NULL_REVISION, pushresult.old_revid)
494
 
        self.assertEqual({revid1: target_tree.branch.last_revision()},
495
 
                         pushresult.revidmap)
496
 
        self.assertEqual(pushresult.revidmap[revid1], pushresult.new_revid)