~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-02-17 13:49:11 UTC
  • mfrom: (4988.11.1 imports)
  • Revision ID: pqm@pqm.ubuntu.com-20100217134911-s77se00ni7xc1hz8
(Jelmer) Remove some unused imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
21
21
from bzrlib import (
22
22
    branch,
23
23
    bzrdir,
24
 
    controldir,
25
24
    errors,
26
25
    foreign,
27
26
    lockable_files,
28
27
    lockdir,
29
 
    repository,
30
28
    revision,
31
29
    tests,
32
30
    trace,
33
 
    vf_repository,
34
31
    )
35
32
 
36
 
from bzrlib.repofmt import groupcompress_repo
37
 
 
38
33
# This is the dummy foreign revision control system, used 
39
34
# mainly here in the testsuite to test the foreign VCS infrastructure.
40
35
# It is basically standard Bazaar with some minor modifications to 
95
90
        self._format = _format
96
91
        self._base = a_bzrdir.transport.base
97
92
        self._ignore_fallbacks = False
98
 
        self.bzrdir = a_bzrdir
99
 
        foreign.ForeignBranch.__init__(self,
 
93
        foreign.ForeignBranch.__init__(self, 
100
94
            DummyForeignVcsMapping(DummyForeignVcs()))
101
 
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir,
 
95
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir, 
102
96
            *args, **kwargs)
103
97
 
104
 
    def _get_checkout_format(self):
105
 
        """Return the most suitable metadir for a checkout of this branch.
106
 
        Weaves are used if this branch's repository uses weaves.
107
 
        """
108
 
        return self.bzrdir.checkout_metadir()
109
 
 
110
 
    def import_last_revision_info_and_tags(self, source, revno, revid,
111
 
                                           lossy=False):
112
 
        interbranch = InterToDummyVcsBranch(source, self)
113
 
        result = interbranch.push(stop_revision=revid, lossy=True)
114
 
        if lossy:
115
 
            revid = result.revidmap[revid]
116
 
        return (revno, revid)
117
 
 
118
 
 
119
 
class DummyForeignCommitBuilder(vf_repository.VersionedFileRootCommitBuilder):
120
 
 
121
 
    def _generate_revision_if_needed(self):
122
 
        mapping = DummyForeignVcsMapping(DummyForeignVcs())
123
 
        if self._lossy:
124
 
            self._new_revision_id = mapping.revision_id_foreign_to_bzr(
125
 
                (str(self._timestamp), str(self._timezone), "UNKNOWN"))
126
 
            self.random_revid = False
127
 
        elif self._new_revision_id is not None:
128
 
            self.random_revid = False
129
 
        else:
130
 
            self._new_revision_id = self._gen_revision_id()
131
 
            self.random_revid = True
132
 
 
133
 
 
134
 
class DummyForeignVcsRepository(groupcompress_repo.CHKInventoryRepository,
135
 
    foreign.ForeignRepository):
136
 
    """Dummy foreign vcs repository."""
137
 
 
138
 
 
139
 
class DummyForeignVcsRepositoryFormat(groupcompress_repo.RepositoryFormat2a):
140
 
 
141
 
    repository_class = DummyForeignVcsRepository
142
 
    _commit_builder_class = DummyForeignCommitBuilder
143
 
 
144
 
    def get_format_string(self):
145
 
        return "Dummy Foreign Vcs Repository"
146
 
 
147
 
    def get_format_description(self):
148
 
        return "Dummy Foreign Vcs Repository"
149
 
 
150
 
 
151
 
class InterToDummyVcsBranch(branch.GenericInterBranch):
 
98
 
 
99
class InterToDummyVcsBranch(branch.GenericInterBranch,
 
100
                            foreign.InterToForeignBranch):
152
101
 
153
102
    @staticmethod
154
103
    def is_compatible(source, target):
155
104
        return isinstance(target, DummyForeignVcsBranch)
156
105
 
157
 
    def push(self, overwrite=False, stop_revision=None, lossy=False):
158
 
        if not lossy:
159
 
            raise errors.NoRoundtrippingSupport(self.source, self.target)
 
106
    def push(self, overwrite=False, stop_revision=None):
 
107
        raise errors.NoRoundtrippingSupport(self.source, self.target)
 
108
 
 
109
    def lossy_push(self, stop_revision=None):
160
110
        result = branch.BranchPushResult()
161
111
        result.source_branch = self.source
162
112
        result.target_branch = self.target
163
113
        result.old_revno, result.old_revid = self.target.last_revision_info()
164
114
        self.source.lock_read()
165
115
        try:
166
 
            graph = self.source.repository.get_graph()
167
116
            # This just handles simple cases, but that's good enough for tests
168
117
            my_history = self.target.revision_history()
169
 
            if stop_revision is None:
170
 
                stop_revision = self.source.last_revision()
171
 
            their_history = list(graph.iter_lefthand_ancestry(stop_revision,
172
 
                (revision.NULL_REVISION,)))
173
 
            their_history.reverse()
 
118
            their_history = self.source.revision_history()
174
119
            if their_history[:min(len(my_history), len(their_history))] != my_history:
175
120
                raise errors.DivergedBranches(self.target, self.source)
176
121
            todo = their_history[len(my_history):]
182
127
                    return (tree.get_file(file_id), None)
183
128
                tree.get_file_with_stat = get_file_with_stat
184
129
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
185
 
                    (str(rev.timestamp), str(rev.timezone),
 
130
                    (str(rev.timestamp), str(rev.timezone), 
186
131
                        str(self.target.revno())))
187
132
                parent_revno, parent_revid= self.target.last_revision_info()
188
133
                if parent_revid == revision.NULL_REVISION:
226
171
        super(DummyForeignVcsBranchFormat, self).__init__()
227
172
        self._matchingbzrdir = DummyForeignVcsDirFormat()
228
173
 
229
 
    def open(self, a_bzrdir, name=None, _found=False, ignore_fallbacks=False,
230
 
            found_repository=None):
 
174
    def open(self, a_bzrdir, _found=False):
231
175
        if not _found:
232
176
            raise NotImplementedError
233
177
        try:
234
 
            transport = a_bzrdir.get_branch_transport(None, name=name)
 
178
            transport = a_bzrdir.get_branch_transport(None)
235
179
            control_files = lockable_files.LockableFiles(transport, 'lock',
236
180
                                                         lockdir.LockDir)
237
 
            if found_repository is None:
238
 
                found_repository = a_bzrdir.find_repository()
239
181
            return DummyForeignVcsBranch(_format=self,
240
182
                              _control_files=control_files,
241
183
                              a_bzrdir=a_bzrdir,
242
 
                              _repository=found_repository)
 
184
                              _repository=a_bzrdir.find_repository())
243
185
        except errors.NoSuchFile:
244
186
            raise errors.NotBranchError(path=transport.base)
245
187
 
262
204
    def get_branch_format(self):
263
205
        return DummyForeignVcsBranchFormat()
264
206
 
265
 
    @property
266
 
    def repository_format(self):
267
 
        return DummyForeignVcsRepositoryFormat()
 
207
    @classmethod
 
208
    def probe_transport(klass, transport):
 
209
        """Return the .bzrdir style format present in a directory."""
 
210
        if not transport.has('.dummy'):
 
211
            raise errors.NotBranchError(path=transport.base)
 
212
        return klass()
268
213
 
269
214
    def initialize_on_transport(self, transport):
270
215
        """Initialize a new bzrdir in the base directory of a Transport."""
298
243
        self._control_files = lockable_files.LockableFiles(self.transport,
299
244
            "lock", lockable_files.TransportLock)
300
245
 
301
 
    def create_workingtree(self):
302
 
        # dirstate requires a ".bzr" entry to exist
303
 
        self.root_transport.put_bytes(".bzr", "foo")
304
 
        return super(DummyForeignVcsDir, self).create_workingtree()
305
 
 
306
 
    def open_branch(self, name=None, unsupported=False, ignore_fallbacks=True):
307
 
        if name is not None:
308
 
            raise errors.NoColocatedBranchSupport(self)
 
246
    def open_branch(self, ignore_fallbacks=True):
309
247
        return self._format.get_branch_format().open(self, _found=True)
310
248
 
311
249
    def cloning_metadir(self, stacked=False):
312
250
        """Produce a metadir suitable for cloning with."""
313
251
        return bzrdir.format_registry.make_bzrdir("default")
314
252
 
315
 
    def checkout_metadir(self):
316
 
        return self.cloning_metadir()
317
 
 
318
253
    def sprout(self, url, revision_id=None, force_new_repo=False,
319
254
               recurse='down', possible_transports=None,
320
255
               accelerator_tree=None, hardlink=False, stacked=False,
328
263
 
329
264
 
330
265
def register_dummy_foreign_for_test(testcase):
331
 
    controldir.ControlDirFormat.register_prober(DummyForeignProber)
332
 
    testcase.addCleanup(controldir.ControlDirFormat.unregister_prober,
333
 
        DummyForeignProber)
334
 
    repository.format_registry.register(DummyForeignVcsRepositoryFormat())
335
 
    testcase.addCleanup(repository.format_registry.remove,
336
 
            DummyForeignVcsRepositoryFormat())
337
 
    branch.format_registry.register(DummyForeignVcsBranchFormat())
338
 
    testcase.addCleanup(branch.format_registry.remove,
339
 
            DummyForeignVcsBranchFormat())
 
266
    bzrdir.BzrDirFormat.register_control_format(DummyForeignVcsDirFormat)
 
267
    testcase.addCleanup(bzrdir.BzrDirFormat.unregister_control_format,
 
268
                        DummyForeignVcsDirFormat)
340
269
    # We need to register the optimiser to make the dummy appear really
341
270
    # different from a regular bzr repository.
342
271
    branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
344
273
                        InterToDummyVcsBranch)
345
274
 
346
275
 
347
 
class DummyForeignProber(controldir.Prober):
348
 
 
349
 
    @classmethod
350
 
    def probe_transport(klass, transport):
351
 
        """Return the .bzrdir style format present in a directory."""
352
 
        if not transport.has('.dummy'):
353
 
            raise errors.NotBranchError(path=transport.base)
354
 
        return DummyForeignVcsDirFormat()
355
 
 
356
 
    @classmethod
357
 
    def known_formats(cls):
358
 
        return set([DummyForeignVcsDirFormat()])
359
 
 
360
 
 
361
276
class ForeignVcsRegistryTests(tests.TestCase):
362
277
    """Tests for the ForeignVcsRegistry class."""
363
278
 
375
290
        reg = foreign.ForeignVcsRegistry()
376
291
        vcs = DummyForeignVcs()
377
292
        reg.register("dummy", vcs, "Dummy VCS")
378
 
        self.assertEquals((
379
 
            ("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
380
 
            reg.parse_revision_id("dummy-v1:some-foreign-revid"))
 
293
        self.assertEquals((("some", "foreign", "revid"), DummyForeignVcsMapping(vcs)),
 
294
                          reg.parse_revision_id("dummy-v1:some-foreign-revid"))
381
295
 
382
296
 
383
297
class ForeignRevisionTests(tests.TestCase):
451
365
        source_tree = self.make_branch_and_tree("source")
452
366
        target_tree = self.make_branch_and_tree("target", 
453
367
            format=DummyForeignVcsDirFormat())
454
 
        pushresult = source_tree.branch.push(target_tree.branch, lossy=True)
 
368
        pushresult = source_tree.branch.lossy_push(target_tree.branch)
455
369
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)
456
370
        self.assertEquals(revision.NULL_REVISION, pushresult.new_revid)
457
371
        self.assertEquals({}, pushresult.revidmap)
465
379
            format=DummyForeignVcsDirFormat())
466
380
        target_tree.branch.lock_write()
467
381
        try:
468
 
            pushresult = source_tree.branch.push(target_tree.branch, lossy=True)
 
382
            pushresult = source_tree.branch.lossy_push(target_tree.branch)
469
383
        finally:
470
384
            target_tree.branch.unlock()
471
385
        self.assertEquals(revision.NULL_REVISION, pushresult.old_revid)