14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
21
from bzrlib.commit import commit
22
import bzrlib.errors as errors
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
25
from bzrlib.osutils import getcwd
26
from bzrlib.tests import TestCase, TestCaseInTempDir
27
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
28
from bzrlib.trace import mutter
29
import bzrlib.transactions as transactions
30
from bzrlib.revision import NULL_REVISION
32
# TODO: Make a branch using basis branch, and check that it
33
# doesn't request any files that could have been avoided, by
34
# hooking into the Transport.
36
class TestBranch(TestCaseInTempDir):
17
from bzrlib.selftest import TestCaseInTempDir
20
class TestAppendRevisions(TestCaseInTempDir):
21
"""Test appending more than one revision"""
38
22
def test_append_revisions(self):
    """Test appending more than one revision.

    Appends a single revision, then two revisions in one call, and checks
    the branch's revision history after each step.
    """
    # Build the branch once, with Branch.initialize as the other tests in
    # this file do. Branch is already imported at module level, so no local
    # re-import or second construction over the same directory is needed.
    br = Branch.initialize(u".")
    br.append_revision("rev1")
    self.assertEquals(br.revision_history(), ["rev1",])
    br.append_revision("rev2", "rev3")
    self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
46
def test_fetch_revisions(self):
    """Test fetch-revision operation.

    Commits one file in branch ``b1``, runs a Fetcher from ``b1`` to ``b2``
    and checks that the revision and the file text can then be read from
    ``b2``'s repository.
    """
    from bzrlib.fetch import Fetcher
    b1 = Branch.initialize('b1')
    b2 = Branch.initialize('b2')
    # Close the file explicitly so its content is flushed before the commit
    # reads it, instead of relying on the handle being garbage collected.
    foo_file = open('b1/foo', 'w')
    try:
        foo_file.write('hello')
    finally:
        foo_file.close()
    b1.working_tree().add(['foo'], ['foo-id'])
    b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
    f = Fetcher(from_branch=b1, to_branch=b2)
    eq = self.assertEquals
    eq(f.last_revision, 'revision-1')
    # The lookup itself is the check: the result is not inspected further.
    rev = b2.repository.get_revision('revision-1')
    tree = b2.repository.revision_tree('revision-1')
    eq(tree.get_file_text('foo-id'), 'hello')
67
def test_revision_tree(self):
    # The tree for a committed revision can be requested, and the trees for
    # None and for NULL_REVISION both list no files.
    b1 = Branch.initialize(u'.')
    b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
    tree = b1.repository.revision_tree('revision-1')
    for revision_id in (None, NULL_REVISION):
        tree = b1.repository.revision_tree(revision_id)
        listed = tree.list_files()
        self.assertEqual(len(listed), 0)
76
def get_unbalanced_branch_pair(self):
    """Return two branches, a and b, with one file in a.

    Branch ``a`` gets a single file ``b`` committed as revision 'A';
    branch ``b`` is only initialized.

    :return: the tuple ``(br_a, br_b)``.
    """
    br_a = Branch.initialize("a")
    # Close the file explicitly so the content is on disk before it is
    # added and committed.
    content = open('a/b', 'wb')
    try:
        content.write('b')
    finally:
        content.close()
    br_a.working_tree().add('b')
    commit(br_a, "silly commit", rev_id='A')
    br_b = Branch.initialize("b")
    # Callers unpack the result into two branches, so hand both back.
    return br_a, br_b
87
def get_balanced_branch_pair(self):
    """Returns br_a, br_b as with one commit in a, and b has a's stores.

    Starts from the unbalanced pair and pushes a's stores into b.

    :return: the tuple ``(br_a, br_b)``.
    """
    br_a, br_b = self.get_unbalanced_branch_pair()
    br_a.push_stores(br_b)
    # Callers unpack the result into two branches, so hand both back.
    return br_a, br_b
93
def test_push_stores(self):
94
"""Copy the stores from one branch to another"""
95
# Start from a pair in which only br_a has a commit.
br_a, br_b = self.get_unbalanced_branch_pair()
96
# ensure the revision is missing.
97
self.assertRaises(NoSuchRevision, br_b.repository.get_revision,
98
br_a.revision_history()[0])
99
br_a.push_stores(br_b)
100
# check that b now has all the data from a's first commit.
101
rev = br_b.repository.get_revision(br_a.revision_history()[0])
102
tree = br_b.repository.revision_tree(br_a.revision_history()[0])
104
# NOTE(review): file_id is not bound anywhere in the visible lines; the
# statement introducing it (presumably a loop over the tree's file ids)
# appears to be missing from this view -- confirm against the full file.
if tree.inventory[file_id].kind == "file":
105
# Read the file content through the tree built from br_b's repository.
tree.get_file(file_id).read()
108
def test_clone_branch(self):
    """Clone a branch using another branch as the basis.

    The clone's revision history must equal that of the branch it was
    cloned from, even though the basis branch has an extra commit.
    """
    br_a, br_b = self.get_balanced_branch_pair()
    commit(br_b, "silly commit")
    cloned = br_a.clone('c', basis_branch=br_b)
    source_history = br_a.revision_history()
    cloned_history = cloned.revision_history()
    self.assertEqual(source_history, cloned_history)
116
def test_clone_partial(self):
    """Copy only part of the history of a branch.

    Two commits are made in ``a``; cloning at the first one must give a
    branch whose last revision is that commit and which contains only the
    first file on disk.
    """
    first_rev = 'u@d-1'
    self.build_tree(['a/', 'a/one'])
    br_a = Branch.initialize('a')
    br_a.working_tree().add(['one'])
    br_a.working_tree().commit('commit one', rev_id=first_rev)
    self.build_tree(['a/two'])
    br_a.working_tree().add(['two'])
    br_a.working_tree().commit('commit two', rev_id='u@d-2')
    br_b = br_a.clone('b', revision=first_rev)
    self.assertEqual(br_b.last_revision(), first_rev)
    has_one = os.path.exists('b/one')
    self.assertTrue(has_one)
    has_two = os.path.exists('b/two')
    self.assertFalse(has_two)
130
def test_record_initial_ghost_merge(self):
    """A pending merge with no revision present is still a merge.

    The ghost revision id must be recorded as the single parent of the
    first commit.
    """
    ghost_id = 'non:existent@rev--ision--0--2'
    branch = Branch.initialize(u'.')
    branch.working_tree().add_pending_merge(ghost_id)
    branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
    tip = branch.last_revision()
    rev = branch.repository.get_revision(tip)
    self.assertEqual(len(rev.parent_ids), 1)
    # parent_sha1s is not populated now, WTF. rbc 20051003
    self.assertEqual(len(rev.parent_sha1s), 0)
    self.assertEqual(rev.parent_ids[0], ghost_id)
141
def test_bad_revision(self):
    # Asking the repository for revision None must raise InvalidRevisionId.
    branch = Branch.initialize('.')
    lookup = branch.repository.get_revision
    self.assertRaises(errors.InvalidRevisionId, lookup, None)
147
# compare the gpg-to-sign info for a commit with a ghost and
148
# an identical tree without a ghost
149
# fetch missing should rewrite the TOC of weaves to list newly available parents.
151
def test_pending_merges(self):
    """Tracking pending-merged revisions.

    Pending merges are listed in the order added, adding the same id twice
    lists it once, the ids become the parents of the next commit, and the
    list is empty again after that commit.
    """
    first_merge = 'foo@azkhazan-123123-abcabc'
    second_merge = 'wibble@fofof--20050401--1928390812'
    check = self.assertEquals

    b = Branch.initialize(u'.')
    wt = b.working_tree()
    check(wt.pending_merges(), [])
    wt.add_pending_merge(first_merge)
    check(wt.pending_merges(), [first_merge])
    # adding the same revision again must not duplicate it
    wt.add_pending_merge(first_merge)
    check(wt.pending_merges(), [first_merge])
    wt.add_pending_merge(second_merge)
    check(wt.pending_merges(), [first_merge, second_merge])
    b.working_tree().commit("commit from base with two merges")
    rev = b.repository.get_revision(b.revision_history()[0])
    check(len(rev.parent_ids), 2)
    check(rev.parent_ids[0], first_merge)
    check(rev.parent_ids[1], second_merge)
    # list should be cleared when we do a commit
    check(wt.pending_merges(), [])
174
def test_sign_existing_revision(self):
175
# Commit revision 'A', sign it through the repository with a
# LoopbackGPGStrategy, then compare the revision's short testament text
# with what the revision store returns for 'A'.
branch = Branch.initialize(u'.')
176
branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
177
from bzrlib.testament import Testament
178
strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
179
branch.repository.sign_revision('A', strategy)
180
self.assertEqual(Testament.from_revision(branch.repository,
181
'A').as_short_text(),
182
# NOTE(review): the call below is cut off in this view; the remaining
# arguments and closing parentheses are not visible here.
branch.repository.revision_store.get('A',
185
def test_store_signature(self):
186
# Store the text 'FOO' as the signature for revision 'A' and compare it
# with what the revision store returns for 'A'.
branch = Branch.initialize('.')
187
branch.repository.store_revision_signature(
188
bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
189
self.assertEqual('FOO',
190
# NOTE(review): the call below is cut off in this view; the remaining
# arguments and closing parentheses are not visible here.
branch.repository.revision_store.get('A',
193
def test__escape(self):
    # Escaping '%' through the branch's control files gives '.bzr/%25'.
    branch = Branch.initialize('.')
    escaped = branch.control_files._escape('%')
    self.assertEqual('.bzr/%25', escaped)
197
def test__escape_empty(self):
    # Escaping the empty string gives the empty string back.
    branch = Branch.initialize('.')
    escaped = branch.control_files._escape('')
    self.assertEqual('', escaped)
201
def test_nicks(self):
    """Branch nicknames.

    Before a nick is set, the nick equals the branch directory name and
    follows a rename of that directory. Once set explicitly, a branch.conf
    control file exists and the nick is unchanged by a later rename.
    """
    chosen_nick = "Aaron's branch"
    unicode_nick = u"\u1234"

    branch = Branch.initialize('bzr.dev')
    self.assertEqual(branch.nick, 'bzr.dev')
    os.rename('bzr.dev', 'bzr.ab')
    branch = Branch.open('bzr.ab')
    self.assertEqual(branch.nick, 'bzr.ab')
    # NOTE(review): the nick is assigned twice in a row in the original;
    # both assignments are kept.
    branch.nick = chosen_nick
    branch.nick = chosen_nick
    conf_path = branch.control_files.controlfilename("branch.conf")
    self.failUnlessExists(conf_path)
    self.assertEqual(branch.nick, chosen_nick)
    os.rename('bzr.ab', 'integration')
    branch = Branch.open('integration')
    self.assertEqual(branch.nick, chosen_nick)
    branch.nick = unicode_nick
    self.assertEqual(branch.nick, unicode_nick)
219
def test_commit_nicks(self):
220
"""Nicknames are committed to the revision"""
222
branch = Branch.initialize('bzr.dev')
223
# Set an explicit nick before committing so it can be looked for in the
# committed revision's properties.
branch.nick = "My happy branch"
224
branch.working_tree().commit('My commit respect da nick.')
225
committed = branch.repository.get_revision(branch.last_revision())
226
# NOTE(review): the assertion below is cut off in this view; the expected
# value and closing parenthesis are not visible here.
self.assertEqual(committed.properties["branch-nick"],
230
class TestRemote(TestCaseWithWebserver):
    # Branch.open_containing against URLs served by the test web server.

    def test_open_containing(self):
        # Before a branch exists, neither the root URL nor a nested URL
        # can be opened.
        for path in ('', 'g/p/q'):
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_remote_url(path))
        b = Branch.initialize(u'.')
        # Once the branch exists, the path below the branch root comes
        # back as the relative path.
        for path in ('', 'g/p/q'):
            branch, relpath = Branch.open_containing(self.get_remote_url(path))
            self.assertEqual(path, relpath)
243
31
# TODO: rewrite this as a regular unittest, without relying on the displayed output
244
32
# >>> from bzrlib.commit import commit
245
33
# >>> bzrlib.trace.silent = True
246
34
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
247
# >>> br1.working_tree().add('foo')
248
# >>> br1.working_tree().add('bar')
249
37
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
250
38
# >>> br2 = ScratchBranch()
251
39
# >>> br2.update_revisions(br1)
258
46
# Added 0 revisions.
259
47
# >>> br1.text_store.total_size() == br2.text_store.total_size()
262
class InstrumentedTransaction(object):
265
# NOTE(review): the method headers of this class are not visible in this
# view. The visible statement records the string 'finish' in self.calls;
# presumably it belongs to a finish() method -- confirm against the full
# file.
self.calls.append('finish')
271
class TestDecorator(object):
277
# NOTE(review): several method headers and bodies of this class are not
# visible in this view. The visible statements append short tags to
# self._calls: 'lr', 'lw' (in lock_write) and 'ul'.
self._calls.append('lr')
279
def lock_write(self):
280
self._calls.append('lw')
283
self._calls.append('ul')
286
# The four methods below have no visible bodies here. Presumably they are
# wrapped with needs_read_lock / needs_write_lock -- confirm against the
# full file.
def do_with_read(self):
290
def except_with_read(self):
294
def do_with_write(self):
298
def except_with_write(self):
302
class TestDecorators(TestCase):
    # Each test calls one TestDecorator method and then checks the lock
    # calls recorded in its _calls list.

    def test_needs_read_lock(self):
        decorated = TestDecorator()
        result = decorated.do_with_read()
        self.assertEqual(1, result)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        decorated = TestDecorator()
        failing = decorated.except_with_read
        self.assertRaises(RuntimeError, failing)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        decorated = TestDecorator()
        result = decorated.do_with_write()
        self.assertEqual(2, result)
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        decorated = TestDecorator()
        failing = decorated.except_with_write
        self.assertRaises(RuntimeError, failing)
        self.assertEqual(['lw', 'ul'], decorated._calls)
325
class TestBranchTransaction(TestCaseInTempDir):
328
# NOTE(review): the header of the method containing the two statements
# below is not visible in this view (presumably setUp, given the super
# call). They run the parent setUp and create self.branch in the current
# directory.
super(TestBranchTransaction, self).setUp()
329
self.branch = Branch.initialize(u'.')
331
def test_default_get_transaction(self):
    """branch.get_transaction on a new branch should give a PassThrough."""
    current = self.branch.get_transaction()
    is_passthrough = isinstance(current, transactions.PassThroughTransaction)
    self.failUnless(is_passthrough)
336
def test__set_new_transaction(self):
    # Installing a transaction on a fresh branch must simply succeed.
    branch = self.branch
    branch._set_transaction(transactions.ReadOnlyTransaction())
339
def test__set_over_existing_transaction_raises(self):
    # A second _set_transaction while one is already installed must raise
    # LockError.
    branch = self.branch
    branch._set_transaction(transactions.ReadOnlyTransaction())
    set_again = branch._set_transaction
    self.assertRaises(errors.LockError, set_again,
                      transactions.ReadOnlyTransaction())
345
def test_finish_no_transaction_raises(self):
    # Finishing when no transaction has been set must raise LockError.
    finish = self.branch._finish_transaction
    self.assertRaises(errors.LockError, finish)
348
def test_finish_readonly_transaction_works(self):
    # After finishing a read-only transaction, the control files hold no
    # transaction.
    branch = self.branch
    branch._set_transaction(transactions.ReadOnlyTransaction())
    branch._finish_transaction()
    remaining = branch.control_files._transaction
    self.assertEqual(None, remaining)
353
def test_unlock_calls_finish(self):
    # Unlocking the branch must call finish() on the transaction installed
    # on its control files.
    self.branch.lock_read()
    transaction = InstrumentedTransaction()
    # Swap in the instrumented transaction so the finish call is recorded.
    self.branch.control_files._transaction = transaction
    # Release the lock: this is the step under test. Without it nothing
    # would call finish() and the assertion below could not hold.
    self.branch.unlock()
    self.assertEqual(['finish'], transaction.calls)
360
def test_lock_read_acquires_ro_transaction(self):
    # Taking a read lock must install a ReadOnlyTransaction.
    self.branch.lock_read()
    try:
        self.failUnless(isinstance(self.branch.get_transaction(),
                                   transactions.ReadOnlyTransaction))
    finally:
        # Always release the lock, even if the assertion fails, so the
        # branch is not left locked for later cleanup.
        self.branch.unlock()
366
def test_lock_write_acquires_passthrough_transaction(self):
    # Taking a write lock must install a PassThroughTransaction.
    self.branch.lock_write()
    try:
        # cannot use get_transaction as its magic
        self.failUnless(isinstance(self.branch.control_files._transaction,
                                   transactions.PassThroughTransaction))
    finally:
        # Always release the lock, even if the assertion fails, so the
        # branch is not left locked for later cleanup.
        self.branch.unlock()
374
class TestBranchPushLocations(TestCaseInTempDir):
377
# NOTE(review): the header of the method containing the two statements
# below is not visible in this view (presumably setUp, given the super
# call). They run the parent setUp and create self.branch in the current
# directory.
super(TestBranchPushLocations, self).setUp()
378
self.branch = Branch.initialize(u'.')
380
def test_get_push_location_unset(self):
381
# With nothing configured, the push location is None.
self.assertEqual(None, self.branch.get_push_location())
383
def test_get_push_location_exact(self):
384
from bzrlib.config import (branches_config_filename,
385
ensure_config_dir_exists)
386
ensure_config_dir_exists()
387
fn = branches_config_filename()
388
# Write a section containing push_location=foo into the branches config
# file.
# NOTE(review): the value substituted for %s and the closing parenthesis
# are not visible in this view.
print >> open(fn, 'wt'), ("[%s]\n"
389
"push_location=foo" %
391
self.assertEqual("foo", self.branch.get_push_location())
393
def test_set_push_location(self):
394
from bzrlib.config import (branches_config_filename,
395
ensure_config_dir_exists)
396
ensure_config_dir_exists()
397
fn = branches_config_filename()
398
self.branch.set_push_location('foo')
399
# The expected text is a section named after the current working
# directory with push_location set to foo.
# NOTE(review): the remaining arguments of this call are not visible in
# this view.
self.assertFileEqual("[%s]\n"
400
"push_location = foo" % getcwd(),
403
# TODO RBC 20051029 test getting a push location from a branch in a
404
# recursive section - that is, it appends the branch name.