~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

[merge] jam-integration 1495

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
import os
18
 
from bzrlib.branch import Branch
 
18
import sys
 
19
 
 
20
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
19
21
from bzrlib.clone import copy_branch
20
22
from bzrlib.commit import commit
21
 
from bzrlib.errors import NoSuchRevision, UnlistableBranch
22
 
from bzrlib.selftest import TestCaseInTempDir
 
23
import bzrlib.errors as errors
 
24
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
25
import bzrlib.gpg
 
26
from bzrlib.osutils import getcwd
 
27
from bzrlib.tests import TestCase, TestCaseInTempDir
 
28
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
23
29
from bzrlib.trace import mutter
 
30
import bzrlib.transactions as transactions
 
31
from bzrlib.revision import NULL_REVISION
24
32
 
 
33
# TODO: Make a branch using a basis branch, and check that it 
 
34
# doesn't request any files that could have been avoided, by 
 
35
# hooking into the Transport.
25
36
 
26
37
class TestBranch(TestCaseInTempDir):
27
38
 
28
39
    def test_append_revisions(self):
29
40
        """Test appending more than one revision"""
30
 
        br = Branch.initialize(".")
 
41
        br = Branch.initialize(u".")
31
42
        br.append_revision("rev1")
32
43
        self.assertEquals(br.revision_history(), ["rev1",])
33
44
        br.append_revision("rev2", "rev3")
40
51
        os.mkdir('b2')
41
52
        b1 = Branch.initialize('b1')
42
53
        b2 = Branch.initialize('b2')
43
 
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
44
 
        b1.add(['foo'], ['foo-id'])
45
 
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
54
        file('b1/foo', 'w').write('hello')
 
55
        b1.working_tree().add(['foo'], ['foo-id'])
 
56
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
46
57
 
47
58
        mutter('start fetch')
48
59
        f = Fetcher(from_branch=b1, to_branch=b2)
54
65
        tree = b2.revision_tree('revision-1')
55
66
        eq(tree.get_file_text('foo-id'), 'hello')
56
67
 
57
 
    def test_push_stores(self):
58
 
        """Copy the stores from one branch to another"""
 
68
    def test_revision_tree(self):
 
69
        b1 = Branch.initialize(u'.')
 
70
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
 
71
        tree = b1.revision_tree('revision-1')
 
72
        tree = b1.revision_tree(None)
 
73
        self.assertEqual(len(tree.list_files()), 0)
 
74
        tree = b1.revision_tree(NULL_REVISION)
 
75
        self.assertEqual(len(tree.list_files()), 0)
 
76
 
 
77
    def get_unbalanced_branch_pair(self):
 
78
        """Return two branches, a and b, with one file in a."""
59
79
        os.mkdir('a')
60
80
        br_a = Branch.initialize("a")
61
81
        file('a/b', 'wb').write('b')
62
 
        br_a.add('b')
63
 
        commit(br_a, "silly commit")
64
 
 
 
82
        br_a.working_tree().add('b')
 
83
        commit(br_a, "silly commit", rev_id='A')
65
84
        os.mkdir('b')
66
85
        br_b = Branch.initialize("b")
 
86
        return br_a, br_b
 
87
 
 
88
    def get_balanced_branch_pair(self):
 
89
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
 
90
        br_a, br_b = self.get_unbalanced_branch_pair()
 
91
        br_a.push_stores(br_b)
 
92
        return br_a, br_b
 
93
 
 
94
    def test_push_stores(self):
 
95
        """Copy the stores from one branch to another"""
 
96
        br_a, br_b = self.get_unbalanced_branch_pair()
 
97
        # ensure the revision is missing.
67
98
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
68
99
                          br_a.revision_history()[0])
69
100
        br_a.push_stores(br_b)
 
101
        # check that b now has all the data from a's first commit.
70
102
        rev = br_b.get_revision(br_a.revision_history()[0])
71
103
        tree = br_b.revision_tree(br_a.revision_history()[0])
72
104
        for file_id in tree:
76
108
 
77
109
    def test_copy_branch(self):
78
110
        """Copy the stores from one branch to another"""
79
 
        br_a, br_b = self.test_push_stores()
 
111
        br_a, br_b = self.get_balanced_branch_pair()
80
112
        commit(br_b, "silly commit")
81
113
        os.mkdir('c')
82
114
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
83
115
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
84
 
        ## # basis branches currently disabled for weave format
85
 
        ## self.assertFalse(br_b.last_revision() in br_c.revision_history())
86
 
        ## br_c.get_revision(br_b.last_revision())
87
116
 
88
117
    def test_copy_partial(self):
89
118
        """Copy only part of the history of a branch."""
90
119
        self.build_tree(['a/', 'a/one'])
91
120
        br_a = Branch.initialize('a')
92
 
        br_a.add(['one'])
93
 
        br_a.commit('commit one', rev_id='u@d-1')
 
121
        br_a.working_tree().add(['one'])
 
122
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
94
123
        self.build_tree(['a/two'])
95
 
        br_a.add(['two'])
96
 
        br_a.commit('commit two', rev_id='u@d-2')
 
124
        br_a.working_tree().add(['two'])
 
125
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
97
126
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
98
127
        self.assertEqual(br_b.last_revision(), 'u@d-1')
99
128
        self.assertTrue(os.path.exists('b/one'))
100
129
        self.assertFalse(os.path.exists('b/two'))
101
130
        
102
 
 
103
131
    def test_record_initial_ghost_merge(self):
104
132
        """A pending merge with no revision present is still a merge."""
105
 
        branch = Branch.initialize('.')
106
 
        branch.add_pending_merge('non:existent@rev--ision--0--2')
107
 
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
 
133
        branch = Branch.initialize(u'.')
 
134
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
 
135
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
108
136
        rev = branch.get_revision(branch.last_revision())
109
137
        self.assertEqual(len(rev.parent_ids), 1)
110
138
        # parent_sha1s is not populated now, WTF. rbc 20051003
111
139
        self.assertEqual(len(rev.parent_sha1s), 0)
112
140
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
113
141
 
 
142
    def test_bad_revision(self):
 
143
        branch = Branch.initialize(u'.')
 
144
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
 
145
 
114
146
# TODO 20051003 RBC:
115
147
# compare the gpg-to-sign info for a commit with a ghost and 
116
148
#     an identical tree without a ghost
118
150
        
119
151
    def test_pending_merges(self):
120
152
        """Tracking pending-merged revisions."""
121
 
        b = Branch.initialize('.')
122
 
 
123
 
        self.assertEquals(b.pending_merges(), [])
124
 
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
125
 
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
126
 
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
127
 
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
128
 
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
129
 
        self.assertEquals(b.pending_merges(),
 
153
        b = Branch.initialize(u'.')
 
154
        wt = b.working_tree()
 
155
        self.assertEquals(wt.pending_merges(), [])
 
156
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
157
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
158
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
159
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
160
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
 
161
        self.assertEquals(wt.pending_merges(),
130
162
                          ['foo@azkhazan-123123-abcabc',
131
163
                           'wibble@fofof--20050401--1928390812'])
132
 
        b.commit("commit from base with two merges")
 
164
        b.working_tree().commit("commit from base with two merges")
133
165
        rev = b.get_revision(b.revision_history()[0])
134
166
        self.assertEquals(len(rev.parent_ids), 2)
135
167
        self.assertEquals(rev.parent_ids[0],
137
169
        self.assertEquals(rev.parent_ids[1],
138
170
                           'wibble@fofof--20050401--1928390812')
139
171
        # list should be cleared when we do a commit
140
 
        self.assertEquals(b.pending_merges(), [])
141
 
 
142
 
 
 
172
        self.assertEquals(wt.pending_merges(), [])
 
173
 
 
174
    def test_sign_existing_revision(self):
 
175
        branch = Branch.initialize(u'.')
 
176
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
 
177
        from bzrlib.testament import Testament
 
178
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
179
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
180
                         branch.revision_store.get('A', 'sig').read())
 
181
 
 
182
    def test_store_signature(self):
 
183
        branch = Branch.initialize(u'.')
 
184
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
185
                                        'FOO', 'A')
 
186
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
 
187
 
 
188
    def test__relcontrolfilename(self):
 
189
        branch = Branch.initialize(u'.')
 
190
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
 
191
        
 
192
    def test__relcontrolfilename_empty(self):
 
193
        branch = Branch.initialize(u'.')
 
194
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
 
195
 
 
196
    def test_nicks(self):
 
197
        """Branch nicknames"""
 
198
        os.mkdir('bzr.dev')
 
199
        branch = Branch.initialize('bzr.dev')
 
200
        self.assertEqual(branch.nick, 'bzr.dev')
 
201
        os.rename('bzr.dev', 'bzr.ab')
 
202
        branch = Branch.open('bzr.ab')
 
203
        self.assertEqual(branch.nick, 'bzr.ab')
 
204
        branch.nick = "Aaron's branch"
 
205
        branch.nick = "Aaron's branch"
 
206
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
 
207
        self.assertEqual(branch.nick, "Aaron's branch")
 
208
        os.rename('bzr.ab', 'integration')
 
209
        branch = Branch.open('integration')
 
210
        self.assertEqual(branch.nick, "Aaron's branch")
 
211
        branch.nick = u"\u1234"
 
212
        self.assertEqual(branch.nick, u"\u1234")
 
213
 
 
214
    def test_commit_nicks(self):
 
215
        """Nicknames are committed to the revision"""
 
216
        os.mkdir('bzr.dev')
 
217
        branch = Branch.initialize('bzr.dev')
 
218
        branch.nick = "My happy branch"
 
219
        branch.working_tree().commit('My commit respect da nick.')
 
220
        committed = branch.get_revision(branch.last_revision())
 
221
        self.assertEqual(committed.properties["branch-nick"], 
 
222
                         "My happy branch")
 
223
 
 
224
 
 
225
class TestRemote(TestCaseWithWebserver):
 
226
 
 
227
    def test_open_containing(self):
 
228
        self.assertRaises(NotBranchError, Branch.open_containing,
 
229
                          self.get_remote_url(''))
 
230
        self.assertRaises(NotBranchError, Branch.open_containing,
 
231
                          self.get_remote_url('g/p/q'))
 
232
        b = Branch.initialize(u'.')
 
233
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
 
234
        self.assertEqual('', relpath)
 
235
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
 
236
        self.assertEqual('g/p/q', relpath)
 
237
        
143
238
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
144
239
#         >>> from bzrlib.commit import commit
145
240
#         >>> bzrlib.trace.silent = True
146
241
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
147
 
#         >>> br1.add('foo')
148
 
#         >>> br1.add('bar')
 
242
#         >>> br1.working_tree().add('foo')
 
243
#         >>> br1.working_tree().add('bar')
149
244
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
150
245
#         >>> br2 = ScratchBranch()
151
246
#         >>> br2.update_revisions(br1)
158
253
#         Added 0 revisions.
159
254
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
160
255
#         True
 
256
 
 
257
class InstrumentedTransaction(object):
 
258
 
 
259
    def finish(self):
 
260
        self.calls.append('finish')
 
261
 
 
262
    def __init__(self):
 
263
        self.calls = []
 
264
 
 
265
 
 
266
class TestDecorator(object):
 
267
 
 
268
    def __init__(self):
 
269
        self._calls = []
 
270
 
 
271
    def lock_read(self):
 
272
        self._calls.append('lr')
 
273
 
 
274
    def lock_write(self):
 
275
        self._calls.append('lw')
 
276
 
 
277
    def unlock(self):
 
278
        self._calls.append('ul')
 
279
 
 
280
    @needs_read_lock
 
281
    def do_with_read(self):
 
282
        return 1
 
283
 
 
284
    @needs_read_lock
 
285
    def except_with_read(self):
 
286
        raise RuntimeError
 
287
 
 
288
    @needs_write_lock
 
289
    def do_with_write(self):
 
290
        return 2
 
291
 
 
292
    @needs_write_lock
 
293
    def except_with_write(self):
 
294
        raise RuntimeError
 
295
 
 
296
 
 
297
class TestDecorators(TestCase):
 
298
 
 
299
    def test_needs_read_lock(self):
 
300
        branch = TestDecorator()
 
301
        self.assertEqual(1, branch.do_with_read())
 
302
        self.assertEqual(['lr', 'ul'], branch._calls)
 
303
 
 
304
    def test_excepts_in_read_lock(self):
 
305
        branch = TestDecorator()
 
306
        self.assertRaises(RuntimeError, branch.except_with_read)
 
307
        self.assertEqual(['lr', 'ul'], branch._calls)
 
308
 
 
309
    def test_needs_write_lock(self):
 
310
        branch = TestDecorator()
 
311
        self.assertEqual(2, branch.do_with_write())
 
312
        self.assertEqual(['lw', 'ul'], branch._calls)
 
313
 
 
314
    def test_excepts_in_write_lock(self):
 
315
        branch = TestDecorator()
 
316
        self.assertRaises(RuntimeError, branch.except_with_write)
 
317
        self.assertEqual(['lw', 'ul'], branch._calls)
 
318
 
 
319
 
 
320
class TestBranchTransaction(TestCaseInTempDir):
 
321
 
 
322
    def setUp(self):
 
323
        super(TestBranchTransaction, self).setUp()
 
324
        self.branch = Branch.initialize(u'.')
 
325
        
 
326
    def test_default_get_transaction(self):
 
327
        """branch.get_transaction on a new branch should give a PassThrough."""
 
328
        self.failUnless(isinstance(self.branch.get_transaction(),
 
329
                                   transactions.PassThroughTransaction))
 
330
 
 
331
    def test__set_new_transaction(self):
 
332
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
333
 
 
334
    def test__set_over_existing_transaction_raises(self):
 
335
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
336
        self.assertRaises(errors.LockError,
 
337
                          self.branch._set_transaction,
 
338
                          transactions.ReadOnlyTransaction())
 
339
 
 
340
    def test_finish_no_transaction_raises(self):
 
341
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
 
342
 
 
343
    def test_finish_readonly_transaction_works(self):
 
344
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
345
        self.branch._finish_transaction()
 
346
        self.assertEqual(None, self.branch._transaction)
 
347
 
 
348
    def test_unlock_calls_finish(self):
 
349
        self.branch.lock_read()
 
350
        transaction = InstrumentedTransaction()
 
351
        self.branch._transaction = transaction
 
352
        self.branch.unlock()
 
353
        self.assertEqual(['finish'], transaction.calls)
 
354
 
 
355
    def test_lock_read_acquires_ro_transaction(self):
 
356
        self.branch.lock_read()
 
357
        self.failUnless(isinstance(self.branch.get_transaction(),
 
358
                                   transactions.ReadOnlyTransaction))
 
359
        self.branch.unlock()
 
360
        
 
361
    def test_lock_write_acquires_passthrough_transaction(self):
 
362
        self.branch.lock_write()
 
363
        # cannot use get_transaction because it is magic
 
364
        self.failUnless(isinstance(self.branch._transaction,
 
365
                                   transactions.PassThroughTransaction))
 
366
        self.branch.unlock()
 
367
 
 
368
 
 
369
class TestBranchPushLocations(TestCaseInTempDir):
 
370
 
 
371
    def setUp(self):
 
372
        super(TestBranchPushLocations, self).setUp()
 
373
        self.branch = Branch.initialize(u'.')
 
374
        
 
375
    def test_get_push_location_unset(self):
 
376
        self.assertEqual(None, self.branch.get_push_location())
 
377
 
 
378
    def test_get_push_location_exact(self):
 
379
        from bzrlib.config import (branches_config_filename,
 
380
                                   ensure_config_dir_exists)
 
381
        ensure_config_dir_exists()
 
382
        fn = branches_config_filename()
 
383
        print >> open(fn, 'wt'), ("[%s]\n"
 
384
                                  "push_location=foo" %
 
385
                                  getcwd())
 
386
        self.assertEqual("foo", self.branch.get_push_location())
 
387
 
 
388
    def test_set_push_location(self):
 
389
        from bzrlib.config import (branches_config_filename,
 
390
                                   ensure_config_dir_exists)
 
391
        ensure_config_dir_exists()
 
392
        fn = branches_config_filename()
 
393
        self.branch.set_push_location('foo')
 
394
        self.assertFileEqual("[%s]\n"
 
395
                             "push_location = foo" % getcwd(),
 
396
                             fn)
 
397
 
 
398
    # TODO RBC 20051029 test getting a push location from a branch in a 
 
399
    # recursive section - that is, it appends the branch name.