~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

  • Committer: Martin Pool
  • Date: 2005-09-06 07:26:13 UTC
  • Revision ID: mbp@sourcefrog.net-20050906072613-1a4a18769aaaa3eb
- add xml round-trip test for revisions

- fix up __eq__ method for Revision

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
import os
18
 
 
19
 
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
20
 
from bzrlib.clone import copy_branch
21
 
from bzrlib.commit import commit
22
 
import bzrlib.errors as errors
23
 
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
24
 
import bzrlib.gpg
25
 
from bzrlib.tests import TestCase, TestCaseInTempDir
26
 
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
27
 
from bzrlib.trace import mutter
28
 
import bzrlib.transactions as transactions
29
 
from bzrlib.revision import NULL_REVISION
30
 
 
31
 
# TODO: Make a branch using basis branch, and check that it 
32
 
# doesn't request any files that could have been avoided, by 
33
 
# hooking into the Transport.
34
 
 
35
 
class TestBranch(TestCaseInTempDir):
36
 
 
 
17
from bzrlib.selftest import TestCaseInTempDir
 
18
 
 
19
 
 
20
class TestAppendRevisions(TestCaseInTempDir):
 
21
    """Test appending more than one revision"""
37
22
    def test_append_revisions(self):
38
 
        """Test appending more than one revision"""
39
 
        br = Branch.initialize(u".")
 
23
        from bzrlib.branch import Branch
 
24
        br = Branch(".", init=True)
40
25
        br.append_revision("rev1")
41
26
        self.assertEquals(br.revision_history(), ["rev1",])
42
27
        br.append_revision("rev2", "rev3")
43
28
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
44
29
 
45
 
    def test_fetch_revisions(self):
46
 
        """Test fetch-revision operation."""
47
 
        from bzrlib.fetch import Fetcher
48
 
        os.mkdir('b1')
49
 
        os.mkdir('b2')
50
 
        b1 = Branch.initialize('b1')
51
 
        b2 = Branch.initialize('b2')
52
 
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
53
 
        b1.working_tree().add(['foo'], ['foo-id'])
54
 
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
55
 
 
56
 
        mutter('start fetch')
57
 
        f = Fetcher(from_branch=b1, to_branch=b2)
58
 
        eq = self.assertEquals
59
 
        eq(f.count_copied, 1)
60
 
        eq(f.last_revision, 'revision-1')
61
 
 
62
 
        rev = b2.get_revision('revision-1')
63
 
        tree = b2.revision_tree('revision-1')
64
 
        eq(tree.get_file_text('foo-id'), 'hello')
65
 
 
66
 
    def test_revision_tree(self):
67
 
        b1 = Branch.initialize(u'.')
68
 
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
69
 
        tree = b1.revision_tree('revision-1')
70
 
        tree = b1.revision_tree(None)
71
 
        self.assertEqual(len(tree.list_files()), 0)
72
 
        tree = b1.revision_tree(NULL_REVISION)
73
 
        self.assertEqual(len(tree.list_files()), 0)
74
 
 
75
 
    def get_unbalanced_branch_pair(self):
76
 
        """Return two branches, a and b, with one file in a."""
77
 
        os.mkdir('a')
78
 
        br_a = Branch.initialize("a")
79
 
        file('a/b', 'wb').write('b')
80
 
        br_a.working_tree().add('b')
81
 
        commit(br_a, "silly commit", rev_id='A')
82
 
        os.mkdir('b')
83
 
        br_b = Branch.initialize("b")
84
 
        return br_a, br_b
85
 
 
86
 
    def get_balanced_branch_pair(self):
87
 
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
88
 
        br_a, br_b = self.get_unbalanced_branch_pair()
89
 
        br_a.push_stores(br_b)
90
 
        return br_a, br_b
91
 
 
92
 
    def test_push_stores(self):
93
 
        """Copy the stores from one branch to another"""
94
 
        br_a, br_b = self.get_unbalanced_branch_pair()
95
 
        # ensure the revision is missing.
96
 
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
97
 
                          br_a.revision_history()[0])
98
 
        br_a.push_stores(br_b)
99
 
        # check that b now has all the data from a's first commit.
100
 
        rev = br_b.get_revision(br_a.revision_history()[0])
101
 
        tree = br_b.revision_tree(br_a.revision_history()[0])
102
 
        for file_id in tree:
103
 
            if tree.inventory[file_id].kind == "file":
104
 
                tree.get_file(file_id).read()
105
 
        return br_a, br_b
106
 
 
107
 
    def test_copy_branch(self):
108
 
        """Copy the stores from one branch to another"""
109
 
        br_a, br_b = self.get_balanced_branch_pair()
110
 
        commit(br_b, "silly commit")
111
 
        os.mkdir('c')
112
 
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
113
 
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
114
 
 
115
 
    def test_copy_partial(self):
116
 
        """Copy only part of the history of a branch."""
117
 
        self.build_tree(['a/', 'a/one'])
118
 
        br_a = Branch.initialize('a')
119
 
        br_a.working_tree().add(['one'])
120
 
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
121
 
        self.build_tree(['a/two'])
122
 
        br_a.working_tree().add(['two'])
123
 
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
124
 
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
125
 
        self.assertEqual(br_b.last_revision(), 'u@d-1')
126
 
        self.assertTrue(os.path.exists('b/one'))
127
 
        self.assertFalse(os.path.exists('b/two'))
128
 
        
129
 
    def test_record_initial_ghost_merge(self):
130
 
        """A pending merge with no revision present is still a merge."""
131
 
        branch = Branch.initialize(u'.')
132
 
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
133
 
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
134
 
        rev = branch.get_revision(branch.last_revision())
135
 
        self.assertEqual(len(rev.parent_ids), 1)
136
 
        # parent_sha1s is not populated now, WTF. rbc 20051003
137
 
        self.assertEqual(len(rev.parent_sha1s), 0)
138
 
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
139
 
 
140
 
    def test_bad_revision(self):
141
 
        branch = Branch.initialize(u'.')
142
 
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
143
 
 
144
 
# TODO 20051003 RBC:
145
 
# compare the gpg-to-sign info for a commit with a ghost and 
146
 
#     an identical tree without a ghost
147
 
# fetch missing should rewrite the TOC of weaves to list newly available parents.
148
 
        
149
 
    def test_pending_merges(self):
150
 
        """Tracking pending-merged revisions."""
151
 
        b = Branch.initialize(u'.')
152
 
        wt = b.working_tree()
153
 
        self.assertEquals(wt.pending_merges(), [])
154
 
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
155
 
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
156
 
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
157
 
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
158
 
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
159
 
        self.assertEquals(wt.pending_merges(),
160
 
                          ['foo@azkhazan-123123-abcabc',
161
 
                           'wibble@fofof--20050401--1928390812'])
162
 
        b.working_tree().commit("commit from base with two merges")
163
 
        rev = b.get_revision(b.revision_history()[0])
164
 
        self.assertEquals(len(rev.parent_ids), 2)
165
 
        self.assertEquals(rev.parent_ids[0],
166
 
                          'foo@azkhazan-123123-abcabc')
167
 
        self.assertEquals(rev.parent_ids[1],
168
 
                           'wibble@fofof--20050401--1928390812')
169
 
        # list should be cleared when we do a commit
170
 
        self.assertEquals(wt.pending_merges(), [])
171
 
 
172
 
    def test_sign_existing_revision(self):
173
 
        branch = Branch.initialize(u'.')
174
 
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
175
 
        from bzrlib.testament import Testament
176
 
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
177
 
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
178
 
                         branch.revision_store.get('A', 'sig').read())
179
 
 
180
 
    def test_store_signature(self):
181
 
        branch = Branch.initialize(u'.')
182
 
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
183
 
                                        'FOO', 'A')
184
 
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
185
 
 
186
 
    def test__relcontrolfilename(self):
187
 
        branch = Branch.initialize(u'.')
188
 
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
189
 
        
190
 
    def test__relcontrolfilename_empty(self):
191
 
        branch = Branch.initialize(u'.')
192
 
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
193
 
 
194
 
    def test_nicks(self):
195
 
        """Branch nicknames"""
196
 
        os.mkdir('bzr.dev')
197
 
        branch = Branch.initialize('bzr.dev')
198
 
        self.assertEqual(branch.nick, 'bzr.dev')
199
 
        os.rename('bzr.dev', 'bzr.ab')
200
 
        branch = Branch.open('bzr.ab')
201
 
        self.assertEqual(branch.nick, 'bzr.ab')
202
 
        branch.nick = "Aaron's branch"
203
 
        branch.nick = "Aaron's branch"
204
 
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
205
 
        self.assertEqual(branch.nick, "Aaron's branch")
206
 
        os.rename('bzr.ab', 'integration')
207
 
        branch = Branch.open('integration')
208
 
        self.assertEqual(branch.nick, "Aaron's branch")
209
 
        branch.nick = u"\u1234"
210
 
        self.assertEqual(branch.nick, u"\u1234")
211
 
 
212
 
    def test_commit_nicks(self):
213
 
        """Nicknames are committed to the revision"""
214
 
        os.mkdir('bzr.dev')
215
 
        branch = Branch.initialize('bzr.dev')
216
 
        branch.nick = "My happy branch"
217
 
        branch.working_tree().commit('My commit respect da nick.')
218
 
        committed = branch.get_revision(branch.last_revision())
219
 
        self.assertEqual(committed.properties["branch-nick"], 
220
 
                         "My happy branch")
221
 
 
222
 
 
223
 
class TestRemote(TestCaseWithWebserver):
224
 
 
225
 
    def test_open_containing(self):
226
 
        self.assertRaises(NotBranchError, Branch.open_containing,
227
 
                          self.get_remote_url(''))
228
 
        self.assertRaises(NotBranchError, Branch.open_containing,
229
 
                          self.get_remote_url('g/p/q'))
230
 
        b = Branch.initialize(u'.')
231
 
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
232
 
        self.assertEqual('', relpath)
233
 
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
234
 
        self.assertEqual('g/p/q', relpath)
235
 
        
 
30
 
236
31
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
237
32
#         >>> from bzrlib.commit import commit
238
33
#         >>> bzrlib.trace.silent = True
239
34
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
240
 
#         >>> br1.working_tree().add('foo')
241
 
#         >>> br1.working_tree().add('bar')
 
35
#         >>> br1.add('foo')
 
36
#         >>> br1.add('bar')
242
37
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
243
38
#         >>> br2 = ScratchBranch()
244
39
#         >>> br2.update_revisions(br1)
251
46
#         Added 0 revisions.
252
47
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
253
48
#         True
254
 
 
255
 
class InstrumentedTransaction(object):
256
 
 
257
 
    def finish(self):
258
 
        self.calls.append('finish')
259
 
 
260
 
    def __init__(self):
261
 
        self.calls = []
262
 
 
263
 
 
264
 
class TestDecorator(object):
265
 
 
266
 
    def __init__(self):
267
 
        self._calls = []
268
 
 
269
 
    def lock_read(self):
270
 
        self._calls.append('lr')
271
 
 
272
 
    def lock_write(self):
273
 
        self._calls.append('lw')
274
 
 
275
 
    def unlock(self):
276
 
        self._calls.append('ul')
277
 
 
278
 
    @needs_read_lock
279
 
    def do_with_read(self):
280
 
        return 1
281
 
 
282
 
    @needs_read_lock
283
 
    def except_with_read(self):
284
 
        raise RuntimeError
285
 
 
286
 
    @needs_write_lock
287
 
    def do_with_write(self):
288
 
        return 2
289
 
 
290
 
    @needs_write_lock
291
 
    def except_with_write(self):
292
 
        raise RuntimeError
293
 
 
294
 
 
295
 
class TestDecorators(TestCase):
296
 
 
297
 
    def test_needs_read_lock(self):
298
 
        branch = TestDecorator()
299
 
        self.assertEqual(1, branch.do_with_read())
300
 
        self.assertEqual(['lr', 'ul'], branch._calls)
301
 
 
302
 
    def test_excepts_in_read_lock(self):
303
 
        branch = TestDecorator()
304
 
        self.assertRaises(RuntimeError, branch.except_with_read)
305
 
        self.assertEqual(['lr', 'ul'], branch._calls)
306
 
 
307
 
    def test_needs_write_lock(self):
308
 
        branch = TestDecorator()
309
 
        self.assertEqual(2, branch.do_with_write())
310
 
        self.assertEqual(['lw', 'ul'], branch._calls)
311
 
 
312
 
    def test_excepts_in_write_lock(self):
313
 
        branch = TestDecorator()
314
 
        self.assertRaises(RuntimeError, branch.except_with_write)
315
 
        self.assertEqual(['lw', 'ul'], branch._calls)
316
 
 
317
 
 
318
 
class TestBranchTransaction(TestCaseInTempDir):
319
 
 
320
 
    def setUp(self):
321
 
        super(TestBranchTransaction, self).setUp()
322
 
        self.branch = Branch.initialize(u'.')
323
 
        
324
 
    def test_default_get_transaction(self):
325
 
        """branch.get_transaction on a new branch should give a PassThrough."""
326
 
        self.failUnless(isinstance(self.branch.get_transaction(),
327
 
                                   transactions.PassThroughTransaction))
328
 
 
329
 
    def test__set_new_transaction(self):
330
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
331
 
 
332
 
    def test__set_over_existing_transaction_raises(self):
333
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
334
 
        self.assertRaises(errors.LockError,
335
 
                          self.branch._set_transaction,
336
 
                          transactions.ReadOnlyTransaction())
337
 
 
338
 
    def test_finish_no_transaction_raises(self):
339
 
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
340
 
 
341
 
    def test_finish_readonly_transaction_works(self):
342
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
343
 
        self.branch._finish_transaction()
344
 
        self.assertEqual(None, self.branch._transaction)
345
 
 
346
 
    def test_unlock_calls_finish(self):
347
 
        self.branch.lock_read()
348
 
        transaction = InstrumentedTransaction()
349
 
        self.branch._transaction = transaction
350
 
        self.branch.unlock()
351
 
        self.assertEqual(['finish'], transaction.calls)
352
 
 
353
 
    def test_lock_read_acquires_ro_transaction(self):
354
 
        self.branch.lock_read()
355
 
        self.failUnless(isinstance(self.branch.get_transaction(),
356
 
                                   transactions.ReadOnlyTransaction))
357
 
        self.branch.unlock()
358
 
        
359
 
    def test_lock_write_acquires_passthrough_transaction(self):
360
 
        self.branch.lock_write()
361
 
        # cannot use get_transaction as its magic
362
 
        self.failUnless(isinstance(self.branch._transaction,
363
 
                                   transactions.PassThroughTransaction))
364
 
        self.branch.unlock()
365
 
 
366
 
 
367
 
class TestBranchPushLocations(TestCaseInTempDir):
368
 
 
369
 
    def setUp(self):
370
 
        super(TestBranchPushLocations, self).setUp()
371
 
        self.branch = Branch.initialize(u'.')
372
 
        
373
 
    def test_get_push_location_unset(self):
374
 
        self.assertEqual(None, self.branch.get_push_location())
375
 
 
376
 
    def test_get_push_location_exact(self):
377
 
        self.build_tree(['.bazaar/'])
378
 
        print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
379
 
                                                       "push_location=foo" %
380
 
                                                       os.getcwdu())
381
 
        self.assertEqual("foo", self.branch.get_push_location())
382
 
 
383
 
    def test_set_push_location(self):
384
 
        self.branch.set_push_location('foo')
385
 
        self.assertFileEqual("[%s]\n"
386
 
                             "push_location = foo" % os.getcwdu(),
387
 
                             '.bazaar/branches.conf')
388
 
 
389
 
    # TODO RBC 20051029 test getting a push location from a branch in a 
390
 
    # recursive section - that is, it appends the branch name.