~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bundle.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-11-03 01:53:30 UTC
  • mfrom: (2955.1.1 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20071103015330-pt1tec7wyxwwcey8
Fix #158972: don't use a timeout for HttpServer.

Show diffs side-by-side

Legend: "added" marks lines added in this revision.

Legend: "removed" marks lines removed in this revision.

Lines of Context:
17
17
from cStringIO import StringIO
18
18
import os
19
19
import sys
 
20
import tempfile
20
21
 
21
22
from bzrlib import (
22
23
    bzrdir,
23
24
    errors,
24
25
    inventory,
25
 
    osutils,
26
26
    repository,
27
27
    revision as _mod_revision,
28
28
    treebuilder,
29
29
    )
30
30
from bzrlib.bzrdir import BzrDir
31
 
from bzrlib.bundle import read_mergeable_from_url
32
31
from bzrlib.bundle.apply_bundle import install_bundle, merge_bundle
33
32
from bzrlib.bundle.bundle_data import BundleTree
34
 
from bzrlib.directory_service import directories
35
33
from bzrlib.bundle.serializer import write_bundle, read_bundle, v09, v4
36
34
from bzrlib.bundle.serializer.v08 import BundleSerializerV08
37
35
from bzrlib.bundle.serializer.v09 import BundleSerializerV09
42
40
                           NoSuchFile,)
43
41
from bzrlib.merge import Merge3Merger
44
42
from bzrlib.repofmt import knitrepo
45
 
from bzrlib.osutils import sha_file, sha_string
46
 
from bzrlib.tests import (
47
 
    SymlinkFeature,
48
 
    TestCase,
49
 
    TestCaseInTempDir,
50
 
    TestCaseWithTransport,
51
 
    TestSkipped,
52
 
    test_read_bundle,
53
 
    test_commit,
54
 
    )
 
43
from bzrlib.osutils import has_symlinks, sha_file
 
44
from bzrlib.tests import (TestCaseInTempDir, TestCaseWithTransport,
 
45
                          TestCase, TestSkipped, test_commit)
55
46
from bzrlib.transform import TreeTransform
56
47
 
57
48
 
169
160
        self.assertEqual(btree.path2id("grandparent/parent"), "b")
170
161
        self.assertEqual(btree.path2id("grandparent/parent/file"), "c")
171
162
 
172
 
        self.assertTrue(btree.path2id("grandparent2") is None)
173
 
        self.assertTrue(btree.path2id("grandparent2/parent") is None)
174
 
        self.assertTrue(btree.path2id("grandparent2/parent/file") is None)
 
163
        assert btree.path2id("grandparent2") is None
 
164
        assert btree.path2id("grandparent2/parent") is None
 
165
        assert btree.path2id("grandparent2/parent/file") is None
175
166
 
176
167
        btree.note_rename("grandparent", "grandparent2")
177
 
        self.assertTrue(btree.old_path("grandparent") is None)
178
 
        self.assertTrue(btree.old_path("grandparent/parent") is None)
179
 
        self.assertTrue(btree.old_path("grandparent/parent/file") is None)
 
168
        assert btree.old_path("grandparent") is None
 
169
        assert btree.old_path("grandparent/parent") is None
 
170
        assert btree.old_path("grandparent/parent/file") is None
180
171
 
181
172
        self.assertEqual(btree.id2path("a"), "grandparent2")
182
173
        self.assertEqual(btree.id2path("b"), "grandparent2/parent")
186
177
        self.assertEqual(btree.path2id("grandparent2/parent"), "b")
187
178
        self.assertEqual(btree.path2id("grandparent2/parent/file"), "c")
188
179
 
189
 
        self.assertTrue(btree.path2id("grandparent") is None)
190
 
        self.assertTrue(btree.path2id("grandparent/parent") is None)
191
 
        self.assertTrue(btree.path2id("grandparent/parent/file") is None)
 
180
        assert btree.path2id("grandparent") is None
 
181
        assert btree.path2id("grandparent/parent") is None
 
182
        assert btree.path2id("grandparent/parent/file") is None
192
183
 
193
184
        btree.note_rename("grandparent/parent", "grandparent2/parent2")
194
185
        self.assertEqual(btree.id2path("a"), "grandparent2")
199
190
        self.assertEqual(btree.path2id("grandparent2/parent2"), "b")
200
191
        self.assertEqual(btree.path2id("grandparent2/parent2/file"), "c")
201
192
 
202
 
        self.assertTrue(btree.path2id("grandparent2/parent") is None)
203
 
        self.assertTrue(btree.path2id("grandparent2/parent/file") is None)
 
193
        assert btree.path2id("grandparent2/parent") is None
 
194
        assert btree.path2id("grandparent2/parent/file") is None
204
195
 
205
196
        btree.note_rename("grandparent/parent/file", 
206
197
                          "grandparent2/parent2/file2")
212
203
        self.assertEqual(btree.path2id("grandparent2/parent2"), "b")
213
204
        self.assertEqual(btree.path2id("grandparent2/parent2/file2"), "c")
214
205
 
215
 
        self.assertTrue(btree.path2id("grandparent2/parent2/file") is None)
 
206
        assert btree.path2id("grandparent2/parent2/file") is None
216
207
 
217
208
    def test_moves(self):
218
209
        """Ensure that file moves have the proper effect on children"""
221
212
                          "grandparent/alt_parent/file")
222
213
        self.assertEqual(btree.id2path("c"), "grandparent/alt_parent/file")
223
214
        self.assertEqual(btree.path2id("grandparent/alt_parent/file"), "c")
224
 
        self.assertTrue(btree.path2id("grandparent/parent/file") is None)
 
215
        assert btree.path2id("grandparent/parent/file") is None
225
216
 
226
217
    def unified_diff(self, old, new):
227
218
        out = StringIO()
233
224
        btree = self.make_tree_1()[0]
234
225
        btree.note_rename("grandparent/parent/file", 
235
226
                          "grandparent/alt_parent/file")
236
 
        self.assertTrue(btree.id2path("e") is None)
237
 
        self.assertTrue(btree.path2id("grandparent/parent/file") is None)
 
227
        assert btree.id2path("e") is None
 
228
        assert btree.path2id("grandparent/parent/file") is None
238
229
        btree.note_id("e", "grandparent/parent/file")
239
230
        return btree
240
231
 
298
289
        btree = self.make_tree_1()[0]
299
290
        self.assertEqual(btree.get_file("c").read(), "Hello\n")
300
291
        btree.note_deletion("grandparent/parent/file")
301
 
        self.assertTrue(btree.id2path("c") is None)
302
 
        self.assertTrue(btree.path2id("grandparent/parent/file") is None)
 
292
        assert btree.id2path("c") is None
 
293
        assert btree.path2id("grandparent/parent/file") is None
303
294
 
304
295
    def sorted_ids(self, tree):
305
296
        ids = list(tree)
455
446
        """
456
447
 
457
448
        if checkout_dir is None:
458
 
            checkout_dir = osutils.mkdtemp(prefix='test-branch-', dir='.')
 
449
            checkout_dir = tempfile.mkdtemp(prefix='test-branch-', dir='.')
459
450
        else:
460
451
            if not os.path.exists(checkout_dir):
461
452
                os.mkdir(checkout_dir)
464
455
        ancestors = write_bundle(self.b1.repository, rev_id, 'null:', s,
465
456
                                 format=self.format)
466
457
        s.seek(0)
467
 
        self.assertIsInstance(s.getvalue(), str)
 
458
        assert isinstance(s.getvalue(), str), (
 
459
            "Bundle isn't a bytestring:\n %s..." % repr(s.getvalue())[:40])
468
460
        install_bundle(tree.branch.repository, read_bundle(s))
469
461
        for ancestor in ancestors:
470
462
            old = self.b1.repository.revision_tree(ancestor)
471
463
            new = tree.branch.repository.revision_tree(ancestor)
472
 
            old.lock_read()
473
 
            new.lock_read()
474
 
            try:
475
 
                # Check that there aren't any inventory level changes
476
 
                delta = new.changes_from(old)
477
 
                self.assertFalse(delta.has_changed(),
478
 
                                 'Revision %s not copied correctly.'
479
 
                                 % (ancestor,))
480
 
 
481
 
                # Now check that the file contents are all correct
482
 
                for inventory_id in old:
483
 
                    try:
484
 
                        old_file = old.get_file(inventory_id)
485
 
                    except NoSuchFile:
486
 
                        continue
487
 
                    if old_file is None:
488
 
                        continue
489
 
                    self.assertEqual(old_file.read(),
490
 
                                     new.get_file(inventory_id).read())
491
 
            finally:
492
 
                new.unlock()
493
 
                old.unlock()
 
464
 
 
465
            # Check that there aren't any inventory level changes
 
466
            delta = new.changes_from(old)
 
467
            self.assertFalse(delta.has_changed(),
 
468
                             'Revision %s not copied correctly.'
 
469
                             % (ancestor,))
 
470
 
 
471
            # Now check that the file contents are all correct
 
472
            for inventory_id in old:
 
473
                try:
 
474
                    old_file = old.get_file(inventory_id)
 
475
                except NoSuchFile:
 
476
                    continue
 
477
                if old_file is None:
 
478
                    continue
 
479
                self.assertEqual(old_file.read(),
 
480
                                 new.get_file(inventory_id).read())
494
481
        if not _mod_revision.is_null(rev_id):
495
482
            rh = self.b1.revision_history()
496
483
            tree.branch.set_revision_history(rh[:rh.index(rev_id)+1])
505
492
        sure everything matches the builtin branch.
506
493
        """
507
494
        to_tree = self.get_checkout(base_rev_id, checkout_dir=checkout_dir)
508
 
        to_tree.lock_write()
509
 
        try:
510
 
            self._valid_apply_bundle(base_rev_id, info, to_tree)
511
 
        finally:
512
 
            to_tree.unlock()
513
 
 
514
 
    def _valid_apply_bundle(self, base_rev_id, info, to_tree):
515
495
        original_parents = to_tree.get_parent_ids()
516
496
        repository = to_tree.branch.repository
517
497
        original_parents = to_tree.get_parent_ids()
661
641
        bundle = self.get_valid_bundle('a@cset-0-6', 'a@cset-0-7')
662
642
 
663
643
    def test_symlink_bundle(self):
664
 
        self.requireFeature(SymlinkFeature)
 
644
        if not has_symlinks():
 
645
            raise TestSkipped("No symlink support")
665
646
        self.tree1 = self.make_branch_and_tree('b1')
666
647
        self.b1 = self.tree1.branch
667
648
        tt = TreeTransform(self.tree1)
1019
1000
        self.assertNotContainsRe(inv_text, 'format="5"')
1020
1001
        self.assertContainsRe(inv_text, 'format="7"')
1021
1002
 
1022
 
    def make_repo_with_installed_revisions(self):
1023
 
        tree = self.make_simple_tree('knit')
1024
 
        tree.commit('hello', rev_id='rev1')
1025
 
        tree.commit('hello', rev_id='rev2')
1026
 
        bundle = read_bundle(self.create_bundle_text('null:', 'rev2')[0])
1027
 
        repo = self.make_repository('repo', format='dirstate-with-subtree')
1028
 
        bundle.install_revisions(repo)
1029
 
        return repo
1030
 
 
1031
1003
    def test_across_models(self):
1032
 
        repo = self.make_repo_with_installed_revisions()
 
1004
        tree = self.make_simple_tree('knit')
 
1005
        tree.commit('hello', rev_id='rev1')
 
1006
        tree.commit('hello', rev_id='rev2')
 
1007
        bundle = read_bundle(self.create_bundle_text('null:', 'rev2')[0])
 
1008
        repo = self.make_repository('repo', format='dirstate-with-subtree')
 
1009
        bundle.install_revisions(repo)
1033
1010
        inv = repo.get_inventory('rev2')
1034
1011
        self.assertEqual('rev2', inv.root.revision)
1035
 
        root_id = inv.root.file_id
1036
 
        repo.lock_read()
1037
 
        self.addCleanup(repo.unlock)
1038
 
        self.assertEqual({(root_id, 'rev1'):(),
1039
 
            (root_id, 'rev2'):((root_id, 'rev1'),)},
1040
 
            repo.texts.get_parent_map([(root_id, 'rev1'), (root_id, 'rev2')]))
1041
 
 
1042
 
    def test_inv_hash_across_serializers(self):
1043
 
        repo = self.make_repo_with_installed_revisions()
1044
 
        recorded_inv_sha1 = repo.get_inventory_sha1('rev2')
1045
 
        xml = repo.get_inventory_xml('rev2')
1046
 
        self.assertEqual(sha_string(xml), recorded_inv_sha1)
 
1012
        root_vf = repo.weave_store.get_weave(inv.root.file_id,
 
1013
                                             repo.get_transaction())
 
1014
        self.assertEqual(root_vf.versions(), ['rev1', 'rev2'])
1047
1015
 
1048
1016
    def test_across_models_incompatible(self):
1049
1017
        tree = self.make_simple_tree('dirstate-with-subtree')
1353
1321
        tree2 = self.make_branch_and_tree('target')
1354
1322
        target_repo = tree2.branch.repository
1355
1323
        install_bundle(target_repo, serializer.read(s))
1356
 
        target_repo.lock_read()
1357
 
        self.addCleanup(target_repo.unlock)
1358
 
        self.assertEqual({'1':'contents1\nstatic\n',
1359
 
            '2':'contents2\nstatic\n'},
1360
 
            dict(target_repo.iter_files_bytes(
1361
 
                [('fileid-2', 'rev1', '1'), ('fileid-2', 'rev2', '2')])))
 
1324
        vf = target_repo.weave_store.get_weave('fileid-2',
 
1325
            target_repo.get_transaction())
 
1326
        self.assertEqual('contents1\nstatic\n', vf.get_text('rev1'))
 
1327
        self.assertEqual('contents2\nstatic\n', vf.get_text('rev2'))
1362
1328
        rtree = target_repo.revision_tree('rev2')
1363
 
        inventory_vf = target_repo.inventories
1364
 
        # If the inventory store has a graph, it must match the revision graph.
1365
 
        self.assertSubset(
1366
 
            [inventory_vf.get_parent_map([('rev2',)])[('rev2',)]],
1367
 
            [None, (('rev1',),)])
 
1329
        inventory_vf = target_repo.get_inventory_weave()
 
1330
        self.assertEqual(['rev1'], inventory_vf.get_parents('rev2'))
1368
1331
        self.assertEqual('changed file',
1369
1332
                         target_repo.get_revision('rev2').message)
1370
1333
 
1584
1547
        self.assertEqual((None, {'foo': 'bar', 'storage_kind': 'header'},
1585
1548
            'info', None, None), record)
1586
1549
        self.assertRaises(BadBundle, record_iter.next)
1587
 
 
1588
 
 
1589
 
class TestReadMergeableFromUrl(TestCaseWithTransport):
1590
 
 
1591
 
    def test_read_mergeable_skips_local(self):
1592
 
        """A local bundle named like the URL should not be read.
1593
 
        """
1594
 
        out, wt = test_read_bundle.create_bundle_file(self)
1595
 
        class FooService(object):
1596
 
            """A directory service that always returns source"""
1597
 
 
1598
 
            def look_up(self, name, url):
1599
 
                return 'source'
1600
 
        directories.register('foo:', FooService, 'Testing directory service')
1601
 
        self.addCleanup(lambda: directories.remove('foo:'))
1602
 
        self.build_tree_contents([('./foo:bar', out.getvalue())])
1603
 
        self.assertRaises(errors.NotABundle, read_mergeable_from_url,
1604
 
                          'foo:bar')