~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_repository_vf/test_repository.py

  • Committer: John Arbash Meinel
  • Date: 2011-05-11 11:35:28 UTC
  • mto: This revision was merged to the branch mainline in revision 5851.
  • Revision ID: john@arbash-meinel.com-20110511113528-qepibuwxicjrbb2h
Break compatibility with python <2.6.

This includes auditing the code for places where we were doing
explicit 'sys.version' checks and removing them as appropriate.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for repository implementations - tests a repository format."""
 
18
 
 
19
from bzrlib import (
 
20
    errors,
 
21
    gpg,
 
22
    inventory,
 
23
    repository as _mod_repository,
 
24
    revision as _mod_revision,
 
25
    tests,
 
26
    versionedfile,
 
27
    vf_repository,
 
28
    )
 
29
 
 
30
from bzrlib.tests.per_repository_vf import (
 
31
    TestCaseWithRepository,
 
32
    all_repository_vf_format_scenarios,
 
33
    )
 
34
from bzrlib.tests.scenarios import load_tests_apply_scenarios
 
35
 
 
36
 
 
37
# Module-level ``load_tests`` hook. Presumably this is what makes the test
# loader apply each class's ``scenarios`` list -- TODO confirm against
# bzrlib.tests.scenarios.load_tests_apply_scenarios.
load_tests = load_tests_apply_scenarios
 
38
 
 
39
 
 
40
class TestRepository(TestCaseWithRepository):
    """Checks of the versioned-file based API of a repository format.

    The formats exercised are the ones returned by
    all_repository_vf_format_scenarios().
    """

    scenarios = all_repository_vf_format_scenarios()

    def assertFormatAttribute(self, attribute, allowed_values):
        """Assert that the format has an attribute 'attribute'.

        :param attribute: Name of the attribute to read from the format of a
            freshly made repository.
        :param allowed_values: The values the attribute is allowed to have.
        """
        repo = self.make_repository('repo')
        self.assertSubset([getattr(repo._format, attribute)], allowed_values)

    def test_attribute__fetch_order(self):
        """Test the _fetch_order attribute."""
        self.assertFormatAttribute('_fetch_order', ('topological', 'unordered'))

    def test_attribute__fetch_uses_deltas(self):
        """Test the _fetch_uses_deltas attribute."""
        self.assertFormatAttribute('_fetch_uses_deltas', (True, False))

    def test_attribute_inventories_store(self):
        """Test the existence of the inventories attribute."""
        tree = self.make_branch_and_tree('tree')
        repo = tree.branch.repository
        self.assertIsInstance(repo.inventories, versionedfile.VersionedFiles)

    def test_attribute_inventories_basics(self):
        """Test basic aspects of the inventories attribute."""
        tree = self.make_branch_and_tree('tree')
        repo = tree.branch.repository
        # Keys are 1-tuples, hence the wrapping of the committed revision id.
        rev_id = (tree.commit('a'),)
        tree.lock_read()
        self.addCleanup(tree.unlock)
        self.assertEqual(set([rev_id]), set(repo.inventories.keys()))

    def test_attribute_revision_store(self):
        """Test the existence of the revisions attribute."""
        tree = self.make_branch_and_tree('tree')
        repo = tree.branch.repository
        self.assertIsInstance(repo.revisions,
            versionedfile.VersionedFiles)

    def test_attribute_revision_store_basics(self):
        """Test the basic behaviour of the revisions attribute."""
        tree = self.make_branch_and_tree('tree')
        repo = tree.branch.repository
        repo.lock_write()
        try:
            # Nothing has been committed yet, so there are no keys.
            self.assertEqual(set(), set(repo.revisions.keys()))
            revid = (tree.commit("foo"),)
            self.assertEqual(set([revid]), set(repo.revisions.keys()))
            self.assertEqual({revid:()},
                repo.revisions.get_parent_map([revid]))
        finally:
            repo.unlock()
        # Build a small merge graph: revid <- left, revid <- right, and a
        # merge revision whose parents are (right, left).
        tree2 = self.make_branch_and_tree('tree2')
        tree2.pull(tree.branch)
        left_id = (tree2.commit('left'),)
        right_id = (tree.commit('right'),)
        tree.merge_from_branch(tree2.branch)
        merge_id = (tree.commit('merged'),)
        repo.lock_read()
        self.addCleanup(repo.unlock)
        self.assertEqual(set([revid, left_id, right_id, merge_id]),
            set(repo.revisions.keys()))
        self.assertEqual({revid:(), left_id:(revid,), right_id:(revid,),
             merge_id:(right_id, left_id)},
            repo.revisions.get_parent_map(repo.revisions.keys()))

    def test_attribute_signature_store(self):
        """Test the existence of the signatures attribute."""
        tree = self.make_branch_and_tree('tree')
        repo = tree.branch.repository
        self.assertIsInstance(repo.signatures,
            versionedfile.VersionedFiles)

    def test_exposed_versioned_files_are_marked_dirty(self):
        """Stores obtained under a lock raise ObjectNotLocked once unlocked."""
        repo = self.make_repository('.')
        repo.lock_write()
        try:
            signatures = repo.signatures
            revisions = repo.revisions
            inventories = repo.inventories
        finally:
            # Release the lock even if fetching one of the attributes fails.
            repo.unlock()
        stores = (signatures, revisions, inventories)
        for store in stores:
            self.assertRaises(errors.ObjectNotLocked, store.keys)
        for store in stores:
            self.assertRaises(errors.ObjectNotLocked,
                store.add_lines, ('foo',), [], [])

    def test__get_sink(self):
        """_get_sink() returns a vf_repository.StreamSink."""
        repo = self.make_repository('repo')
        sink = repo._get_sink()
        self.assertIsInstance(sink, vf_repository.StreamSink)

    def test_get_serializer_format(self):
        """get_serializer_format() matches the serializer's format_num."""
        repo = self.make_repository('.')
        format = repo.get_serializer_format()
        self.assertEqual(repo._serializer.format_num, format)

    def test_add_revision_inventory_sha1(self):
        """add_revision() records the sha1 that add_inventory() returns."""
        inv = inventory.Inventory(revision_id='A')
        inv.root.revision = 'A'
        inv.root.file_id = 'fixed-root'
        root_id = inv.root.file_id
        # Insert the inventory on its own to an identical repository, to get
        # its sha1.  Only the sha1 is wanted, so the write group is always
        # aborted and the lock is always released, even on failure.
        reference_repo = self.make_repository('reference_repo')
        reference_repo.lock_write()
        try:
            reference_repo.start_write_group()
            try:
                inv_sha1 = reference_repo.add_inventory('A', inv, [])
            finally:
                reference_repo.abort_write_group()
        finally:
            reference_repo.unlock()
        # Now insert a revision with this inventory, and it should get the same
        # sha1.
        repo = self.make_repository('repo')
        repo.lock_write()
        try:
            repo.start_write_group()
            try:
                repo.texts.add_lines((root_id, 'A'), [], [])
                repo.add_revision('A', _mod_revision.Revision(
                        'A', committer='B', timestamp=0,
                        timezone=0, message='C'), inv=inv)
            except:
                # Do not leave a write group open behind a failing test.
                repo.abort_write_group()
                raise
            repo.commit_write_group()
        finally:
            repo.unlock()
        repo.lock_read()
        self.addCleanup(repo.unlock)
        self.assertEqual(inv_sha1, repo.get_revision('A').inventory_sha1)

    def test_install_revisions(self):
        """install_revisions() copies a revision and its signature text."""
        wt = self.make_branch_and_tree('source')
        wt.commit('A', allow_pointless=True, rev_id='A')
        repo = wt.branch.repository
        repo.lock_write()
        repo.start_write_group()
        repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
        repo.commit_write_group()
        repo.unlock()
        repo.lock_read()
        self.addCleanup(repo.unlock)
        repo2 = self.make_repository('repo2')
        revision = repo.get_revision('A')
        tree = repo.revision_tree('A')
        signature = repo.get_signature_text('A')
        repo2.lock_write()
        self.addCleanup(repo2.unlock)
        vf_repository.install_revisions(repo2, [(revision, tree, signature)])
        self.assertEqual(revision, repo2.get_revision('A'))
        self.assertEqual(signature, repo2.get_signature_text('A'))

    def test_attribute_text_store(self):
        """Test the existence of the texts attribute."""
        tree = self.make_branch_and_tree('tree')
        repo = tree.branch.repository
        self.assertIsInstance(repo.texts,
            versionedfile.VersionedFiles)

    def test_iter_inventories_is_ordered(self):
        """iter_inventories() yields inventories in the order requested."""
        # just a smoke test
        tree = self.make_branch_and_tree('a')
        first_revision = tree.commit('')
        second_revision = tree.commit('')
        tree.lock_read()
        self.addCleanup(tree.unlock)
        revs = (first_revision, second_revision)
        invs = list(tree.branch.repository.iter_inventories(revs))
        # zip() stops at the shorter input, so without this check the loop
        # below would pass vacuously if too few inventories were yielded.
        self.assertEqual(len(revs), len(invs))
        for rev_id, inv in zip(revs, invs):
            self.assertEqual(rev_id, inv.revision_id)
            self.assertIsInstance(inv, inventory.CommonInventory)

    def test_item_keys_introduced_by(self):
        """item_keys_introduced_by() reports files, inventory, sigs, revisions."""
        # Make a repo with one revision and one versioned file.
        tree = self.make_branch_and_tree('t')
        self.build_tree(['t/foo'])
        tree.add('foo', 'file1')
        tree.commit('message', rev_id='rev_id')
        repo = tree.branch.repository
        repo.lock_write()
        repo.start_write_group()
        try:
            repo.sign_revision('rev_id', gpg.LoopbackGPGStrategy(None))
        except errors.UnsupportedOperation:
            # No signature could be stored, so none is expected below.
            signature_texts = []
        else:
            signature_texts = ['rev_id']
        repo.commit_write_group()
        repo.unlock()
        repo.lock_read()
        self.addCleanup(repo.unlock)

        # Item keys will be in this order, for maximum convenience for
        # generating data to insert into knit repository:
        #   * files
        #   * inventory
        #   * signatures
        #   * revisions
        expected_item_keys = [
            ('file', 'file1', ['rev_id']),
            ('inventory', None, ['rev_id']),
            ('signatures', None, signature_texts),
            ('revisions', None, ['rev_id'])]
        item_keys = list(repo.item_keys_introduced_by(['rev_id']))
        # Materialise the versions so the entries compare equal to the
        # lists used in expected_item_keys.
        item_keys = [
            (kind, file_id, list(versions))
            for (kind, file_id, versions) in item_keys]

        if repo.supports_rich_root():
            # Check for the root versioned file in item_keys, then remove it
            # so that item_keys can be compared with expected_item_keys.
            # Note that the file keys can be in any order, so this test is
            # written to allow that.
            inv = repo.get_inventory('rev_id')
            root_item_key = ('file', inv.root.file_id, ['rev_id'])
            self.assertTrue(root_item_key in item_keys)
            item_keys.remove(root_item_key)

        self.assertEqual(expected_item_keys, item_keys)
 
260
 
 
261
 
 
262
class TestCaseWithComplexRepository(TestCaseWithRepository):
    """Tests run against a repository with four revisions and ghost parents.

    setUp() commits 'rev1' to 'rev4' in tree 'a'; 'rev3' and 'rev4' name
    parents ('ghost1', 'ghost2') that are never committed.
    """

    scenarios = all_repository_vf_format_scenarios()

    def setUp(self):
        """Create tree 'a', remember its bzrdir, and commit rev1..rev4.

        Raises tests.TestNotApplicable when committing with a ghost parent
        raises errors.RevisionNotPresent.
        """
        super(TestCaseWithComplexRepository, self).setUp()
        tree_a = self.make_branch_and_tree('a')
        self.bzrdir = tree_a.branch.bzrdir
        # add a corrupt inventory 'orphan'
        # this may need some generalising for knits.
        tree_a.lock_write()
        try:
            repository = tree_a.branch.repository
            repository.start_write_group()
            try:
                repository.inventories.add_lines(('orphan',), [], [])
            except:
                # NOTE(review): the write group is committed on failure and
                # aborted on success, which looks inverted with respect to
                # the comment above -- confirm the intent before changing.
                repository.commit_write_group()
                raise
            else:
                repository.abort_write_group()
        finally:
            tree_a.unlock()
        # add the real revisions 'rev1' and then 'rev2' (based on rev1)
        for real_rev in ('rev1', 'rev2'):
            tree_a.commit(real_rev, rev_id=real_rev, allow_pointless=True)
        # add a reference to a ghost
        tree_a.add_parent_tree_id('ghost1')
        try:
            tree_a.commit('rev3', rev_id='rev3', allow_pointless=True)
        except errors.RevisionNotPresent:
            raise tests.TestNotApplicable(
                "Cannot test with ghosts for this format.")
        # add another reference to a ghost, and a second ghost.
        for ghost in ('ghost1', 'ghost2'):
            tree_a.add_parent_tree_id(ghost)
        tree_a.commit('rev4', rev_id='rev4', allow_pointless=True)

    def test_revision_trees(self):
        """revision_trees() gives the same trees as repeated revision_tree()."""
        wanted = ['rev1', 'rev2', 'rev3', 'rev4']
        repo = self.bzrdir.open_repository()
        repo.lock_read()
        self.addCleanup(repo.unlock)
        batch_trees = list(repo.revision_trees(wanted))
        single_trees = []
        for revision_id in wanted:
            single_trees.append(repo.revision_tree(revision_id))
        self.assertEqual(len(batch_trees), len(single_trees))
        for batch_tree, single_tree in zip(batch_trees, single_trees):
            delta = single_tree.changes_from(batch_tree)
            self.assertFalse(delta.has_changed())

    def test_get_deltas_for_revisions(self):
        """get_deltas_for_revisions() agrees with get_revision_delta()."""
        repo = self.bzrdir.open_repository()
        repo.lock_read()
        self.addCleanup(repo.unlock)
        revs = []
        for revision_id in ['rev1', 'rev2', 'rev3', 'rev4']:
            revs.append(repo.get_revision(revision_id))
        batch_deltas = list(repo.get_deltas_for_revisions(revs))
        single_deltas = []
        for rev in revs:
            single_deltas.append(repo.get_revision_delta(rev.revision_id))
        self.assertEqual(batch_deltas, single_deltas)

    def test_all_revision_ids(self):
        """all_revision_ids() returns exactly the four committed revisions."""
        # all_revision_ids -> all revisions
        expected = set(['rev1', 'rev2', 'rev3', 'rev4'])
        found = set(self.bzrdir.open_repository().all_revision_ids())
        self.assertEqual(expected, found)

    def test_get_ancestry_missing_revision(self):
        """get_ancestry('orphan') raises NoSuchRevision."""
        # get_ancestry(revision that is in some data but not fully installed
        # -> NoSuchRevision
        repo = self.bzrdir.open_repository()
        self.assertRaises(errors.NoSuchRevision, repo.get_ancestry, 'orphan')

    def test_get_unordered_ancestry(self):
        """topo_sorted=False yields the same set of revisions for 'rev3'."""
        repo = self.bzrdir.open_repository()
        sorted_ancestry = set(repo.get_ancestry('rev3'))
        unsorted_ancestry = set(repo.get_ancestry('rev3', topo_sorted=False))
        self.assertEqual(sorted_ancestry, unsorted_ancestry)

    def test_reserved_id(self):
        """Adding data under the id 'reserved:' raises ReservedId."""
        repo = self.make_repository('repository')
        repo.lock_write()
        repo.start_write_group()
        try:
            reserved = errors.ReservedId
            self.assertRaises(reserved, repo.add_inventory,
                'reserved:', None, None)
            self.assertRaises(reserved, repo.add_inventory_by_delta,
                "foo", [], 'reserved:', None)
            self.assertRaises(reserved, repo.add_revision,
                'reserved:', None)
        finally:
            repo.abort_write_group()
            repo.unlock()
 
353
 
 
354
 
 
355
class TestCaseWithCorruptRepository(TestCaseWithRepository):
    """Tests against a repository whose revision and inventory parents differ.

    setUp() stores revision 'ghost' with parent_ids ['the_ghost'] while its
    inventory is added with no parents.
    """

    scenarios = all_repository_vf_format_scenarios()

    def setUp(self):
        """Create the 'inventory_with_unnecessary_ghost' repository.

        The write group is aborted and the lock released if anything fails,
        including the tests.TestNotApplicable raised for formats that reject
        the ghost revision; previously both were left held in that case.
        """
        super(TestCaseWithCorruptRepository, self).setUp()
        # a inventory with no parents and the revision has parents..
        # i.e. a ghost.
        repo = self.make_repository('inventory_with_unnecessary_ghost')
        repo.lock_write()
        try:
            repo.start_write_group()
            try:
                self._add_corrupt_revisions(repo)
            except:
                # Covers TestNotApplicable as well as genuine failures.
                repo.abort_write_group()
                raise
            repo.commit_write_group()
        finally:
            repo.unlock()

    def _make_revision_with_inventory(self, repo, revision_id, parent_ids):
        """Add a parentless inventory for revision_id; return its Revision.

        :param repo: A write-locked repository with an open write group.
        :param revision_id: Id used for both the inventory and the revision.
        :param parent_ids: Value assigned to the revision's parent_ids.
        :return: The Revision object, not yet added to repo.
        """
        inv = inventory.Inventory(revision_id=revision_id)
        inv.root.revision = revision_id
        if repo.supports_rich_root():
            root_id = inv.root.file_id
            repo.texts.add_lines((root_id, revision_id), [], [])
        sha1 = repo.add_inventory(revision_id, inv, [])
        rev = _mod_revision.Revision(
            timestamp=0, timezone=None, committer="Foo Bar <foo@example.com>",
            message="Message", inventory_sha1=sha1, revision_id=revision_id)
        rev.parent_ids = parent_ids
        return rev

    def _add_corrupt_revisions(self, repo):
        """Insert 'ghost' and 'the_ghost' into repo inside its write group.

        Raises tests.TestNotApplicable when add_revision() rejects 'ghost'
        with NoSuchRevision or RevisionNotPresent.
        """
        rev = self._make_revision_with_inventory(repo, 'ghost', ['the_ghost'])
        try:
            repo.add_revision('ghost', rev)
        except (errors.NoSuchRevision, errors.RevisionNotPresent):
            raise tests.TestNotApplicable(
                "Cannot test with ghosts for this format.")

        rev = self._make_revision_with_inventory(repo, 'the_ghost', [])
        repo.add_revision('the_ghost', rev)
        # check it is set up usefully
        inv_weave = repo.inventories
        possible_parents = (None, (('ghost',),))
        self.assertSubset(inv_weave.get_parent_map([('ghost',)])[('ghost',)],
            possible_parents)

    def test_corrupt_revision_access_asserts_if_reported_wrong(self):
        """A wrong ancestry for 'ghost' must come with CorruptRepository.

        The test passes when get_ancestry('ghost') either raises
        CorruptRepository or returns [None, 'the_ghost', 'ghost'];
        otherwise get_revision('ghost') has to raise CorruptRepository.
        """
        repo_url = self.get_url('inventory_with_unnecessary_ghost')
        repo = _mod_repository.Repository.open(repo_url)
        try:
            ancestry = repo.get_ancestry('ghost')
        except errors.CorruptRepository:
            # caught the bad data:
            return
        if ancestry != [None, 'the_ghost', 'ghost']:
            self.assertRaises(errors.CorruptRepository,
                repo.get_revision, 'ghost')

    def test_corrupt_revision_get_revision_reconcile(self):
        """get_revision_reconcile('ghost') completes without raising."""
        repo_url = self.get_url('inventory_with_unnecessary_ghost')
        repo = _mod_repository.Repository.open(repo_url)
        repo.get_revision_reconcile('ghost')