~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: Andrew Bennetts
  • Date: 2010-03-11 04:33:41 UTC
  • mfrom: (4797.33.4 2.1)
  • mto: This revision was merged to the branch mainline in revision 5082.
  • Revision ID: andrew.bennetts@canonical.com-20100311043341-rzdik83fnactjsxs
Merge lp:bzr/2.1, including fixes for #496813, #526211, #526353.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008 Canonical Ltd
 
1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
18
"""Tests for foreign VCS utility code."""
19
19
 
20
 
from bzrlib import errors, foreign
21
 
from bzrlib.revision import Revision
22
 
from bzrlib.tests import TestCase
 
20
 
 
21
from bzrlib import (
 
22
    branch,
 
23
    bzrdir,
 
24
    errors,
 
25
    foreign,
 
26
    lockable_files,
 
27
    lockdir,
 
28
    revision,
 
29
    tests,
 
30
    trace,
 
31
    )
 
32
 
 
33
# This is the dummy foreign revision control system, used 
 
34
# mainly here in the testsuite to test the foreign VCS infrastructure.
 
35
# It is basically standard Bazaar with some minor modifications to 
 
36
# make it "foreign". 
 
37
 
38
# It has the following differences to "regular" Bazaar:
 
39
# - The control directory is named ".dummy", not ".bzr".
 
40
# - The revision ids are tuples, not strings.
 
41
# - Doesn't support more than one parent natively
23
42
 
24
43
 
25
44
class DummyForeignVcsMapping(foreign.VcsMapping):
47
66
 
48
67
class DummyForeignVcs(foreign.ForeignVcs):
49
68
    """A dummy Foreign VCS, for use with testing.
50
 
    
 
69
 
51
70
    It has revision ids that are a tuple with three strings.
52
71
    """
53
72
 
54
73
    def __init__(self):
55
74
        self.mapping_registry = DummyForeignVcsMappingRegistry()
56
 
        self.mapping_registry.register("v1", DummyForeignVcsMapping(self), 
 
75
        self.mapping_registry.register("v1", DummyForeignVcsMapping(self),
57
76
                                       "Version 1")
 
77
        self.abbreviation = "dummy"
58
78
 
59
79
    def show_foreign_revid(self, foreign_revid):
60
80
        return { "dummy ding": "%s/%s\\%s" % foreign_revid }
61
81
 
62
 
 
63
 
 
64
 
class ForeignVcsRegistryTests(TestCase):
65
 
 
66
 
    def test_parse_revision_id_no_dash(self):       
 
82
    def serialize_foreign_revid(self, foreign_revid):
 
83
        return "%s|%s|%s" % foreign_revid
 
84
 
 
85
 
 
86
class DummyForeignVcsBranch(branch.BzrBranch6,foreign.ForeignBranch):
 
87
    """A Dummy VCS Branch."""
 
88
 
 
89
    def __init__(self, _format, _control_files, a_bzrdir, *args, **kwargs):
 
90
        self._format = _format
 
91
        self._base = a_bzrdir.transport.base
 
92
        self._ignore_fallbacks = False
 
93
        foreign.ForeignBranch.__init__(self, 
 
94
            DummyForeignVcsMapping(DummyForeignVcs()))
 
95
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir, 
 
96
            *args, **kwargs)
 
97
 
 
98
 
 
99
class InterToDummyVcsBranch(branch.GenericInterBranch,
 
100
                            foreign.InterToForeignBranch):
 
101
 
 
102
    @staticmethod
 
103
    def is_compatible(source, target):
 
104
        return isinstance(target, DummyForeignVcsBranch)
 
105
 
 
106
    def push(self, overwrite=False, stop_revision=None):
 
107
        raise errors.NoRoundtrippingSupport(self.source, self.target)
 
108
 
 
109
    def lossy_push(self, stop_revision=None):
 
110
        result = branch.BranchPushResult()
 
111
        result.source_branch = self.source
 
112
        result.target_branch = self.target
 
113
        result.old_revno, result.old_revid = self.target.last_revision_info()
 
114
        self.source.lock_read()
 
115
        try:
 
116
            # This just handles simple cases, but that's good enough for tests
 
117
            my_history = self.target.revision_history()
 
118
            their_history = self.source.revision_history()
 
119
            if their_history[:min(len(my_history), len(their_history))] != my_history:
 
120
                raise errors.DivergedBranches(self.target, self.source)
 
121
            todo = their_history[len(my_history):]
 
122
            revidmap = {}
 
123
            for revid in todo:
 
124
                rev = self.source.repository.get_revision(revid)
 
125
                tree = self.source.repository.revision_tree(revid)
 
126
                def get_file_with_stat(file_id, path=None):
 
127
                    return (tree.get_file(file_id), None)
 
128
                tree.get_file_with_stat = get_file_with_stat
 
129
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
 
130
                    (str(rev.timestamp), str(rev.timezone), 
 
131
                        str(self.target.revno())))
 
132
                parent_revno, parent_revid= self.target.last_revision_info()
 
133
                if parent_revid == revision.NULL_REVISION:
 
134
                    parent_revids = []
 
135
                else:
 
136
                    parent_revids = [parent_revid]
 
137
                builder = self.target.get_commit_builder(parent_revids, 
 
138
                        self.target.get_config(), rev.timestamp,
 
139
                        rev.timezone, rev.committer, rev.properties,
 
140
                        new_revid)
 
141
                try:
 
142
                    for path, ie in tree.inventory.iter_entries():
 
143
                        new_ie = ie.copy()
 
144
                        new_ie.revision = None
 
145
                        builder.record_entry_contents(new_ie, 
 
146
                            [self.target.repository.revision_tree(parent_revid).inventory],
 
147
                            path, tree, 
 
148
                            (ie.kind, ie.text_size, ie.executable, ie.text_sha1))
 
149
                    builder.finish_inventory()
 
150
                except:
 
151
                    builder.abort()
 
152
                    raise
 
153
                revidmap[revid] = builder.commit(rev.message)
 
154
                self.target.set_last_revision_info(parent_revno+1, 
 
155
                    revidmap[revid])
 
156
                trace.mutter('lossily pushed revision %s -> %s', 
 
157
                    revid, revidmap[revid])
 
158
        finally:
 
159
            self.source.unlock()
 
160
        result.new_revno, result.new_revid = self.target.last_revision_info()
 
161
        result.revidmap = revidmap
 
162
        return result
 
163
 
 
164
 
 
165
class DummyForeignVcsBranchFormat(branch.BzrBranchFormat6):
 
166
 
 
167
    def get_format_string(self):
 
168
        return "Branch for Testing"
 
169
 
 
170
    def __init__(self):
 
171
        super(DummyForeignVcsBranchFormat, self).__init__()
 
172
        self._matchingbzrdir = DummyForeignVcsDirFormat()
 
173
 
 
174
    def open(self, a_bzrdir, _found=False):
 
175
        if not _found:
 
176
            raise NotImplementedError
 
177
        try:
 
178
            transport = a_bzrdir.get_branch_transport(None)
 
179
            control_files = lockable_files.LockableFiles(transport, 'lock',
 
180
                                                         lockdir.LockDir)
 
181
            return DummyForeignVcsBranch(_format=self,
 
182
                              _control_files=control_files,
 
183
                              a_bzrdir=a_bzrdir,
 
184
                              _repository=a_bzrdir.find_repository())
 
185
        except errors.NoSuchFile:
 
186
            raise errors.NotBranchError(path=transport.base)
 
187
 
 
188
 
 
189
class DummyForeignVcsDirFormat(bzrdir.BzrDirMetaFormat1):
 
190
    """BzrDirFormat for the dummy foreign VCS."""
 
191
 
 
192
    @classmethod
 
193
    def get_format_string(cls):
 
194
        return "A Dummy VCS Dir"
 
195
 
 
196
    @classmethod
 
197
    def get_format_description(cls):
 
198
        return "A Dummy VCS Dir"
 
199
 
 
200
    @classmethod
 
201
    def is_supported(cls):
 
202
        return True
 
203
 
 
204
    def get_branch_format(self):
 
205
        return DummyForeignVcsBranchFormat()
 
206
 
 
207
    @classmethod
 
208
    def probe_transport(klass, transport):
 
209
        """Return the .bzrdir style format present in a directory."""
 
210
        if not transport.has('.dummy'):
 
211
            raise errors.NotBranchError(path=transport.base)
 
212
        return klass()
 
213
 
 
214
    def initialize_on_transport(self, transport):
 
215
        """Initialize a new bzrdir in the base directory of a Transport."""
 
216
        # Since we don't have a .bzr directory, inherit the
 
217
        # mode from the root directory
 
218
        temp_control = lockable_files.LockableFiles(transport,
 
219
                            '', lockable_files.TransportLock)
 
220
        temp_control._transport.mkdir('.dummy',
 
221
                                      # FIXME: RBC 20060121 don't peek under
 
222
                                      # the covers
 
223
                                      mode=temp_control._dir_mode)
 
224
        del temp_control
 
225
        bzrdir_transport = transport.clone('.dummy')
 
226
        # NB: no need to escape relative paths that are url safe.
 
227
        control_files = lockable_files.LockableFiles(bzrdir_transport,
 
228
            self._lock_file_name, self._lock_class)
 
229
        control_files.create_lock()
 
230
        return self.open(transport, _found=True)
 
231
 
 
232
    def _open(self, transport):
 
233
        return DummyForeignVcsDir(transport, self)
 
234
 
 
235
 
 
236
class DummyForeignVcsDir(bzrdir.BzrDirMeta1):
 
237
 
 
238
    def __init__(self, _transport, _format):
 
239
        self._format = _format
 
240
        self.transport = _transport.clone('.dummy')
 
241
        self.root_transport = _transport
 
242
        self._mode_check_done = False
 
243
        self._control_files = lockable_files.LockableFiles(self.transport,
 
244
            "lock", lockable_files.TransportLock)
 
245
 
 
246
    def open_branch(self, name=None, unsupported=False, ignore_fallbacks=True):
 
247
        if name is not None:
 
248
            raise errors.NoColocatedBranchSupport(self)
 
249
        return self._format.get_branch_format().open(self, _found=True)
 
250
 
 
251
    def cloning_metadir(self, stacked=False):
 
252
        """Produce a metadir suitable for cloning with."""
 
253
        return bzrdir.format_registry.make_bzrdir("default")
 
254
 
 
255
    def sprout(self, url, revision_id=None, force_new_repo=False,
 
256
               recurse='down', possible_transports=None,
 
257
               accelerator_tree=None, hardlink=False, stacked=False,
 
258
               source_branch=None):
 
259
        # dirstate doesn't cope well with accelerator_trees
 
260
        # that have a different control dir
 
261
        return super(DummyForeignVcsDir, self).sprout(url=url, 
 
262
                revision_id=revision_id, force_new_repo=force_new_repo, 
 
263
                recurse=recurse, possible_transports=possible_transports, 
 
264
                hardlink=hardlink, stacked=stacked, source_branch=source_branch)
 
265
 
 
266
 
 
267
def register_dummy_foreign_for_test(testcase):
 
268
    bzrdir.BzrDirFormat.register_control_format(DummyForeignVcsDirFormat)
 
269
    testcase.addCleanup(bzrdir.BzrDirFormat.unregister_control_format,
 
270
                        DummyForeignVcsDirFormat)
 
271
    # We need to register the optimiser to make the dummy appear really
 
272
    # different from a regular bzr repository.
 
273
    branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
 
274
    testcase.addCleanup(branch.InterBranch.unregister_optimiser,
 
275
                        InterToDummyVcsBranch)
 
276
 
 
277
 
 
278
class ForeignVcsRegistryTests(tests.TestCase):
 
279
    """Tests for the ForeignVcsRegistry class."""
 
280
 
 
281
    def test_parse_revision_id_no_dash(self):
67
282
        reg = foreign.ForeignVcsRegistry()
68
 
        self.assertRaises(errors.InvalidRevisionId, 
 
283
        self.assertRaises(errors.InvalidRevisionId,
69
284
                          reg.parse_revision_id, "invalid")
70
 
        
71
 
    def test_parse_revision_id_unknown_mapping(self):       
 
285
 
 
286
    def test_parse_revision_id_unknown_mapping(self):
72
287
        reg = foreign.ForeignVcsRegistry()
73
 
        self.assertRaises(errors.InvalidRevisionId, 
 
288
        self.assertRaises(errors.InvalidRevisionId,
74
289
                          reg.parse_revision_id, "unknown-foreignrevid")
75
290
 
76
291
    def test_parse_revision_id(self):
81
296
                          reg.parse_revision_id("dummy-v1:some-foreign-revid"))
82
297
 
83
298
 
84
 
class ForeignRevisionTests(TestCase):
 
299
class ForeignRevisionTests(tests.TestCase):
85
300
    """Tests for the ForeignRevision class."""
86
301
 
87
302
    def test_create(self):
88
303
        mapp = DummyForeignVcsMapping(DummyForeignVcs())
89
 
        rev = foreign.ForeignRevision(("a", "foreign", "revid"), 
 
304
        rev = foreign.ForeignRevision(("a", "foreign", "revid"),
90
305
                                      mapp, "roundtripped-revid")
91
306
        self.assertEquals("", rev.inventory_sha1)
92
307
        self.assertEquals(("a", "foreign", "revid"), rev.foreign_revid)
93
308
        self.assertEquals(mapp, rev.mapping)
94
309
 
95
310
 
96
 
class ShowForeignPropertiesTests(TestCase):
97
 
    """Tests for the show_foreign_properties() function."""
 
311
class WorkingTreeFileUpdateTests(tests.TestCaseWithTransport):
 
312
    """Tests for update_workingtree_fileids()."""
 
313
 
 
314
    def test_update_workingtree(self):
 
315
        wt = self.make_branch_and_tree('br1')
 
316
        self.build_tree_contents([('br1/bla', 'original contents\n')])
 
317
        wt.add('bla', 'bla-a')
 
318
        wt.commit('bla-a')
 
319
        root_id = wt.get_root_id()
 
320
        target = wt.bzrdir.sprout('br2').open_workingtree()
 
321
        target.unversion(['bla-a'])
 
322
        target.add('bla', 'bla-b')
 
323
        target.commit('bla-b')
 
324
        target_basis = target.basis_tree()
 
325
        target_basis.lock_read()
 
326
        self.addCleanup(target_basis.unlock)
 
327
        foreign.update_workingtree_fileids(wt, target_basis)
 
328
        wt.lock_read()
 
329
        try:
 
330
            self.assertEquals(set([root_id, "bla-b"]), set(wt.inventory))
 
331
        finally:
 
332
            wt.unlock()
 
333
 
 
334
 
 
335
class DummyForeignVcsTests(tests.TestCaseWithTransport):
 
336
    """Very basic test for DummyForeignVcs."""
98
337
 
99
338
    def setUp(self):
100
 
        super(ShowForeignPropertiesTests, self).setUp()
101
 
        self.vcs = DummyForeignVcs()
102
 
        foreign.foreign_vcs_registry.register("dummy", 
103
 
            self.vcs, "Dummy VCS")
104
 
 
105
 
    def tearDown(self):
106
 
        super(ShowForeignPropertiesTests, self).tearDown()
107
 
        foreign.foreign_vcs_registry.remove("dummy")
108
 
 
109
 
    def test_show_non_foreign(self):
110
 
        """Test use with a native (non-foreign) bzr revision."""
111
 
        self.assertEquals({}, foreign.show_foreign_properties(Revision("arevid")))
112
 
 
113
 
    def test_show_imported(self):
114
 
        rev = Revision("dummy-v1:my-foreign-revid")
115
 
        self.assertEquals({ "dummy ding": "my/foreign\\revid" },
116
 
                          foreign.show_foreign_properties(rev))
117
 
 
118
 
    def test_show_direct(self):
119
 
        rev = foreign.ForeignRevision(("some", "foreign", "revid"), 
120
 
                                      DummyForeignVcsMapping(self.vcs), 
121
 
                                      "roundtrip-revid")
122
 
        self.assertEquals({ "dummy ding": "some/foreign\\revid" },
123
 
                          foreign.show_foreign_properties(rev))
 
339
        super(DummyForeignVcsTests, self).setUp()
 
340
        register_dummy_foreign_for_test(self)
 
341
 
 
342
    def test_create(self):
 
343
        """Test we can create dummies."""
 
344
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
 
345
        dir = bzrdir.BzrDir.open("d")
 
346
        self.assertEquals("A Dummy VCS Dir", dir._format.get_format_string())
 
347
        dir.open_repository()
 
348
        dir.open_branch()
 
349
        dir.open_workingtree()
 
350
 
 
351
    def test_sprout(self):
 
352
        """Test we can clone dummies and that the format is not preserved."""
 
353
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
 
354
        dir = bzrdir.BzrDir.open("d")
 
355
        newdir = dir.sprout("e")
 
356
        self.assertNotEquals("A Dummy VCS Dir",
 
357
                             newdir._format.get_format_string())
 
358
 
 
359
    def test_push_not_supported(self):
 
360
        source_tree = self.make_branch_and_tree("source")
 
361
        target_tree = self.make_branch_and_tree("target", 
 
362
            format=DummyForeignVcsDirFormat())
 
363
        self.assertRaises(errors.NoRoundtrippingSupport, 
 
364
            source_tree.branch.push, target_tree.branch)
 
365
 
 
366
    def test_lossy_push_empty(self):
 
367
        source_tree = self.make_branch_and_tree("source")
 
368
        target_tree = self.make_branch_and_tree("target", 
 
369
            format=DummyForeignVcsDirFormat())
 
370
        pushresult = source_tree.branch.lossy_push(target_tree.branch)
 
371
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
 
372
        self.assertEquals(revision.NULL_REVISION, pushresult.new_revid)
 
373
        self.assertEquals({}, pushresult.revidmap)
 
374
 
 
375
    def test_lossy_push_simple(self):
 
376
        source_tree = self.make_branch_and_tree("source")
 
377
        self.build_tree(['source/a', 'source/b'])
 
378
        source_tree.add(['a', 'b'])
 
379
        revid1 = source_tree.commit("msg")
 
380
        target_tree = self.make_branch_and_tree("target", 
 
381
            format=DummyForeignVcsDirFormat())
 
382
        target_tree.branch.lock_write()
 
383
        try:
 
384
            pushresult = source_tree.branch.lossy_push(target_tree.branch)
 
385
        finally:
 
386
            target_tree.branch.unlock()
 
387
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
 
388
        self.assertEquals({revid1:target_tree.branch.last_revision()}, 
 
389
                           pushresult.revidmap)
 
390
        self.assertEquals(pushresult.revidmap[revid1], pushresult.new_revid)