~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: Vincent Ladeuil
  • Date: 2010-02-10 15:46:03 UTC
  • mfrom: (4985.3.21 update)
  • mto: This revision was merged to the branch mainline in revision 5021.
  • Revision ID: v.ladeuil+lp@free.fr-20100210154603-k4no1gvfuqpzrw7p
Update performs two merges in a more logical order but stops on conflicts

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
18
"""Tests for foreign VCS utility code."""
19
19
 
20
 
from bzrlib import errors, foreign
21
 
from bzrlib.revision import Revision
22
 
from bzrlib.tests import TestCase
 
20
 
 
21
from bzrlib import (
 
22
    branch,
 
23
    bzrdir,
 
24
    errors,
 
25
    foreign,
 
26
    lockable_files,
 
27
    lockdir,
 
28
    revision,
 
29
    tests,
 
30
    trace,
 
31
    )
 
32
 
 
33
# This is the dummy foreign revision control system, used 
 
34
# mainly here in the testsuite to test the foreign VCS infrastructure.
 
35
# It is basically standard Bazaar with some minor modifications to 
 
36
# make it "foreign". 
 
37
 
38
# It has the following differences to "regular" Bazaar:
 
39
# - The control directory is named ".dummy", not ".bzr".
 
40
# - The revision ids are tuples, not strings.
 
41
# - Doesn't support more than one parent natively
23
42
 
24
43
 
25
44
class DummyForeignVcsMapping(foreign.VcsMapping):
47
66
 
48
67
class DummyForeignVcs(foreign.ForeignVcs):
49
68
    """A dummy Foreign VCS, for use with testing.
50
 
    
 
69
 
51
70
    It has revision ids that are a tuple with three strings.
52
71
    """
53
72
 
54
73
    def __init__(self):
55
74
        self.mapping_registry = DummyForeignVcsMappingRegistry()
56
 
        self.mapping_registry.register("v1", DummyForeignVcsMapping(self), 
 
75
        self.mapping_registry.register("v1", DummyForeignVcsMapping(self),
57
76
                                       "Version 1")
 
77
        self.abbreviation = "dummy"
58
78
 
59
79
    def show_foreign_revid(self, foreign_revid):
60
80
        return { "dummy ding": "%s/%s\\%s" % foreign_revid }
61
81
 
62
 
 
63
 
 
64
 
class ForeignVcsRegistryTests(TestCase):
65
 
 
66
 
    def test_parse_revision_id_no_dash(self):       
 
82
    def serialize_foreign_revid(self, foreign_revid):
 
83
        return "%s|%s|%s" % foreign_revid
 
84
 
 
85
 
 
86
class DummyForeignVcsBranch(branch.BzrBranch6,foreign.ForeignBranch):
 
87
    """A Dummy VCS Branch."""
 
88
 
 
89
    def __init__(self, _format, _control_files, a_bzrdir, *args, **kwargs):
 
90
        self._format = _format
 
91
        self._base = a_bzrdir.transport.base
 
92
        self._ignore_fallbacks = False
 
93
        foreign.ForeignBranch.__init__(self, 
 
94
            DummyForeignVcsMapping(DummyForeignVcs()))
 
95
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir, 
 
96
            *args, **kwargs)
 
97
 
 
98
 
 
99
class InterToDummyVcsBranch(branch.GenericInterBranch,
 
100
                            foreign.InterToForeignBranch):
 
101
 
 
102
    @staticmethod
 
103
    def is_compatible(source, target):
 
104
        return isinstance(target, DummyForeignVcsBranch)
 
105
 
 
106
    def push(self, overwrite=False, stop_revision=None):
 
107
        raise errors.NoRoundtrippingSupport(self.source, self.target)
 
108
 
 
109
    def lossy_push(self, stop_revision=None):
 
110
        result = branch.BranchPushResult()
 
111
        result.source_branch = self.source
 
112
        result.target_branch = self.target
 
113
        result.old_revno, result.old_revid = self.target.last_revision_info()
 
114
        self.source.lock_read()
 
115
        try:
 
116
            # This just handles simple cases, but that's good enough for tests
 
117
            my_history = self.target.revision_history()
 
118
            their_history = self.source.revision_history()
 
119
            if their_history[:min(len(my_history), len(their_history))] != my_history:
 
120
                raise errors.DivergedBranches(self.target, self.source)
 
121
            todo = their_history[len(my_history):]
 
122
            revidmap = {}
 
123
            for revid in todo:
 
124
                rev = self.source.repository.get_revision(revid)
 
125
                tree = self.source.repository.revision_tree(revid)
 
126
                def get_file_with_stat(file_id, path=None):
 
127
                    return (tree.get_file(file_id), None)
 
128
                tree.get_file_with_stat = get_file_with_stat
 
129
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
 
130
                    (str(rev.timestamp), str(rev.timezone), 
 
131
                        str(self.target.revno())))
 
132
                parent_revno, parent_revid= self.target.last_revision_info()
 
133
                if parent_revid == revision.NULL_REVISION:
 
134
                    parent_revids = []
 
135
                else:
 
136
                    parent_revids = [parent_revid]
 
137
                builder = self.target.get_commit_builder(parent_revids, 
 
138
                        self.target.get_config(), rev.timestamp,
 
139
                        rev.timezone, rev.committer, rev.properties,
 
140
                        new_revid)
 
141
                try:
 
142
                    for path, ie in tree.inventory.iter_entries():
 
143
                        new_ie = ie.copy()
 
144
                        new_ie.revision = None
 
145
                        builder.record_entry_contents(new_ie, 
 
146
                            [self.target.repository.revision_tree(parent_revid).inventory],
 
147
                            path, tree, 
 
148
                            (ie.kind, ie.text_size, ie.executable, ie.text_sha1))
 
149
                    builder.finish_inventory()
 
150
                except:
 
151
                    builder.abort()
 
152
                    raise
 
153
                revidmap[revid] = builder.commit(rev.message)
 
154
                self.target.set_last_revision_info(parent_revno+1, 
 
155
                    revidmap[revid])
 
156
                trace.mutter('lossily pushed revision %s -> %s', 
 
157
                    revid, revidmap[revid])
 
158
        finally:
 
159
            self.source.unlock()
 
160
        result.new_revno, result.new_revid = self.target.last_revision_info()
 
161
        result.revidmap = revidmap
 
162
        return result
 
163
 
 
164
 
 
165
class DummyForeignVcsBranchFormat(branch.BzrBranchFormat6):
 
166
 
 
167
    def get_format_string(self):
 
168
        return "Branch for Testing"
 
169
 
 
170
    def __init__(self):
 
171
        super(DummyForeignVcsBranchFormat, self).__init__()
 
172
        self._matchingbzrdir = DummyForeignVcsDirFormat()
 
173
 
 
174
    def open(self, a_bzrdir, _found=False):
 
175
        if not _found:
 
176
            raise NotImplementedError
 
177
        try:
 
178
            transport = a_bzrdir.get_branch_transport(None)
 
179
            control_files = lockable_files.LockableFiles(transport, 'lock',
 
180
                                                         lockdir.LockDir)
 
181
            return DummyForeignVcsBranch(_format=self,
 
182
                              _control_files=control_files,
 
183
                              a_bzrdir=a_bzrdir,
 
184
                              _repository=a_bzrdir.find_repository())
 
185
        except errors.NoSuchFile:
 
186
            raise errors.NotBranchError(path=transport.base)
 
187
 
 
188
 
 
189
class DummyForeignVcsDirFormat(bzrdir.BzrDirMetaFormat1):
 
190
    """BzrDirFormat for the dummy foreign VCS."""
 
191
 
 
192
    @classmethod
 
193
    def get_format_string(cls):
 
194
        return "A Dummy VCS Dir"
 
195
 
 
196
    @classmethod
 
197
    def get_format_description(cls):
 
198
        return "A Dummy VCS Dir"
 
199
 
 
200
    @classmethod
 
201
    def is_supported(cls):
 
202
        return True
 
203
 
 
204
    def get_branch_format(self):
 
205
        return DummyForeignVcsBranchFormat()
 
206
 
 
207
    @classmethod
 
208
    def probe_transport(klass, transport):
 
209
        """Return the .bzrdir style format present in a directory."""
 
210
        if not transport.has('.dummy'):
 
211
            raise errors.NotBranchError(path=transport.base)
 
212
        return klass()
 
213
 
 
214
    def initialize_on_transport(self, transport):
 
215
        """Initialize a new bzrdir in the base directory of a Transport."""
 
216
        # Since we don't have a .bzr directory, inherit the
 
217
        # mode from the root directory
 
218
        temp_control = lockable_files.LockableFiles(transport,
 
219
                            '', lockable_files.TransportLock)
 
220
        temp_control._transport.mkdir('.dummy',
 
221
                                      # FIXME: RBC 20060121 don't peek under
 
222
                                      # the covers
 
223
                                      mode=temp_control._dir_mode)
 
224
        del temp_control
 
225
        bzrdir_transport = transport.clone('.dummy')
 
226
        # NB: no need to escape relative paths that are url safe.
 
227
        control_files = lockable_files.LockableFiles(bzrdir_transport,
 
228
            self._lock_file_name, self._lock_class)
 
229
        control_files.create_lock()
 
230
        return self.open(transport, _found=True)
 
231
 
 
232
    def _open(self, transport):
 
233
        return DummyForeignVcsDir(transport, self)
 
234
 
 
235
 
 
236
class DummyForeignVcsDir(bzrdir.BzrDirMeta1):
 
237
 
 
238
    def __init__(self, _transport, _format):
 
239
        self._format = _format
 
240
        self.transport = _transport.clone('.dummy')
 
241
        self.root_transport = _transport
 
242
        self._mode_check_done = False
 
243
        self._control_files = lockable_files.LockableFiles(self.transport,
 
244
            "lock", lockable_files.TransportLock)
 
245
 
 
246
    def open_branch(self, ignore_fallbacks=True):
 
247
        return self._format.get_branch_format().open(self, _found=True)
 
248
 
 
249
    def cloning_metadir(self, stacked=False):
 
250
        """Produce a metadir suitable for cloning with."""
 
251
        return bzrdir.format_registry.make_bzrdir("default")
 
252
 
 
253
    def sprout(self, url, revision_id=None, force_new_repo=False,
 
254
               recurse='down', possible_transports=None,
 
255
               accelerator_tree=None, hardlink=False, stacked=False,
 
256
               source_branch=None):
 
257
        # dirstate doesn't cope well with accelerator_trees
 
258
        # that have a different control dir
 
259
        return super(DummyForeignVcsDir, self).sprout(url=url, 
 
260
                revision_id=revision_id, force_new_repo=force_new_repo, 
 
261
                recurse=recurse, possible_transports=possible_transports, 
 
262
                hardlink=hardlink, stacked=stacked, source_branch=source_branch)
 
263
 
 
264
 
 
265
def register_dummy_foreign_for_test(testcase):
 
266
    bzrdir.BzrDirFormat.register_control_format(DummyForeignVcsDirFormat)
 
267
    testcase.addCleanup(bzrdir.BzrDirFormat.unregister_control_format,
 
268
                        DummyForeignVcsDirFormat)
 
269
    # We need to register the optimiser to make the dummy appear really
 
270
    # different from a regular bzr repository.
 
271
    branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
 
272
    testcase.addCleanup(branch.InterBranch.unregister_optimiser,
 
273
                        InterToDummyVcsBranch)
 
274
 
 
275
 
 
276
class ForeignVcsRegistryTests(tests.TestCase):
 
277
    """Tests for the ForeignVcsRegistry class."""
 
278
 
 
279
    def test_parse_revision_id_no_dash(self):
67
280
        reg = foreign.ForeignVcsRegistry()
68
 
        self.assertRaises(errors.InvalidRevisionId, 
 
281
        self.assertRaises(errors.InvalidRevisionId,
69
282
                          reg.parse_revision_id, "invalid")
70
 
        
71
 
    def test_parse_revision_id_unknown_mapping(self):       
 
283
 
 
284
    def test_parse_revision_id_unknown_mapping(self):
72
285
        reg = foreign.ForeignVcsRegistry()
73
 
        self.assertRaises(errors.InvalidRevisionId, 
 
286
        self.assertRaises(errors.InvalidRevisionId,
74
287
                          reg.parse_revision_id, "unknown-foreignrevid")
75
288
 
76
289
    def test_parse_revision_id(self):
81
294
                          reg.parse_revision_id("dummy-v1:some-foreign-revid"))
82
295
 
83
296
 
84
 
class ForeignRevisionTests(TestCase):
 
297
class ForeignRevisionTests(tests.TestCase):
85
298
    """Tests for the ForeignRevision class."""
86
299
 
87
300
    def test_create(self):
88
301
        mapp = DummyForeignVcsMapping(DummyForeignVcs())
89
 
        rev = foreign.ForeignRevision(("a", "foreign", "revid"), 
 
302
        rev = foreign.ForeignRevision(("a", "foreign", "revid"),
90
303
                                      mapp, "roundtripped-revid")
91
304
        self.assertEquals("", rev.inventory_sha1)
92
305
        self.assertEquals(("a", "foreign", "revid"), rev.foreign_revid)
93
306
        self.assertEquals(mapp, rev.mapping)
94
307
 
95
308
 
96
 
class ShowForeignPropertiesTests(TestCase):
97
 
    """Tests for the show_foreign_properties() function."""
 
309
class WorkingTreeFileUpdateTests(tests.TestCaseWithTransport):
 
310
    """Tests for update_workingtree_fileids()."""
 
311
 
 
312
    def test_update_workingtree(self):
 
313
        wt = self.make_branch_and_tree('br1')
 
314
        self.build_tree_contents([('br1/bla', 'original contents\n')])
 
315
        wt.add('bla', 'bla-a')
 
316
        wt.commit('bla-a')
 
317
        root_id = wt.get_root_id()
 
318
        target = wt.bzrdir.sprout('br2').open_workingtree()
 
319
        target.unversion(['bla-a'])
 
320
        target.add('bla', 'bla-b')
 
321
        target.commit('bla-b')
 
322
        target_basis = target.basis_tree()
 
323
        target_basis.lock_read()
 
324
        self.addCleanup(target_basis.unlock)
 
325
        foreign.update_workingtree_fileids(wt, target_basis)
 
326
        wt.lock_read()
 
327
        try:
 
328
            self.assertEquals(set([root_id, "bla-b"]), set(wt.inventory))
 
329
        finally:
 
330
            wt.unlock()
 
331
 
 
332
 
 
333
class DummyForeignVcsTests(tests.TestCaseWithTransport):
 
334
    """Very basic test for DummyForeignVcs."""
98
335
 
99
336
    def setUp(self):
100
 
        super(ShowForeignPropertiesTests, self).setUp()
101
 
        self.vcs = DummyForeignVcs()
102
 
        foreign.foreign_vcs_registry.register("dummy", 
103
 
            self.vcs, "Dummy VCS")
104
 
 
105
 
    def tearDown(self):
106
 
        super(ShowForeignPropertiesTests, self).tearDown()
107
 
        foreign.foreign_vcs_registry.remove("dummy")
108
 
 
109
 
    def test_show_non_foreign(self):
110
 
        """Test use with a native (non-foreign) bzr revision."""
111
 
        self.assertEquals({}, foreign.show_foreign_properties(Revision("arevid")))
112
 
 
113
 
    def test_show_imported(self):
114
 
        rev = Revision("dummy-v1:my-foreign-revid")
115
 
        self.assertEquals({ "dummy ding": "my/foreign\\revid" },
116
 
                          foreign.show_foreign_properties(rev))
117
 
 
118
 
    def test_show_direct(self):
119
 
        rev = foreign.ForeignRevision(("some", "foreign", "revid"), 
120
 
                                      DummyForeignVcsMapping(self.vcs), 
121
 
                                      "roundtrip-revid")
122
 
        self.assertEquals({ "dummy ding": "some/foreign\\revid" },
123
 
                          foreign.show_foreign_properties(rev))
 
337
        super(DummyForeignVcsTests, self).setUp()
 
338
        register_dummy_foreign_for_test(self)
 
339
 
 
340
    def test_create(self):
 
341
        """Test we can create dummies."""
 
342
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
 
343
        dir = bzrdir.BzrDir.open("d")
 
344
        self.assertEquals("A Dummy VCS Dir", dir._format.get_format_string())
 
345
        dir.open_repository()
 
346
        dir.open_branch()
 
347
        dir.open_workingtree()
 
348
 
 
349
    def test_sprout(self):
 
350
        """Test we can clone dummies and that the format is not preserved."""
 
351
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
 
352
        dir = bzrdir.BzrDir.open("d")
 
353
        newdir = dir.sprout("e")
 
354
        self.assertNotEquals("A Dummy VCS Dir",
 
355
                             newdir._format.get_format_string())
 
356
 
 
357
    def test_push_not_supported(self):
 
358
        source_tree = self.make_branch_and_tree("source")
 
359
        target_tree = self.make_branch_and_tree("target", 
 
360
            format=DummyForeignVcsDirFormat())
 
361
        self.assertRaises(errors.NoRoundtrippingSupport, 
 
362
            source_tree.branch.push, target_tree.branch)
 
363
 
 
364
    def test_lossy_push_empty(self):
 
365
        source_tree = self.make_branch_and_tree("source")
 
366
        target_tree = self.make_branch_and_tree("target", 
 
367
            format=DummyForeignVcsDirFormat())
 
368
        pushresult = source_tree.branch.lossy_push(target_tree.branch)
 
369
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
 
370
        self.assertEquals(revision.NULL_REVISION, pushresult.new_revid)
 
371
        self.assertEquals({}, pushresult.revidmap)
 
372
 
 
373
    def test_lossy_push_simple(self):
 
374
        source_tree = self.make_branch_and_tree("source")
 
375
        self.build_tree(['source/a', 'source/b'])
 
376
        source_tree.add(['a', 'b'])
 
377
        revid1 = source_tree.commit("msg")
 
378
        target_tree = self.make_branch_and_tree("target", 
 
379
            format=DummyForeignVcsDirFormat())
 
380
        target_tree.branch.lock_write()
 
381
        try:
 
382
            pushresult = source_tree.branch.lossy_push(target_tree.branch)
 
383
        finally:
 
384
            target_tree.branch.unlock()
 
385
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
 
386
        self.assertEquals({revid1:target_tree.branch.last_revision()}, 
 
387
                           pushresult.revidmap)
 
388
        self.assertEquals(pushresult.revidmap[revid1], pushresult.new_revid)