~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

Exclude more files from dumb-rsync upload

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
import os
18
 
from bzrlib.branch import Branch
 
18
 
 
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
19
20
from bzrlib.clone import copy_branch
20
21
from bzrlib.commit import commit
21
 
from bzrlib.errors import NoSuchRevision, UnlistableBranch
22
 
from bzrlib.selftest import TestCaseInTempDir
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
23
27
from bzrlib.trace import mutter
 
28
import bzrlib.transactions as transactions
 
29
from bzrlib.revision import NULL_REVISION
24
30
 
 
31
# TODO: Make a branch using basis branch, and check that it 
 
32
# doesn't request any files that could have been avoided, by 
 
33
# hooking into the Transport.
25
34
 
26
35
class TestBranch(TestCaseInTempDir):
27
36
 
42
51
        b2 = Branch.initialize('b2')
43
52
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
44
53
        b1.add(['foo'], ['foo-id'])
45
 
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
54
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
46
55
 
47
56
        mutter('start fetch')
48
57
        f = Fetcher(from_branch=b1, to_branch=b2)
54
63
        tree = b2.revision_tree('revision-1')
55
64
        eq(tree.get_file_text('foo-id'), 'hello')
56
65
 
57
 
    def test_push_stores(self):
58
 
        """Copy the stores from one branch to another"""
 
66
    def test_revision_tree(self):
 
67
        b1 = Branch.initialize('.')
 
68
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
 
69
        tree = b1.revision_tree('revision-1')
 
70
        tree = b1.revision_tree(None)
 
71
        self.assertEqual(len(tree.list_files()), 0)
 
72
        tree = b1.revision_tree(NULL_REVISION)
 
73
        self.assertEqual(len(tree.list_files()), 0)
 
74
 
 
75
    def get_unbalanced_branch_pair(self):
 
76
        """Return two branches, a and b, with one file in a."""
59
77
        os.mkdir('a')
60
78
        br_a = Branch.initialize("a")
61
79
        file('a/b', 'wb').write('b')
62
80
        br_a.add('b')
63
 
        commit(br_a, "silly commit")
64
 
 
 
81
        commit(br_a, "silly commit", rev_id='A')
65
82
        os.mkdir('b')
66
83
        br_b = Branch.initialize("b")
 
84
        return br_a, br_b
 
85
 
 
86
    def get_balanced_branch_pair(self):
 
87
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
 
88
        br_a, br_b = self.get_unbalanced_branch_pair()
 
89
        br_a.push_stores(br_b)
 
90
        return br_a, br_b
 
91
 
 
92
    def test_push_stores(self):
 
93
        """Copy the stores from one branch to another"""
 
94
        br_a, br_b = self.get_unbalanced_branch_pair()
 
95
        # ensure the revision is missing.
67
96
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
68
97
                          br_a.revision_history()[0])
69
98
        br_a.push_stores(br_b)
 
99
        # check that b now has all the data from a's first commit.
70
100
        rev = br_b.get_revision(br_a.revision_history()[0])
71
101
        tree = br_b.revision_tree(br_a.revision_history()[0])
72
102
        for file_id in tree:
76
106
 
77
107
    def test_copy_branch(self):
78
108
        """Copy the stores from one branch to another"""
79
 
        br_a, br_b = self.test_push_stores()
 
109
        br_a, br_b = self.get_balanced_branch_pair()
80
110
        commit(br_b, "silly commit")
81
111
        os.mkdir('c')
82
112
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
83
113
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
84
 
        ## # basis branches currently disabled for weave format
85
 
        ## self.assertFalse(br_b.last_revision() in br_c.revision_history())
86
 
        ## br_c.get_revision(br_b.last_revision())
87
114
 
88
115
    def test_copy_partial(self):
89
116
        """Copy only part of the history of a branch."""
90
117
        self.build_tree(['a/', 'a/one'])
91
118
        br_a = Branch.initialize('a')
92
119
        br_a.add(['one'])
93
 
        br_a.commit('commit one', rev_id='u@d-1')
 
120
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
94
121
        self.build_tree(['a/two'])
95
122
        br_a.add(['two'])
96
 
        br_a.commit('commit two', rev_id='u@d-2')
 
123
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
97
124
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
98
125
        self.assertEqual(br_b.last_revision(), 'u@d-1')
99
126
        self.assertTrue(os.path.exists('b/one'))
100
127
        self.assertFalse(os.path.exists('b/two'))
101
128
        
102
 
 
103
129
    def test_record_initial_ghost_merge(self):
104
130
        """A pending merge with no revision present is still a merge."""
105
131
        branch = Branch.initialize('.')
106
 
        branch.add_pending_merge('non:existent@rev--ision--0--2')
107
 
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
 
132
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
 
133
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
108
134
        rev = branch.get_revision(branch.last_revision())
109
135
        self.assertEqual(len(rev.parent_ids), 1)
110
136
        # parent_sha1s is not populated now, WTF. rbc 20051003
111
137
        self.assertEqual(len(rev.parent_sha1s), 0)
112
138
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
113
139
 
 
140
    def test_bad_revision(self):
 
141
        branch = Branch.initialize('.')
 
142
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
 
143
 
114
144
# TODO 20051003 RBC:
115
145
# compare the gpg-to-sign info for a commit with a ghost and 
116
146
#     an identical tree without a ghost
119
149
    def test_pending_merges(self):
120
150
        """Tracking pending-merged revisions."""
121
151
        b = Branch.initialize('.')
122
 
 
123
 
        self.assertEquals(b.pending_merges(), [])
124
 
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
125
 
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
126
 
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
127
 
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
128
 
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
129
 
        self.assertEquals(b.pending_merges(),
 
152
        wt = b.working_tree()
 
153
        self.assertEquals(wt.pending_merges(), [])
 
154
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
155
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
156
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
157
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
158
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
 
159
        self.assertEquals(wt.pending_merges(),
130
160
                          ['foo@azkhazan-123123-abcabc',
131
161
                           'wibble@fofof--20050401--1928390812'])
132
 
        b.commit("commit from base with two merges")
 
162
        b.working_tree().commit("commit from base with two merges")
133
163
        rev = b.get_revision(b.revision_history()[0])
134
164
        self.assertEquals(len(rev.parent_ids), 2)
135
165
        self.assertEquals(rev.parent_ids[0],
137
167
        self.assertEquals(rev.parent_ids[1],
138
168
                           'wibble@fofof--20050401--1928390812')
139
169
        # list should be cleared when we do a commit
140
 
        self.assertEquals(b.pending_merges(), [])
141
 
 
142
 
 
 
170
        self.assertEquals(wt.pending_merges(), [])
 
171
 
 
172
    def test_sign_existing_revision(self):
 
173
        branch = Branch.initialize('.')
 
174
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
 
175
        from bzrlib.testament import Testament
 
176
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
177
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
178
                         branch.revision_store.get('A', 'sig').read())
 
179
 
 
180
    def test_store_signature(self):
 
181
        branch = Branch.initialize('.')
 
182
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
183
                                        'FOO', 'A')
 
184
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
 
185
 
 
186
    def test__relcontrolfilename(self):
 
187
        branch = Branch.initialize('.')
 
188
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
 
189
        
 
190
    def test__relcontrolfilename_empty(self):
 
191
        branch = Branch.initialize('.')
 
192
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
 
193
 
 
194
    def test_nicks(self):
 
195
        """Branch nicknames"""
 
196
        os.mkdir('bzr.dev')
 
197
        branch = Branch.initialize('bzr.dev')
 
198
        self.assertEqual(branch.nick, 'bzr.dev')
 
199
        os.rename('bzr.dev', 'bzr.ab')
 
200
        branch = Branch.open('bzr.ab')
 
201
        self.assertEqual(branch.nick, 'bzr.ab')
 
202
        branch.nick = "Aaron's branch"
 
203
        branch.nick = "Aaron's branch"
 
204
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
 
205
        self.assertEqual(branch.nick, "Aaron's branch")
 
206
        os.rename('bzr.ab', 'integration')
 
207
        branch = Branch.open('integration')
 
208
        self.assertEqual(branch.nick, "Aaron's branch")
 
209
        branch.nick = u"\u1234"
 
210
        self.assertEqual(branch.nick, u"\u1234")
 
211
 
 
212
    def test_commit_nicks(self):
 
213
        """Nicknames are committed to the revision"""
 
214
        os.mkdir('bzr.dev')
 
215
        branch = Branch.initialize('bzr.dev')
 
216
        branch.nick = "My happy branch"
 
217
        branch.working_tree().commit('My commit respect da nick.')
 
218
        committed = branch.get_revision(branch.last_revision())
 
219
        self.assertEqual(committed.properties["branch-nick"], 
 
220
                         "My happy branch")
 
221
 
 
222
 
 
223
class TestRemote(TestCaseWithWebserver):
 
224
 
 
225
    def test_open_containing(self):
 
226
        self.assertRaises(NotBranchError, Branch.open_containing,
 
227
                          self.get_remote_url(''))
 
228
        self.assertRaises(NotBranchError, Branch.open_containing,
 
229
                          self.get_remote_url('g/p/q'))
 
230
        b = Branch.initialize('.')
 
231
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
 
232
        self.assertEqual('', relpath)
 
233
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
 
234
        self.assertEqual('g/p/q', relpath)
 
235
        
143
236
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
144
237
#         >>> from bzrlib.commit import commit
145
238
#         >>> bzrlib.trace.silent = True
158
251
#         Added 0 revisions.
159
252
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
160
253
#         True
 
254
 
 
255
class InstrumentedTransaction(object):
 
256
 
 
257
    def finish(self):
 
258
        self.calls.append('finish')
 
259
 
 
260
    def __init__(self):
 
261
        self.calls = []
 
262
 
 
263
 
 
264
class TestDecorator(object):
 
265
 
 
266
    def __init__(self):
 
267
        self._calls = []
 
268
 
 
269
    def lock_read(self):
 
270
        self._calls.append('lr')
 
271
 
 
272
    def lock_write(self):
 
273
        self._calls.append('lw')
 
274
 
 
275
    def unlock(self):
 
276
        self._calls.append('ul')
 
277
 
 
278
    @needs_read_lock
 
279
    def do_with_read(self):
 
280
        return 1
 
281
 
 
282
    @needs_read_lock
 
283
    def except_with_read(self):
 
284
        raise RuntimeError
 
285
 
 
286
    @needs_write_lock
 
287
    def do_with_write(self):
 
288
        return 2
 
289
 
 
290
    @needs_write_lock
 
291
    def except_with_write(self):
 
292
        raise RuntimeError
 
293
 
 
294
 
 
295
class TestDecorators(TestCase):
 
296
 
 
297
    def test_needs_read_lock(self):
 
298
        branch = TestDecorator()
 
299
        self.assertEqual(1, branch.do_with_read())
 
300
        self.assertEqual(['lr', 'ul'], branch._calls)
 
301
 
 
302
    def test_excepts_in_read_lock(self):
 
303
        branch = TestDecorator()
 
304
        self.assertRaises(RuntimeError, branch.except_with_read)
 
305
        self.assertEqual(['lr', 'ul'], branch._calls)
 
306
 
 
307
    def test_needs_write_lock(self):
 
308
        branch = TestDecorator()
 
309
        self.assertEqual(2, branch.do_with_write())
 
310
        self.assertEqual(['lw', 'ul'], branch._calls)
 
311
 
 
312
    def test_excepts_in_write_lock(self):
 
313
        branch = TestDecorator()
 
314
        self.assertRaises(RuntimeError, branch.except_with_write)
 
315
        self.assertEqual(['lw', 'ul'], branch._calls)
 
316
 
 
317
 
 
318
class TestBranchTransaction(TestCaseInTempDir):
 
319
 
 
320
    def setUp(self):
 
321
        super(TestBranchTransaction, self).setUp()
 
322
        self.branch = Branch.initialize('.')
 
323
        
 
324
    def test_default_get_transaction(self):
 
325
        """branch.get_transaction on a new branch should give a PassThrough."""
 
326
        self.failUnless(isinstance(self.branch.get_transaction(),
 
327
                                   transactions.PassThroughTransaction))
 
328
 
 
329
    def test__set_new_transaction(self):
 
330
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
331
 
 
332
    def test__set_over_existing_transaction_raises(self):
 
333
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
334
        self.assertRaises(errors.LockError,
 
335
                          self.branch._set_transaction,
 
336
                          transactions.ReadOnlyTransaction())
 
337
 
 
338
    def test_finish_no_transaction_raises(self):
 
339
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
 
340
 
 
341
    def test_finish_readonly_transaction_works(self):
 
342
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
343
        self.branch._finish_transaction()
 
344
        self.assertEqual(None, self.branch._transaction)
 
345
 
 
346
    def test_unlock_calls_finish(self):
 
347
        self.branch.lock_read()
 
348
        transaction = InstrumentedTransaction()
 
349
        self.branch._transaction = transaction
 
350
        self.branch.unlock()
 
351
        self.assertEqual(['finish'], transaction.calls)
 
352
 
 
353
    def test_lock_read_acquires_ro_transaction(self):
 
354
        self.branch.lock_read()
 
355
        self.failUnless(isinstance(self.branch.get_transaction(),
 
356
                                   transactions.ReadOnlyTransaction))
 
357
        self.branch.unlock()
 
358
        
 
359
    def test_lock_write_acquires_passthrough_transaction(self):
 
360
        self.branch.lock_write()
 
361
        # cannot use get_transaction as its magic
 
362
        self.failUnless(isinstance(self.branch._transaction,
 
363
                                   transactions.PassThroughTransaction))
 
364
        self.branch.unlock()
 
365
 
 
366
 
 
367
class TestBranchPushLocations(TestCaseInTempDir):
 
368
 
 
369
    def setUp(self):
 
370
        super(TestBranchPushLocations, self).setUp()
 
371
        self.branch = Branch.initialize('.')
 
372
        
 
373
    def test_get_push_location_unset(self):
 
374
        self.assertEqual(None, self.branch.get_push_location())
 
375
 
 
376
    def test_get_push_location_exact(self):
 
377
        self.build_tree(['.bazaar/'])
 
378
        print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
 
379
                                                       "push_location=foo" %
 
380
                                                       os.getcwdu())
 
381
        self.assertEqual("foo", self.branch.get_push_location())
 
382
 
 
383
    def test_set_push_location(self):
 
384
        self.branch.set_push_location('foo')
 
385
        self.assertFileEqual("[%s]\n"
 
386
                             "push_location = foo" % os.getcwdu(),
 
387
                             '.bazaar/branches.conf')
 
388
 
 
389
    # TODO RBC 20051029 test getting a push location from a branch in a 
 
390
    # recursive section - that is, it appends the branch name.