15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
from bzrlib.branch import Branch
20
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
19
21
from bzrlib.clone import copy_branch
20
22
from bzrlib.commit import commit
21
from bzrlib.errors import NoSuchRevision, UnlistableBranch
22
from bzrlib.selftest import TestCaseInTempDir
23
import bzrlib.errors as errors
24
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
26
from bzrlib.osutils import getcwd
27
from bzrlib.tests import TestCase, TestCaseInTempDir
28
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
23
29
from bzrlib.trace import mutter
30
import bzrlib.transactions as transactions
31
from bzrlib.revision import NULL_REVISION
33
# TODO: Make a branch using basis branch, and check that it
34
# doesn't request any files that could have been avoided, by
35
# hooking into the Transport.
26
37
class TestBranch(TestCaseInTempDir):
28
39
def test_append_revisions(self):
29
40
"""Test appending more than one revision"""
30
br = Branch.initialize(".")
41
br = Branch.initialize(u".")
31
42
br.append_revision("rev1")
32
43
self.assertEquals(br.revision_history(), ["rev1",])
33
44
br.append_revision("rev2", "rev3")
54
65
tree = b2.revision_tree('revision-1')
55
66
eq(tree.get_file_text('foo-id'), 'hello')
57
def test_push_stores(self):
58
"""Copy the stores from one branch to another"""
68
def test_revision_tree(self):
    """revision_tree() accepts a commit id and is empty for None/NULL_REVISION."""
    branch = Branch.initialize(u'.')
    branch.working_tree().commit('lala!', allow_pointless=True,
                                 rev_id='revision-1')
    # Looked up only to show the call succeeds; the result is replaced
    # below without being inspected.
    tree = branch.revision_tree('revision-1')
    # Both spellings of "no revision" must give a tree that lists no files.
    for empty_revision in (None, NULL_REVISION):
        tree = branch.revision_tree(empty_revision)
        self.assertEqual(len(tree.list_files()), 0)
77
def get_unbalanced_branch_pair(self):
78
"""Return two branches, a and b, with one file in a."""
60
80
br_a = Branch.initialize("a")
61
81
file('a/b', 'wb').write('b')
63
commit(br_a, "silly commit")
82
br_a.working_tree().add('b')
83
commit(br_a, "silly commit", rev_id='A')
66
85
br_b = Branch.initialize("b")
88
def get_balanced_branch_pair(self):
89
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
90
br_a, br_b = self.get_unbalanced_branch_pair()
91
br_a.push_stores(br_b)
94
def test_push_stores(self):
95
"""Copy the stores from one branch to another"""
96
br_a, br_b = self.get_unbalanced_branch_pair()
97
# ensure the revision is missing.
67
98
self.assertRaises(NoSuchRevision, br_b.get_revision,
68
99
br_a.revision_history()[0])
69
100
br_a.push_stores(br_b)
101
# check that b now has all the data from a's first commit.
70
102
rev = br_b.get_revision(br_a.revision_history()[0])
71
103
tree = br_b.revision_tree(br_a.revision_history()[0])
72
104
for file_id in tree:
77
109
def test_copy_branch(self):
78
110
"""Copy the stores from one branch to another"""
79
br_a, br_b = self.test_push_stores()
111
br_a, br_b = self.get_balanced_branch_pair()
80
112
commit(br_b, "silly commit")
82
114
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
83
115
self.assertEqual(br_a.revision_history(), br_c.revision_history())
84
## # basis branches currently disabled for weave format
85
## self.assertFalse(br_b.last_revision() in br_c.revision_history())
86
## br_c.get_revision(br_b.last_revision())
88
117
def test_copy_partial(self):
89
118
"""Copy only part of the history of a branch."""
90
119
self.build_tree(['a/', 'a/one'])
91
120
br_a = Branch.initialize('a')
93
br_a.commit('commit one', rev_id='u@d-1')
121
br_a.working_tree().add(['one'])
122
br_a.working_tree().commit('commit one', rev_id='u@d-1')
94
123
self.build_tree(['a/two'])
96
br_a.commit('commit two', rev_id='u@d-2')
124
br_a.working_tree().add(['two'])
125
br_a.working_tree().commit('commit two', rev_id='u@d-2')
97
126
br_b = copy_branch(br_a, 'b', revision='u@d-1')
98
127
self.assertEqual(br_b.last_revision(), 'u@d-1')
99
128
self.assertTrue(os.path.exists('b/one'))
100
129
self.assertFalse(os.path.exists('b/two'))
103
131
def test_record_initial_ghost_merge(self):
104
132
"""A pending merge with no revision present is still a merge."""
105
branch = Branch.initialize('.')
106
branch.add_pending_merge('non:existent@rev--ision--0--2')
107
branch.commit('pretend to merge nonexistent-revision', rev_id='first')
133
branch = Branch.initialize(u'.')
134
branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
135
branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
108
136
rev = branch.get_revision(branch.last_revision())
109
137
self.assertEqual(len(rev.parent_ids), 1)
110
138
# parent_sha1s is not populated now, WTF. rbc 20051003
111
139
self.assertEqual(len(rev.parent_sha1s), 0)
112
140
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
142
def test_bad_revision(self):
    """get_revision(None) must raise errors.InvalidRevisionId."""
    fresh_branch = Branch.initialize(u'.')
    lookup = fresh_branch.get_revision
    self.assertRaises(errors.InvalidRevisionId, lookup, None)
114
146
# TODO 20051003 RBC:
115
147
# compare the gpg-to-sign info for a commit with a ghost and
116
148
# an identical tree without a ghost
119
151
def test_pending_merges(self):
120
152
"""Tracking pending-merged revisions."""
121
b = Branch.initialize('.')
123
self.assertEquals(b.pending_merges(), [])
124
b.add_pending_merge('foo@azkhazan-123123-abcabc')
125
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
126
b.add_pending_merge('foo@azkhazan-123123-abcabc')
127
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
128
b.add_pending_merge('wibble@fofof--20050401--1928390812')
129
self.assertEquals(b.pending_merges(),
153
b = Branch.initialize(u'.')
154
wt = b.working_tree()
155
self.assertEquals(wt.pending_merges(), [])
156
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
157
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
158
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
159
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
160
wt.add_pending_merge('wibble@fofof--20050401--1928390812')
161
self.assertEquals(wt.pending_merges(),
130
162
['foo@azkhazan-123123-abcabc',
131
163
'wibble@fofof--20050401--1928390812'])
132
b.commit("commit from base with two merges")
164
b.working_tree().commit("commit from base with two merges")
133
165
rev = b.get_revision(b.revision_history()[0])
134
166
self.assertEquals(len(rev.parent_ids), 2)
135
167
self.assertEquals(rev.parent_ids[0],
137
169
self.assertEquals(rev.parent_ids[1],
138
170
'wibble@fofof--20050401--1928390812')
139
171
# list should be cleared when we do a commit
140
self.assertEquals(b.pending_merges(), [])
172
self.assertEquals(wt.pending_merges(), [])
174
def test_sign_existing_revision(self):
    """After sign_revision('A'), the stored 'sig' equals A's short testament."""
    branch = Branch.initialize(u'.')
    branch.working_tree().commit("base", rev_id='A', allow_pointless=True)
    from bzrlib.testament import Testament
    strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
    branch.sign_revision('A', strategy)
    # Compute the expected text first, then read back what was stored.
    expected_text = Testament.from_revision(branch, 'A').as_short_text()
    stored_text = branch.revision_store.get('A', 'sig').read()
    self.assertEqual(expected_text, stored_text)
182
def test_store_signature(self):
183
branch = Branch.initialize(u'.')
184
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
186
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
188
def test__relcontrolfilename(self):
    """_rel_controlfilename('%') is expected to give '.bzr/%25'."""
    fresh_branch = Branch.initialize(u'.')
    control_path = fresh_branch._rel_controlfilename('%')
    self.assertEqual('.bzr/%25', control_path)
192
def test__relcontrolfilename_empty(self):
    """_rel_controlfilename('') is expected to give the bare '.bzr'."""
    fresh_branch = Branch.initialize(u'.')
    control_path = fresh_branch._rel_controlfilename('')
    self.assertEqual('.bzr', control_path)
196
def test_nicks(self):
197
"""Branch nicknames"""
199
branch = Branch.initialize('bzr.dev')
200
self.assertEqual(branch.nick, 'bzr.dev')
201
os.rename('bzr.dev', 'bzr.ab')
202
branch = Branch.open('bzr.ab')
203
self.assertEqual(branch.nick, 'bzr.ab')
204
branch.nick = "Aaron's branch"
205
branch.nick = "Aaron's branch"
206
self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
207
self.assertEqual(branch.nick, "Aaron's branch")
208
os.rename('bzr.ab', 'integration')
209
branch = Branch.open('integration')
210
self.assertEqual(branch.nick, "Aaron's branch")
211
branch.nick = u"\u1234"
212
self.assertEqual(branch.nick, u"\u1234")
214
def test_commit_nicks(self):
215
"""Nicknames are committed to the revision"""
217
branch = Branch.initialize('bzr.dev')
218
branch.nick = "My happy branch"
219
branch.working_tree().commit('My commit respect da nick.')
220
committed = branch.get_revision(branch.last_revision())
221
self.assertEqual(committed.properties["branch-nick"],
225
class TestRemote(TestCaseWithWebserver):
    """Branch.open_containing checks against URLs from get_remote_url()."""

    def test_open_containing(self):
        """open_containing fails before a branch exists, then yields relpath."""
        probe_paths = ('', 'g/p/q')
        # Before any branch is initialized, every lookup must fail.
        for subpath in probe_paths:
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_remote_url(subpath))
        b = Branch.initialize(u'.')
        # Once '.' is a branch, the requested sub-path comes back as relpath.
        for subpath in probe_paths:
            found, relpath = Branch.open_containing(
                self.get_remote_url(subpath))
            self.assertEqual(subpath, relpath)
143
238
# TODO: rewrite this as a regular unittest, without relying on the displayed output
144
239
# >>> from bzrlib.commit import commit
145
240
# >>> bzrlib.trace.silent = True
146
241
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
242
# >>> br1.working_tree().add('foo')
243
# >>> br1.working_tree().add('bar')
149
244
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
150
245
# >>> br2 = ScratchBranch()
151
246
# >>> br2.update_revisions(br1)
158
253
# Added 0 revisions.
159
254
# >>> br1.text_store.total_size() == br2.text_store.total_size()
257
class InstrumentedTransaction(object):
260
self.calls.append('finish')
266
class TestDecorator(object):
272
self._calls.append('lr')
274
def lock_write(self):
275
self._calls.append('lw')
278
self._calls.append('ul')
281
def do_with_read(self):
285
def except_with_read(self):
289
def do_with_write(self):
293
def except_with_write(self):
297
class TestDecorators(TestCase):
    """Check the call log that TestDecorator records around its methods."""

    def test_needs_read_lock(self):
        """do_with_read() returns 1 and logs 'lr' followed by 'ul'."""
        decorated = TestDecorator()
        outcome = decorated.do_with_read()
        self.assertEqual(1, outcome)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        """except_with_read() raises RuntimeError yet still logs 'lr', 'ul'."""
        decorated = TestDecorator()
        failing_call = decorated.except_with_read
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        """do_with_write() returns 2 and logs 'lw' followed by 'ul'."""
        decorated = TestDecorator()
        outcome = decorated.do_with_write()
        self.assertEqual(2, outcome)
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        """except_with_write() raises RuntimeError yet still logs 'lw', 'ul'."""
        decorated = TestDecorator()
        failing_call = decorated.except_with_write
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lw', 'ul'], decorated._calls)
320
class TestBranchTransaction(TestCaseInTempDir):
323
super(TestBranchTransaction, self).setUp()
324
self.branch = Branch.initialize(u'.')
326
def test_default_get_transaction(self):
    """get_transaction() on the setUp branch yields a PassThroughTransaction."""
    current = self.branch.get_transaction()
    is_passthrough = isinstance(current, transactions.PassThroughTransaction)
    self.failUnless(is_passthrough)
331
def test__set_new_transaction(self):
    """Installing a ReadOnlyTransaction on the setUp branch must not raise."""
    read_only = transactions.ReadOnlyTransaction()
    self.branch._set_transaction(read_only)
334
def test__set_over_existing_transaction_raises(self):
    """A second _set_transaction() call must raise errors.LockError."""
    first = transactions.ReadOnlyTransaction()
    self.branch._set_transaction(first)
    # The branch already holds a transaction, so this attempt is rejected.
    self.assertRaises(errors.LockError,
                      self.branch._set_transaction,
                      transactions.ReadOnlyTransaction())
340
def test_finish_no_transaction_raises(self):
    """_finish_transaction() on the setUp branch must raise errors.LockError."""
    finish = self.branch._finish_transaction
    self.assertRaises(errors.LockError, finish)
343
def test_finish_readonly_transaction_works(self):
    """Finishing a ReadOnlyTransaction resets branch._transaction to None."""
    read_only = transactions.ReadOnlyTransaction()
    self.branch._set_transaction(read_only)
    self.branch._finish_transaction()
    remaining = self.branch._transaction
    self.assertEqual(None, remaining)
348
def test_unlock_calls_finish(self):
349
self.branch.lock_read()
350
transaction = InstrumentedTransaction()
351
self.branch._transaction = transaction
353
self.assertEqual(['finish'], transaction.calls)
355
def test_lock_read_acquires_ro_transaction(self):
356
self.branch.lock_read()
357
self.failUnless(isinstance(self.branch.get_transaction(),
358
transactions.ReadOnlyTransaction))
361
def test_lock_write_acquires_passthrough_transaction(self):
362
self.branch.lock_write()
363
# cannot use get_transaction as it's magic
364
self.failUnless(isinstance(self.branch._transaction,
365
transactions.PassThroughTransaction))
369
class TestBranchPushLocations(TestCaseInTempDir):
372
super(TestBranchPushLocations, self).setUp()
373
self.branch = Branch.initialize(u'.')
375
def test_get_push_location_unset(self):
    """get_push_location() is expected to be None for the setUp branch."""
    location = self.branch.get_push_location()
    self.assertEqual(None, location)
378
def test_get_push_location_exact(self):
379
from bzrlib.config import (branches_config_filename,
380
ensure_config_dir_exists)
381
ensure_config_dir_exists()
382
fn = branches_config_filename()
383
print >> open(fn, 'wt'), ("[%s]\n"
384
"push_location=foo" %
386
self.assertEqual("foo", self.branch.get_push_location())
388
def test_set_push_location(self):
389
from bzrlib.config import (branches_config_filename,
390
ensure_config_dir_exists)
391
ensure_config_dir_exists()
392
fn = branches_config_filename()
393
self.branch.set_push_location('foo')
394
self.assertFileEqual("[%s]\n"
395
"push_location = foo" % getcwd(),
398
# TODO RBC 20051029 test getting a push location from a branch in a
399
# recursive section - that is, it appends the branch name.