1
# (C) 2005 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
21
from bzrlib.clone import copy_branch
22
from bzrlib.commit import commit
23
import bzrlib.errors as errors
24
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
26
from bzrlib.osutils import getcwd
27
from bzrlib.tests import TestCase, TestCaseInTempDir
28
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
29
from bzrlib.trace import mutter
30
import bzrlib.transactions as transactions
31
from bzrlib.revision import NULL_REVISION
33
# TODO: Make a branch using basis branch, and check that it
34
# doesn't request any files that could have been avoided, by
35
# hooking into the Transport.
37
class TestBranch(TestCaseInTempDir):
39
def test_append_revisions(self):
40
"""Test appending more than one revision"""
41
br = Branch.initialize(u".")
42
br.append_revision("rev1")
43
self.assertEquals(br.revision_history(), ["rev1",])
44
br.append_revision("rev2", "rev3")
45
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
47
def test_fetch_revisions(self):
48
"""Test fetch-revision operation."""
49
from bzrlib.fetch import Fetcher
52
b1 = Branch.initialize('b1')
53
b2 = Branch.initialize('b2')
54
file('b1/foo', 'w').write('hello')
55
b1.working_tree().add(['foo'], ['foo-id'])
56
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
59
f = Fetcher(from_branch=b1, to_branch=b2)
60
eq = self.assertEquals
62
eq(f.last_revision, 'revision-1')
64
rev = b2.get_revision('revision-1')
65
tree = b2.revision_tree('revision-1')
66
eq(tree.get_file_text('foo-id'), 'hello')
68
def test_revision_tree(self):
69
b1 = Branch.initialize(u'.')
70
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
71
tree = b1.revision_tree('revision-1')
72
tree = b1.revision_tree(None)
73
self.assertEqual(len(tree.list_files()), 0)
74
tree = b1.revision_tree(NULL_REVISION)
75
self.assertEqual(len(tree.list_files()), 0)
77
def get_unbalanced_branch_pair(self):
78
"""Return two branches, a and b, with one file in a."""
80
br_a = Branch.initialize("a")
81
file('a/b', 'wb').write('b')
82
br_a.working_tree().add('b')
83
commit(br_a, "silly commit", rev_id='A')
85
br_b = Branch.initialize("b")
88
def get_balanced_branch_pair(self):
89
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
90
br_a, br_b = self.get_unbalanced_branch_pair()
91
br_a.push_stores(br_b)
94
def test_push_stores(self):
95
"""Copy the stores from one branch to another"""
96
br_a, br_b = self.get_unbalanced_branch_pair()
97
# ensure the revision is missing.
98
self.assertRaises(NoSuchRevision, br_b.get_revision,
99
br_a.revision_history()[0])
100
br_a.push_stores(br_b)
101
# check that b now has all the data from a's first commit.
102
rev = br_b.get_revision(br_a.revision_history()[0])
103
tree = br_b.revision_tree(br_a.revision_history()[0])
105
if tree.inventory[file_id].kind == "file":
106
tree.get_file(file_id).read()
109
def test_copy_branch(self):
110
"""Copy the stores from one branch to another"""
111
br_a, br_b = self.get_balanced_branch_pair()
112
commit(br_b, "silly commit")
114
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
115
self.assertEqual(br_a.revision_history(), br_c.revision_history())
117
def test_copy_partial(self):
118
"""Copy only part of the history of a branch."""
119
self.build_tree(['a/', 'a/one'])
120
br_a = Branch.initialize('a')
121
br_a.working_tree().add(['one'])
122
br_a.working_tree().commit('commit one', rev_id='u@d-1')
123
self.build_tree(['a/two'])
124
br_a.working_tree().add(['two'])
125
br_a.working_tree().commit('commit two', rev_id='u@d-2')
126
br_b = copy_branch(br_a, 'b', revision='u@d-1')
127
self.assertEqual(br_b.last_revision(), 'u@d-1')
128
self.assertTrue(os.path.exists('b/one'))
129
self.assertFalse(os.path.exists('b/two'))
131
def test_record_initial_ghost_merge(self):
132
"""A pending merge with no revision present is still a merge."""
133
branch = Branch.initialize(u'.')
134
branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
135
branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
136
rev = branch.get_revision(branch.last_revision())
137
self.assertEqual(len(rev.parent_ids), 1)
138
# parent_sha1s is not populated now, WTF. rbc 20051003
139
self.assertEqual(len(rev.parent_sha1s), 0)
140
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
142
def test_bad_revision(self):
143
branch = Branch.initialize(u'.')
144
self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
147
# compare the gpg-to-sign info for a commit with a ghost and
148
# an identical tree without a ghost
149
# fetch missing should rewrite the TOC of weaves to list newly available parents.
151
def test_pending_merges(self):
152
"""Tracking pending-merged revisions."""
153
b = Branch.initialize(u'.')
154
wt = b.working_tree()
155
self.assertEquals(wt.pending_merges(), [])
156
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
157
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
158
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
159
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
160
wt.add_pending_merge('wibble@fofof--20050401--1928390812')
161
self.assertEquals(wt.pending_merges(),
162
['foo@azkhazan-123123-abcabc',
163
'wibble@fofof--20050401--1928390812'])
164
b.working_tree().commit("commit from base with two merges")
165
rev = b.get_revision(b.revision_history()[0])
166
self.assertEquals(len(rev.parent_ids), 2)
167
self.assertEquals(rev.parent_ids[0],
168
'foo@azkhazan-123123-abcabc')
169
self.assertEquals(rev.parent_ids[1],
170
'wibble@fofof--20050401--1928390812')
171
# list should be cleared when we do a commit
172
self.assertEquals(wt.pending_merges(), [])
174
def test_sign_existing_revision(self):
175
branch = Branch.initialize(u'.')
176
branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
177
from bzrlib.testament import Testament
178
branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
179
self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
180
branch.revision_store.get('A', 'sig').read())
182
def test_store_signature(self):
183
branch = Branch.initialize(u'.')
184
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
186
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
188
def test__relcontrolfilename(self):
189
branch = Branch.initialize(u'.')
190
self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
192
def test__relcontrolfilename_empty(self):
193
branch = Branch.initialize(u'.')
194
self.assertEqual('.bzr', branch._rel_controlfilename(''))
196
def test_nicks(self):
197
"""Branch nicknames"""
199
branch = Branch.initialize('bzr.dev')
200
self.assertEqual(branch.nick, 'bzr.dev')
201
os.rename('bzr.dev', 'bzr.ab')
202
branch = Branch.open('bzr.ab')
203
self.assertEqual(branch.nick, 'bzr.ab')
204
branch.nick = "Aaron's branch"
205
branch.nick = "Aaron's branch"
206
self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
207
self.assertEqual(branch.nick, "Aaron's branch")
208
os.rename('bzr.ab', 'integration')
209
branch = Branch.open('integration')
210
self.assertEqual(branch.nick, "Aaron's branch")
211
branch.nick = u"\u1234"
212
self.assertEqual(branch.nick, u"\u1234")
214
def test_commit_nicks(self):
215
"""Nicknames are committed to the revision"""
217
branch = Branch.initialize('bzr.dev')
218
branch.nick = "My happy branch"
219
branch.working_tree().commit('My commit respect da nick.')
220
committed = branch.get_revision(branch.last_revision())
221
self.assertEqual(committed.properties["branch-nick"],
224
def test_no_ancestry_weave(self):
225
# We no longer need to create the ancestry.weave file
226
# since it is *never* used.
227
branch = Branch.initialize(u'.')
228
self.failIfExists('.bzr/ancestry.weave')
231
class TestRemote(TestCaseWithWebserver):
233
def test_open_containing(self):
234
self.assertRaises(NotBranchError, Branch.open_containing,
235
self.get_remote_url(''))
236
self.assertRaises(NotBranchError, Branch.open_containing,
237
self.get_remote_url('g/p/q'))
238
b = Branch.initialize(u'.')
239
branch, relpath = Branch.open_containing(self.get_remote_url(''))
240
self.assertEqual('', relpath)
241
branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
242
self.assertEqual('g/p/q', relpath)
244
# TODO: rewrite this as a regular unittest, without relying on the displayed output
245
# >>> from bzrlib.commit import commit
246
# >>> bzrlib.trace.silent = True
247
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
248
# >>> br1.working_tree().add('foo')
249
# >>> br1.working_tree().add('bar')
250
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
251
# >>> br2 = ScratchBranch()
252
# >>> br2.update_revisions(br1)
254
# Added 1 inventories.
256
# >>> br2.revision_history()
258
# >>> br2.update_revisions(br1)
260
# >>> br1.text_store.total_size() == br2.text_store.total_size()
263
class InstrumentedTransaction(object):
266
self.calls.append('finish')
272
class TestDecorator(object):
278
self._calls.append('lr')
280
def lock_write(self):
281
self._calls.append('lw')
284
self._calls.append('ul')
287
def do_with_read(self):
291
def except_with_read(self):
295
def do_with_write(self):
299
def except_with_write(self):
303
class TestDecorators(TestCase):
305
def test_needs_read_lock(self):
306
branch = TestDecorator()
307
self.assertEqual(1, branch.do_with_read())
308
self.assertEqual(['lr', 'ul'], branch._calls)
310
def test_excepts_in_read_lock(self):
311
branch = TestDecorator()
312
self.assertRaises(RuntimeError, branch.except_with_read)
313
self.assertEqual(['lr', 'ul'], branch._calls)
315
def test_needs_write_lock(self):
316
branch = TestDecorator()
317
self.assertEqual(2, branch.do_with_write())
318
self.assertEqual(['lw', 'ul'], branch._calls)
320
def test_excepts_in_write_lock(self):
321
branch = TestDecorator()
322
self.assertRaises(RuntimeError, branch.except_with_write)
323
self.assertEqual(['lw', 'ul'], branch._calls)
326
class TestBranchTransaction(TestCaseInTempDir):
329
super(TestBranchTransaction, self).setUp()
330
self.branch = Branch.initialize(u'.')
332
def test_default_get_transaction(self):
333
"""branch.get_transaction on a new branch should give a PassThrough."""
334
self.failUnless(isinstance(self.branch.get_transaction(),
335
transactions.PassThroughTransaction))
337
def test__set_new_transaction(self):
338
self.branch._set_transaction(transactions.ReadOnlyTransaction())
340
def test__set_over_existing_transaction_raises(self):
341
self.branch._set_transaction(transactions.ReadOnlyTransaction())
342
self.assertRaises(errors.LockError,
343
self.branch._set_transaction,
344
transactions.ReadOnlyTransaction())
346
def test_finish_no_transaction_raises(self):
347
self.assertRaises(errors.LockError, self.branch._finish_transaction)
349
def test_finish_readonly_transaction_works(self):
350
self.branch._set_transaction(transactions.ReadOnlyTransaction())
351
self.branch._finish_transaction()
352
self.assertEqual(None, self.branch._transaction)
354
def test_unlock_calls_finish(self):
355
self.branch.lock_read()
356
transaction = InstrumentedTransaction()
357
self.branch._transaction = transaction
359
self.assertEqual(['finish'], transaction.calls)
361
def test_lock_read_acquires_ro_transaction(self):
362
self.branch.lock_read()
363
self.failUnless(isinstance(self.branch.get_transaction(),
364
transactions.ReadOnlyTransaction))
367
def test_lock_write_acquires_passthrough_transaction(self):
368
self.branch.lock_write()
369
# cannot use get_transaction as its magic
370
self.failUnless(isinstance(self.branch._transaction,
371
transactions.PassThroughTransaction))
375
class TestBranchPushLocations(TestCaseInTempDir):
378
super(TestBranchPushLocations, self).setUp()
379
self.branch = Branch.initialize(u'.')
381
def test_get_push_location_unset(self):
382
self.assertEqual(None, self.branch.get_push_location())
384
def test_get_push_location_exact(self):
385
from bzrlib.config import (branches_config_filename,
386
ensure_config_dir_exists)
387
ensure_config_dir_exists()
388
fn = branches_config_filename()
389
print >> open(fn, 'wt'), ("[%s]\n"
390
"push_location=foo" %
392
self.assertEqual("foo", self.branch.get_push_location())
394
def test_set_push_location(self):
395
from bzrlib.config import (branches_config_filename,
396
ensure_config_dir_exists)
397
ensure_config_dir_exists()
398
fn = branches_config_filename()
399
self.branch.set_push_location('foo')
400
self.assertFileEqual("[%s]\n"
401
"push_location = foo" % getcwd(),
404
# TODO RBC 20051029 test getting a push location from a branch in a
405
# recursive section - that is, it appends the branch name.