~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_foreign.py

  • Committer: Andrew Bennetts
  • Date: 2009-07-27 05:35:00 UTC
  • mfrom: (4570 +trunk)
  • mto: (4634.6.29 2.0)
  • mto: This revision was merged to the branch mainline in revision 4680.
  • Revision ID: andrew.bennetts@canonical.com-20090727053500-q76zsn2dx33jhmj5
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
18
"""Tests for foreign VCS utility code."""
19
19
 
20
 
from bzrlib import errors, foreign
21
 
from bzrlib.revision import Revision
22
 
from bzrlib.tests import TestCase
 
20
 
 
21
from bzrlib import (
 
22
    branch,
 
23
    errors,
 
24
    foreign,
 
25
    lockable_files,
 
26
    lockdir,
 
27
    trace,
 
28
    )
 
29
from bzrlib.bzrdir import (
 
30
    BzrDir,
 
31
    BzrDirFormat,
 
32
    BzrDirMeta1,
 
33
    BzrDirMetaFormat1,
 
34
    format_registry,
 
35
    )
 
36
from bzrlib.inventory import Inventory
 
37
from bzrlib.revision import (
 
38
    NULL_REVISION,
 
39
    Revision,
 
40
    )
 
41
from bzrlib.tests import (
 
42
    TestCase,
 
43
    TestCaseWithTransport,
 
44
    )
 
45
 
 
46
# This is the dummy foreign revision control system, used 
 
47
# mainly here in the testsuite to test the foreign VCS infrastructure.
 
48
# It is basically standard Bazaar with some minor modifications to 
 
49
# make it "foreign". 
 
50
 
51
# It has the following differences from "regular" Bazaar:
 
52
# - The control directory is named ".dummy", not ".bzr".
 
53
# - The revision ids are tuples, not strings.
 
54
# - Doesn't support more than one parent natively
23
55
 
24
56
 
25
57
class DummyForeignVcsMapping(foreign.VcsMapping):
60
92
        return { "dummy ding": "%s/%s\\%s" % foreign_revid }
61
93
 
62
94
 
 
95
class DummyForeignVcsBranch(branch.BzrBranch6,foreign.ForeignBranch):
 
96
    """A Dummy VCS Branch."""
 
97
 
 
98
    def __init__(self, _format, _control_files, a_bzrdir, *args, **kwargs):
 
99
        self._format = _format
 
100
        self._base = a_bzrdir.transport.base
 
101
        self._ignore_fallbacks = False
 
102
        foreign.ForeignBranch.__init__(self, 
 
103
            DummyForeignVcsMapping(DummyForeignVcs()))
 
104
        branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir, 
 
105
            *args, **kwargs)
 
106
 
 
107
 
 
108
class InterToDummyVcsBranch(branch.GenericInterBranch,
 
109
                            foreign.InterToForeignBranch):
 
110
 
 
111
    @staticmethod
 
112
    def is_compatible(source, target):
 
113
        return isinstance(target, DummyForeignVcsBranch)
 
114
 
 
115
    def lossy_push(self, stop_revision=None):
 
116
        result = branch.BranchPushResult()
 
117
        result.source_branch = self.source
 
118
        result.target_branch = self.target
 
119
        result.old_revno, result.old_revid = self.target.last_revision_info()
 
120
        self.source.lock_read()
 
121
        try:
 
122
            # This just handles simple cases, but that's good enough for tests
 
123
            my_history = self.target.revision_history()
 
124
            their_history = self.source.revision_history()
 
125
            if their_history[:min(len(my_history), len(their_history))] != my_history:
 
126
                raise errors.DivergedBranches(self.target, self.source)
 
127
            todo = their_history[len(my_history):]
 
128
            revidmap = {}
 
129
            for revid in todo:
 
130
                rev = self.source.repository.get_revision(revid)
 
131
                tree = self.source.repository.revision_tree(revid)
 
132
                def get_file_with_stat(file_id, path=None):
 
133
                    return (tree.get_file(file_id), None)
 
134
                tree.get_file_with_stat = get_file_with_stat
 
135
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
 
136
                    (str(rev.timestamp), str(rev.timezone), 
 
137
                        str(self.target.revno())))
 
138
                parent_revno, parent_revid= self.target.last_revision_info()
 
139
                if parent_revid == NULL_REVISION:
 
140
                    parent_revids = []
 
141
                else:
 
142
                    parent_revids = [parent_revid]
 
143
                builder = self.target.get_commit_builder(parent_revids, 
 
144
                        self.target.get_config(), rev.timestamp,
 
145
                        rev.timezone, rev.committer, rev.properties,
 
146
                        new_revid)
 
147
                try:
 
148
                    for path, ie in tree.inventory.iter_entries():
 
149
                        new_ie = ie.copy()
 
150
                        new_ie.revision = None
 
151
                        builder.record_entry_contents(new_ie, 
 
152
                            [self.target.repository.revision_tree(parent_revid).inventory],
 
153
                            path, tree, 
 
154
                            (ie.kind, ie.text_size, ie.executable, ie.text_sha1))
 
155
                    builder.finish_inventory()
 
156
                except:
 
157
                    builder.abort()
 
158
                    raise
 
159
                revidmap[revid] = builder.commit(rev.message)
 
160
                self.target.set_last_revision_info(parent_revno+1, 
 
161
                    revidmap[revid])
 
162
                trace.mutter('lossily pushed revision %s -> %s', 
 
163
                    revid, revidmap[revid])
 
164
        finally:
 
165
            self.source.unlock()
 
166
        result.new_revno, result.new_revid = self.target.last_revision_info()
 
167
        result.revidmap = revidmap
 
168
        return result
 
169
 
 
170
 
 
171
class DummyForeignVcsBranchFormat(branch.BzrBranchFormat6):
 
172
 
 
173
    def get_format_string(self):
 
174
        return "Branch for Testing"
 
175
 
 
176
    def __init__(self):
 
177
        super(DummyForeignVcsBranchFormat, self).__init__()
 
178
        self._matchingbzrdir = DummyForeignVcsDirFormat()
 
179
 
 
180
    def open(self, a_bzrdir, _found=False):
 
181
        if not _found:
 
182
            raise NotImplementedError
 
183
        try:
 
184
            transport = a_bzrdir.get_branch_transport(None)
 
185
            control_files = lockable_files.LockableFiles(transport, 'lock',
 
186
                                                         lockdir.LockDir)
 
187
            return DummyForeignVcsBranch(_format=self,
 
188
                              _control_files=control_files,
 
189
                              a_bzrdir=a_bzrdir,
 
190
                              _repository=a_bzrdir.find_repository())
 
191
        except errors.NoSuchFile:
 
192
            raise errors.NotBranchError(path=transport.base)
 
193
 
 
194
 
 
195
class DummyForeignVcsDirFormat(BzrDirMetaFormat1):
 
196
    """BzrDirFormat for the dummy foreign VCS."""
 
197
 
 
198
    @classmethod
 
199
    def get_format_string(cls):
 
200
        return "A Dummy VCS Dir"
 
201
 
 
202
    @classmethod
 
203
    def get_format_description(cls):
 
204
        return "A Dummy VCS Dir"
 
205
 
 
206
    @classmethod
 
207
    def is_supported(cls):
 
208
        return True
 
209
 
 
210
    def get_branch_format(self):
 
211
        return DummyForeignVcsBranchFormat()
 
212
 
 
213
    @classmethod
 
214
    def probe_transport(klass, transport):
 
215
        """Return the .bzrdir style format present in a directory."""
 
216
        if not transport.has('.dummy'):
 
217
            raise errors.NotBranchError(path=transport.base)
 
218
        return klass()
 
219
 
 
220
    def initialize_on_transport(self, transport):
 
221
        """Initialize a new bzrdir in the base directory of a Transport."""
 
222
        # Since we don't have a .bzr directory, inherit the
 
223
        # mode from the root directory
 
224
        temp_control = lockable_files.LockableFiles(transport,
 
225
                            '', lockable_files.TransportLock)
 
226
        temp_control._transport.mkdir('.dummy',
 
227
                                      # FIXME: RBC 20060121 don't peek under
 
228
                                      # the covers
 
229
                                      mode=temp_control._dir_mode)
 
230
        del temp_control
 
231
        bzrdir_transport = transport.clone('.dummy')
 
232
        # NB: no need to escape relative paths that are url safe.
 
233
        control_files = lockable_files.LockableFiles(bzrdir_transport,
 
234
            self._lock_file_name, self._lock_class)
 
235
        control_files.create_lock()
 
236
        return self.open(transport, _found=True)
 
237
 
 
238
    def _open(self, transport):
 
239
        return DummyForeignVcsDir(transport, self)
 
240
 
 
241
 
 
242
class DummyForeignVcsDir(BzrDirMeta1):
 
243
 
 
244
    def __init__(self, _transport, _format):
 
245
        self._format = _format
 
246
        self.transport = _transport.clone('.dummy')
 
247
        self.root_transport = _transport
 
248
        self._mode_check_done = False
 
249
        self._control_files = lockable_files.LockableFiles(self.transport,
 
250
            "lock", lockable_files.TransportLock)
 
251
 
 
252
    def open_branch(self, ignore_fallbacks=True):
 
253
        return self._format.get_branch_format().open(self, _found=True)
 
254
 
 
255
    def cloning_metadir(self, stacked=False):
 
256
        """Produce a metadir suitable for cloning with."""
 
257
        return format_registry.make_bzrdir("default")
 
258
 
 
259
    def sprout(self, url, revision_id=None, force_new_repo=False,
 
260
               recurse='down', possible_transports=None,
 
261
               accelerator_tree=None, hardlink=False, stacked=False,
 
262
               source_branch=None):
 
263
        # dirstate doesn't cope well with accelerator_trees
 
264
        # that have a different control dir
 
265
        return super(DummyForeignVcsDir, self).sprout(url=url, 
 
266
                revision_id=revision_id, force_new_repo=force_new_repo, 
 
267
                recurse=recurse, possible_transports=possible_transports, 
 
268
                hardlink=hardlink, stacked=stacked, source_branch=source_branch)
 
269
 
63
270
 
64
271
class ForeignVcsRegistryTests(TestCase):
 
272
    """Tests for the ForeignVcsRegistry class."""
65
273
 
66
274
    def test_parse_revision_id_no_dash(self):
67
275
        reg = foreign.ForeignVcsRegistry()
93
301
        self.assertEquals(mapp, rev.mapping)
94
302
 
95
303
 
96
 
class ShowForeignPropertiesTests(TestCase):
97
 
    """Tests for the show_foreign_properties() function."""
 
304
class WorkingTreeFileUpdateTests(TestCaseWithTransport):
 
305
    """Tests for update_workingtree_fileids()."""
 
306
 
 
307
    def test_update_workingtree(self):
 
308
        wt = self.make_branch_and_tree('br1')
 
309
        self.build_tree_contents([('br1/bla', 'original contents\n')])
 
310
        wt.add('bla', 'bla-a')
 
311
        wt.commit('bla-a')
 
312
        target = wt.bzrdir.sprout('br2').open_workingtree()
 
313
        target.unversion(['bla-a'])
 
314
        target.add('bla', 'bla-b')
 
315
        target.commit('bla-b')
 
316
        target_basis = target.basis_tree()
 
317
        target_basis.lock_read()
 
318
        self.addCleanup(target_basis.unlock)
 
319
        foreign.update_workingtree_fileids(wt, target_basis)
 
320
        wt.lock_read()
 
321
        try:
 
322
            self.assertEquals(["TREE_ROOT", "bla-b"], list(wt.inventory))
 
323
        finally:
 
324
            wt.unlock()
 
325
 
 
326
 
 
327
class DummyForeignVcsTests(TestCaseWithTransport):
 
328
    """Very basic test for DummyForeignVcs."""
98
329
 
99
330
    def setUp(self):
100
 
        super(ShowForeignPropertiesTests, self).setUp()
101
 
        self.vcs = DummyForeignVcs()
102
 
        foreign.foreign_vcs_registry.register("dummy",
103
 
            self.vcs, "Dummy VCS")
104
 
 
105
 
    def tearDown(self):
106
 
        super(ShowForeignPropertiesTests, self).tearDown()
107
 
        foreign.foreign_vcs_registry.remove("dummy")
108
 
 
109
 
    def test_show_non_foreign(self):
110
 
        """Test use with a native (non-foreign) bzr revision."""
111
 
        self.assertEquals({}, foreign.show_foreign_properties(Revision("arevid")))
112
 
 
113
 
    def test_show_imported(self):
114
 
        rev = Revision("dummy-v1:my-foreign-revid")
115
 
        self.assertEquals({ "dummy ding": "my/foreign\\revid" },
116
 
                          foreign.show_foreign_properties(rev))
117
 
 
118
 
    def test_show_direct(self):
119
 
        rev = foreign.ForeignRevision(("some", "foreign", "revid"),
120
 
                                      DummyForeignVcsMapping(self.vcs),
121
 
                                      "roundtrip-revid")
122
 
        self.assertEquals({ "dummy ding": "some/foreign\\revid" },
123
 
                          foreign.show_foreign_properties(rev))
 
331
        BzrDirFormat.register_control_format(DummyForeignVcsDirFormat)
 
332
        branch.InterBranch.register_optimiser(InterToDummyVcsBranch)
 
333
        self.addCleanup(self.unregister)
 
334
        super(DummyForeignVcsTests, self).setUp()
 
335
 
 
336
    def unregister(self):
 
337
        try:
 
338
            BzrDirFormat.unregister_control_format(DummyForeignVcsDirFormat)
 
339
        except ValueError:
 
340
            pass
 
341
        branch.InterBranch.unregister_optimiser(InterToDummyVcsBranch)
 
342
 
 
343
    def test_create(self):
 
344
        """Test we can create dummies."""
 
345
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
 
346
        dir = BzrDir.open("d")
 
347
        self.assertEquals("A Dummy VCS Dir", dir._format.get_format_string())
 
348
        dir.open_repository()
 
349
        dir.open_branch()
 
350
        dir.open_workingtree()
 
351
 
 
352
    def test_sprout(self):
 
353
        """Test we can clone dummies and that the format is not preserved."""
 
354
        self.make_branch_and_tree("d", format=DummyForeignVcsDirFormat())
 
355
        dir = BzrDir.open("d")
 
356
        newdir = dir.sprout("e")
 
357
        self.assertNotEquals("A Dummy VCS Dir", newdir._format.get_format_string())
 
358
 
 
359
    def test_lossy_push_empty(self):
 
360
        source_tree = self.make_branch_and_tree("source")
 
361
        target_tree = self.make_branch_and_tree("target", 
 
362
            format=DummyForeignVcsDirFormat())
 
363
        pushresult = source_tree.branch.lossy_push(target_tree.branch)
 
364
        self.assertEquals(NULL_REVISION, pushresult.old_revid)
 
365
        self.assertEquals(NULL_REVISION, pushresult.new_revid)
 
366
        self.assertEquals({}, pushresult.revidmap)
 
367
 
 
368
    def test_lossy_push_simple(self):
 
369
        source_tree = self.make_branch_and_tree("source")
 
370
        self.build_tree(['source/a', 'source/b'])
 
371
        source_tree.add(['a', 'b'])
 
372
        revid1 = source_tree.commit("msg")
 
373
        target_tree = self.make_branch_and_tree("target", 
 
374
            format=DummyForeignVcsDirFormat())
 
375
        target_tree.branch.lock_write()
 
376
        try:
 
377
            pushresult = source_tree.branch.lossy_push(target_tree.branch)
 
378
        finally:
 
379
            target_tree.branch.unlock()
 
380
        self.assertEquals(NULL_REVISION, pushresult.old_revid)
 
381
        self.assertEquals({revid1:target_tree.branch.last_revision()}, 
 
382
                           pushresult.revidmap)
 
383
        self.assertEquals(pushresult.revidmap[revid1], pushresult.new_revid)