15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
from bzrlib.branch import Branch
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
19
20
from bzrlib.clone import copy_branch
20
21
from bzrlib.commit import commit
21
from bzrlib.errors import NoSuchRevision, UnlistableBranch
22
from bzrlib.selftest import TestCaseInTempDir
22
import bzrlib.errors as errors
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
23
27
from bzrlib.trace import mutter
28
import bzrlib.transactions as transactions
29
from bzrlib.revision import NULL_REVISION
31
# TODO: Make a branch using basis branch, and check that it
32
# doesn't request any files that could have been avoided, by
33
# hooking into the Transport.
26
35
class TestBranch(TestCaseInTempDir):
54
63
tree = b2.revision_tree('revision-1')
55
64
eq(tree.get_file_text('foo-id'), 'hello')
57
def test_push_stores(self):
58
"""Copy the stores from one branch to another"""
66
def test_revision_tree(self):
    """revision_tree() yields an empty tree for None and NULL_REVISION."""
    b1 = Branch.initialize('.')
    b1.working_tree().commit('lala!', rev_id='revision-1',
                             allow_pointless=True)
    # Looking up a real revision is only exercised here; its result is
    # not inspected.
    b1.revision_tree('revision-1')
    for empty_revision in (None, NULL_REVISION):
        tree = b1.revision_tree(empty_revision)
        self.assertEqual(len(tree.list_files()), 0)
75
def get_unbalanced_branch_pair(self):
76
"""Return two branches, a and b, with one file in a."""
60
78
br_a = Branch.initialize("a")
61
79
file('a/b', 'wb').write('b')
63
commit(br_a, "silly commit")
81
commit(br_a, "silly commit", rev_id='A')
66
83
br_b = Branch.initialize("b")
86
def get_balanced_branch_pair(self):
87
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
88
br_a, br_b = self.get_unbalanced_branch_pair()
89
br_a.push_stores(br_b)
92
def test_push_stores(self):
93
"""Copy the stores from one branch to another"""
94
br_a, br_b = self.get_unbalanced_branch_pair()
95
# ensure the revision is missing.
67
96
self.assertRaises(NoSuchRevision, br_b.get_revision,
68
97
br_a.revision_history()[0])
69
98
br_a.push_stores(br_b)
99
# check that b now has all the data from a's first commit.
70
100
rev = br_b.get_revision(br_a.revision_history()[0])
71
101
tree = br_b.revision_tree(br_a.revision_history()[0])
72
102
for file_id in tree:
77
107
def test_copy_branch(self):
78
108
"""Copy the stores from one branch to another"""
79
br_a, br_b = self.test_push_stores()
109
br_a, br_b = self.get_balanced_branch_pair()
80
110
commit(br_b, "silly commit")
82
112
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
83
113
self.assertEqual(br_a.revision_history(), br_c.revision_history())
84
## # basis branches currently disabled for weave format
85
## self.assertFalse(br_b.last_revision() in br_c.revision_history())
86
## br_c.get_revision(br_b.last_revision())
88
115
def test_copy_partial(self):
89
116
"""Copy only part of the history of a branch."""
90
117
self.build_tree(['a/', 'a/one'])
91
118
br_a = Branch.initialize('a')
93
br_a.commit('commit one', rev_id='u@d-1')
120
br_a.working_tree().commit('commit one', rev_id='u@d-1')
94
121
self.build_tree(['a/two'])
96
br_a.commit('commit two', rev_id='u@d-2')
123
br_a.working_tree().commit('commit two', rev_id='u@d-2')
97
124
br_b = copy_branch(br_a, 'b', revision='u@d-1')
98
125
self.assertEqual(br_b.last_revision(), 'u@d-1')
99
126
self.assertTrue(os.path.exists('b/one'))
100
127
self.assertFalse(os.path.exists('b/two'))
103
129
def test_record_initial_ghost_merge(self):
    """A pending merge with no revision present is still a merge."""
    ghost_id = 'non:existent@rev--ision--0--2'
    branch = Branch.initialize('.')
    # Pending merges and commits go through the working tree.
    branch.working_tree().add_pending_merge(ghost_id)
    branch.working_tree().commit('pretend to merge nonexistent-revision',
                                 rev_id='first')
    rev = branch.get_revision(branch.last_revision())
    self.assertEqual(len(rev.parent_ids), 1)
    # NOTE (rbc 20051003): parent_sha1s is unexpectedly not populated at
    # the moment; this assertion pins the current behaviour.
    self.assertEqual(len(rev.parent_sha1s), 0)
    self.assertEqual(rev.parent_ids[0], ghost_id)
140
def test_bad_revision(self):
    """get_revision(None) raises InvalidRevisionId."""
    branch = Branch.initialize('.')
    self.assertRaises(errors.InvalidRevisionId,
                      branch.get_revision, None)
114
144
# TODO 20051003 RBC:
115
145
# compare the gpg-to-sign info for a commit with a ghost and
116
146
# an identical tree without a ghost
119
149
def test_pending_merges(self):
120
150
"""Tracking pending-merged revisions."""
121
151
b = Branch.initialize('.')
123
self.assertEquals(b.pending_merges(), [])
124
b.add_pending_merge('foo@azkhazan-123123-abcabc')
125
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
126
b.add_pending_merge('foo@azkhazan-123123-abcabc')
127
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
128
b.add_pending_merge('wibble@fofof--20050401--1928390812')
129
self.assertEquals(b.pending_merges(),
152
wt = b.working_tree()
153
self.assertEquals(wt.pending_merges(), [])
154
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
155
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
156
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
157
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
158
wt.add_pending_merge('wibble@fofof--20050401--1928390812')
159
self.assertEquals(wt.pending_merges(),
130
160
['foo@azkhazan-123123-abcabc',
131
161
'wibble@fofof--20050401--1928390812'])
132
b.commit("commit from base with two merges")
162
b.working_tree().commit("commit from base with two merges")
133
163
rev = b.get_revision(b.revision_history()[0])
134
164
self.assertEquals(len(rev.parent_ids), 2)
135
165
self.assertEquals(rev.parent_ids[0],
137
167
self.assertEquals(rev.parent_ids[1],
138
168
'wibble@fofof--20050401--1928390812')
139
169
# list should be cleared when we do a commit
140
self.assertEquals(b.pending_merges(), [])
170
self.assertEquals(wt.pending_merges(), [])
172
def test_sign_existing_revision(self):
    """Signing revision 'A' stores its short testament text as the signature."""
    from bzrlib.testament import Testament
    branch = Branch.initialize('.')
    branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
    branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
    expected = Testament.from_revision(branch, 'A').as_short_text()
    stored = branch.revision_store.get('A', 'sig').read()
    self.assertEqual(expected, stored)
180
def test_store_signature(self):
181
branch = Branch.initialize('.')
182
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
184
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
186
def test__relcontrolfilename(self):
    """_rel_controlfilename('%') gives '.bzr/%25'."""
    branch = Branch.initialize('.')
    escaped = branch._rel_controlfilename('%')
    self.assertEqual('.bzr/%25', escaped)
190
def test__relcontrolfilename_empty(self):
    """_rel_controlfilename('') gives '.bzr' with no trailing separator."""
    branch = Branch.initialize('.')
    control_root = branch._rel_controlfilename('')
    self.assertEqual('.bzr', control_root)
194
def test_nicks(self):
195
"""Branch nicknames"""
197
branch = Branch.initialize('bzr.dev')
198
self.assertEqual(branch.nick, 'bzr.dev')
199
os.rename('bzr.dev', 'bzr.ab')
200
branch = Branch.open('bzr.ab')
201
self.assertEqual(branch.nick, 'bzr.ab')
202
branch.nick = "Aaron's branch"
203
branch.nick = "Aaron's branch"
204
self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
205
self.assertEqual(branch.nick, "Aaron's branch")
206
os.rename('bzr.ab', 'integration')
207
branch = Branch.open('integration')
208
self.assertEqual(branch.nick, "Aaron's branch")
209
branch.nick = u"\u1234"
210
self.assertEqual(branch.nick, u"\u1234")
212
def test_commit_nicks(self):
213
"""Nicknames are committed to the revision"""
215
branch = Branch.initialize('bzr.dev')
216
branch.nick = "My happy branch"
217
branch.working_tree().commit('My commit respect da nick.')
218
committed = branch.get_revision(branch.last_revision())
219
self.assertEqual(committed.properties["branch-nick"],
223
class TestRemote(TestCaseWithWebserver):
    """Branch lookup through the remote URLs of the test web server."""

    def test_open_containing(self):
        """open_containing fails without a branch, then reports the relpath."""
        paths = ('', 'g/p/q')
        # Nothing has been initialized yet, so no URL contains a branch.
        for path in paths:
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_remote_url(path))
        Branch.initialize('.')
        # Once the branch exists, the remainder of the URL below the
        # branch root comes back as the relative path.
        for path in paths:
            branch, relpath = Branch.open_containing(
                self.get_remote_url(path))
            self.assertEqual(path, relpath)
143
236
# TODO: rewrite this as a regular unittest, without relying on the displayed output
144
237
# >>> from bzrlib.commit import commit
145
238
# >>> bzrlib.trace.silent = True
158
251
# Added 0 revisions.
159
252
# >>> br1.text_store.total_size() == br2.text_store.total_size()
255
class InstrumentedTransaction(object):
258
self.calls.append('finish')
264
class TestDecorator(object):
270
self._calls.append('lr')
272
def lock_write(self):
273
self._calls.append('lw')
276
self._calls.append('ul')
279
def do_with_read(self):
283
def except_with_read(self):
287
def do_with_write(self):
291
def except_with_write(self):
295
class TestDecorators(TestCase):
    """Check the lock calls recorded by TestDecorator's wrapped methods."""

    def test_needs_read_lock(self):
        """do_with_read returns 1 and logs 'lr' followed by 'ul'."""
        decorated = TestDecorator()
        self.assertEqual(1, decorated.do_with_read())
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        """A RuntimeError from except_with_read still logs 'lr', 'ul'."""
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_read)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        """do_with_write returns 2 and logs 'lw' followed by 'ul'."""
        decorated = TestDecorator()
        self.assertEqual(2, decorated.do_with_write())
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        """A RuntimeError from except_with_write still logs 'lw', 'ul'."""
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_write)
        self.assertEqual(['lw', 'ul'], decorated._calls)
318
class TestBranchTransaction(TestCaseInTempDir):
321
super(TestBranchTransaction, self).setUp()
322
self.branch = Branch.initialize('.')
324
def test_default_get_transaction(self):
    """branch.get_transaction on a new branch should give a PassThrough."""
    transaction = self.branch.get_transaction()
    self.assertTrue(
        isinstance(transaction, transactions.PassThroughTransaction))
329
def test__set_new_transaction(self):
    """_set_transaction accepts a ReadOnlyTransaction without raising."""
    read_only = transactions.ReadOnlyTransaction()
    self.branch._set_transaction(read_only)
332
def test__set_over_existing_transaction_raises(self):
    """Setting a second transaction over an existing one raises LockError."""
    self.branch._set_transaction(transactions.ReadOnlyTransaction())
    replacement = transactions.ReadOnlyTransaction()
    self.assertRaises(errors.LockError,
                      self.branch._set_transaction, replacement)
338
def test_finish_no_transaction_raises(self):
    """_finish_transaction raises LockError when none was set by the test."""
    finish = self.branch._finish_transaction
    self.assertRaises(errors.LockError, finish)
341
def test_finish_readonly_transaction_works(self):
    """Finishing a read-only transaction resets _transaction to None."""
    branch = self.branch
    branch._set_transaction(transactions.ReadOnlyTransaction())
    branch._finish_transaction()
    self.assertEqual(None, branch._transaction)
346
def test_unlock_calls_finish(self):
347
self.branch.lock_read()
348
transaction = InstrumentedTransaction()
349
self.branch._transaction = transaction
351
self.assertEqual(['finish'], transaction.calls)
353
def test_lock_read_acquires_ro_transaction(self):
354
self.branch.lock_read()
355
self.failUnless(isinstance(self.branch.get_transaction(),
356
transactions.ReadOnlyTransaction))
359
def test_lock_write_acquires_passthrough_transaction(self):
360
self.branch.lock_write()
361
# Cannot use get_transaction() here as it's magic; inspect _transaction directly.
362
self.failUnless(isinstance(self.branch._transaction,
363
transactions.PassThroughTransaction))
367
class TestBranchPushLocations(TestCaseInTempDir):
370
super(TestBranchPushLocations, self).setUp()
371
self.branch = Branch.initialize('.')
373
def test_get_push_location_unset(self):
    """get_push_location() is None when nothing has been configured."""
    location = self.branch.get_push_location()
    self.assertEqual(None, location)
376
def test_get_push_location_exact(self):
377
self.build_tree(['.bazaar/'])
378
print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
379
"push_location=foo" %
381
self.assertEqual("foo", self.branch.get_push_location())
383
def test_set_push_location(self):
    """set_push_location writes a section named after the cwd to branches.conf."""
    self.branch.set_push_location('foo')
    # The section header is the current working directory as unicode.
    expected = "[%s]\n" "push_location = foo" % os.getcwdu()
    self.assertFileEqual(expected, '.bazaar/branches.conf')
389
# TODO RBC 20051029 test getting a push location from a branch in a
390
# recursive section - that is, it appends the branch name.