119
115
def test_fetch_revisions(self):
120
116
"""Test fetch-revision operation."""
121
get_transport(self.get_url()).mkdir('b1')
122
get_transport(self.get_url()).mkdir('b2')
123
117
wt = self.make_branch_and_tree('b1')
125
b2 = self.make_branch('b2')
126
file('b1/foo', 'w').write('hello')
119
self.build_tree_contents([('b1/foo', 'hello')])
127
120
wt.add(['foo'], ['foo-id'])
128
121
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
130
mutter('start fetch')
123
b2 = self.make_branch('b2')
131
124
self.assertEqual((1, []), b2.fetch(b1))
133
126
rev = b2.repository.get_revision('revision-1')
153
146
def get_unbalanced_tree_pair(self):
154
147
"""Return two branches, a and b, with one file in a."""
155
get_transport(self.get_url()).mkdir('a')
156
148
tree_a = self.make_branch_and_tree('a')
157
file('a/b', 'wb').write('b')
149
self.build_tree_contents([('a/b', 'b')])
159
151
tree_a.commit("silly commit", rev_id='A')
161
get_transport(self.get_url()).mkdir('b')
162
153
tree_b = self.make_branch_and_tree('b')
163
154
return tree_a, tree_b
168
159
tree_b.branch.repository.fetch(tree_a.branch.repository)
169
160
return tree_a, tree_b
171
def test_clone_branch(self):
172
"""Copy the stores from one branch to another"""
173
tree_a, tree_b = self.get_balanced_branch_pair()
174
tree_b.commit("silly commit")
176
# this fails to test that the history from a was not used.
177
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
178
self.assertEqual(tree_a.branch.revision_history(),
179
dir_c.open_branch().revision_history())
181
162
def test_clone_partial(self):
182
163
"""Copy only part of the history of a branch."""
183
164
# TODO: RBC 20060208 test with a revision not on revision-history.
184
165
# what should that behaviour be ? Emailed the list.
166
# First, make a branch with two commits.
185
167
wt_a = self.make_branch_and_tree('a')
186
168
self.build_tree(['a/one'])
187
169
wt_a.add(['one'])
189
171
self.build_tree(['a/two'])
190
172
wt_a.add(['two'])
191
173
wt_a.commit('commit two', rev_id='2')
174
# Now make a copy of the repository.
192
175
repo_b = self.make_repository('b')
193
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
194
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
176
wt_a.branch.repository.copy_content_into(repo_b)
177
# wt_a might be a lightweight checkout, so get a hold of the actual
178
# branch (because you can't do a partial clone of a lightweight
180
branch = wt_a.branch.bzrdir.open_branch()
181
# Then make a branch where the new repository is, but specify a revision
182
# ID. The new branch's history will stop at the specified revision.
183
br_b = branch.clone(repo_b.bzrdir, revision_id='1')
195
184
self.assertEqual('1', br_b.last_revision())
197
186
def test_sprout_partial(self):
205
194
wt_a.add(['two'])
206
195
wt_a.commit('commit two', rev_id='2')
207
196
repo_b = self.make_repository('b')
208
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
209
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
197
repo_a = wt_a.branch.repository
198
repo_a.copy_content_into(repo_b)
199
br_b = wt_a.branch.sprout(repo_b.bzrdir, revision_id='1')
210
200
self.assertEqual('1', br_b.last_revision())
212
202
def get_parented_branch(self):
240
230
branch_d = branch_b.clone(repo_d.bzrdir)
241
231
self.assertEqual(random_parent, branch_d.get_parent())
243
def test_copy_content_incomplete(self):
244
tree = self.make_branch_and_tree('commit_tree')
245
self.build_tree(['foo'], transport=tree.bzrdir.root_transport)
247
tree.commit('revision 1', rev_id='1')
248
source = self.make_branch_and_tree('source')
249
# this gives us an incomplete repository
250
tree.bzrdir.open_repository().copy_content_into(
251
source.branch.repository)
252
tree.commit('revision 2', rev_id='2', allow_pointless=True)
253
tree.bzrdir.open_branch().copy_content_into(source.branch)
256
233
def test_sprout_branch_nickname(self):
257
234
# test the nick name is reset always
258
235
raise TestSkipped('XXX branch sprouting is not yet tested..')
271
248
branch.set_submit_branch('sftp://example.net')
272
249
self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
251
def test_public_branch(self):
252
"""public location can be queried and set"""
253
branch = self.make_branch('branch')
254
self.assertEqual(branch.get_public_branch(), None)
255
branch.set_public_branch('sftp://example.com')
256
self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
257
branch.set_public_branch('sftp://example.net')
258
self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
259
branch.set_public_branch(None)
260
self.assertEqual(branch.get_public_branch(), None)
274
262
def test_record_initial_ghost(self):
275
263
"""Branches should support having ghosts."""
276
264
wt = self.make_branch_and_tree('.')
314
302
from bzrlib.testament import Testament
315
303
strategy = gpg.LoopbackGPGStrategy(None)
316
304
branch.repository.sign_revision('A', strategy)
317
self.assertEqual(Testament.from_revision(branch.repository,
318
'A').as_short_text(),
305
self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
306
Testament.from_revision(branch.repository,
307
'A').as_short_text() +
308
'-----END PSEUDO-SIGNED CONTENT-----\n',
319
309
branch.repository.get_signature_text('A'))
321
311
def test_store_signature(self):
327
317
branch.repository.has_signature_for_revision_id,
329
319
wt.commit("base", allow_pointless=True, rev_id='A')
330
self.assertEqual('FOO',
320
self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
321
'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
331
322
branch.repository.get_signature_text('A'))
333
324
def test_branch_keeps_signatures(self):
334
325
wt = self.make_branch_and_tree('source')
335
326
wt.commit('A', allow_pointless=True, rev_id='A')
336
wt.branch.repository.sign_revision('A',
337
gpg.LoopbackGPGStrategy(None))
327
repo = wt.branch.repository
328
repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
338
329
#FIXME: clone should work to urls,
339
330
# wt.clone should work to disks.
340
331
self.build_tree(['target/'])
341
d2 = wt.bzrdir.clone('target')
342
self.assertEqual(wt.branch.repository.get_signature_text('A'),
332
d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
333
self.assertEqual(repo.get_signature_text('A'),
343
334
d2.open_repository().get_signature_text('A'))
345
336
def test_nicks(self):
346
"""Branch nicknames"""
337
"""Test explicit and implicit branch nicknames.
339
Nicknames are implicitly the name of the branch's directory, unless an
340
explicit nickname is set. That is, an explicit nickname always
341
overrides the implicit one.
347
343
t = get_transport(self.get_url())
349
344
branch = self.make_branch('bzr.dev')
345
# The nick will be 'bzr.dev', because there is no explicit nick set.
350
346
self.assertEqual(branch.nick, 'bzr.dev')
347
# Move the branch to a different directory, 'bzr.ab'. Now that branch
348
# will report its nick as 'bzr.ab'.
351
349
t.move('bzr.dev', 'bzr.ab')
352
350
branch = Branch.open(self.get_url('bzr.ab'))
353
351
self.assertEqual(branch.nick, 'bzr.ab')
354
branch.nick = "Aaron's branch"
355
branch.nick = "Aaron's branch"
359
branch.control_files.controlfilename("branch.conf")
352
# Set the branch nick explicitly. This will ensure there's a branch
353
# config file in the branch.
354
branch.nick = "Aaron's branch"
355
branch.nick = "Aaron's branch"
356
if not isinstance(branch, remote.RemoteBranch):
357
controlfilename = branch.control_files.controlfilename
358
self.failUnless(t.has(t.relpath(controlfilename("branch.conf"))))
359
# Because the nick has been set explicitly, the nick is now always
360
# "Aaron's branch", regardless of directory name.
363
361
self.assertEqual(branch.nick, "Aaron's branch")
364
362
t.move('bzr.ab', 'integration')
365
363
branch = Branch.open(self.get_url('integration'))
367
365
branch.nick = u"\u1234"
368
366
self.assertEqual(branch.nick, u"\u1234")
368
def test_commit_nicks(self):
369
"""Nicknames are committed to the revision"""
370
wt = self.make_branch_and_tree('bzr.dev')
372
branch.nick = "My happy branch"
373
wt.commit('My commit respect da nick.')
374
committed = branch.repository.get_revision(branch.last_revision())
375
self.assertEqual(committed.properties["branch-nick"],
370
378
def test_create_open_branch_uses_repository(self):
372
380
repo = self.make_repository('.', shared=True)
373
381
except errors.IncompatibleFormat:
375
repo.bzrdir.root_transport.mkdir('child')
376
child_dir = self.bzrdir_format.initialize('child')
383
child_transport = repo.bzrdir.root_transport.clone('child')
384
child_transport.mkdir('.')
385
child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
378
387
child_branch = self.branch_format.initialize(child_dir)
379
388
except errors.UninitializableFormat:
443
452
"""A lightweight checkout from a readonly branch should succeed."""
444
453
tree_a = self.make_branch_and_tree('a')
445
454
rev_id = tree_a.commit('put some content in the branch')
446
source_branch = bzrlib.branch.Branch.open(
447
'readonly+' + tree_a.bzrdir.root_transport.base)
455
# open the branch via a readonly transport
456
source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
448
457
# sanity check that the test will be valid
449
458
self.assertRaises((errors.LockError, errors.TransportNotPossible),
450
459
source_branch.lock_write)
455
464
"""A regular checkout from a readonly branch should succeed."""
456
465
tree_a = self.make_branch_and_tree('a')
457
466
rev_id = tree_a.commit('put some content in the branch')
458
source_branch = bzrlib.branch.Branch.open(
459
'readonly+' + tree_a.bzrdir.root_transport.base)
467
# open the branch via a readonly transport
468
source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
460
469
# sanity check that the test will be valid
461
470
self.assertRaises((errors.LockError, errors.TransportNotPossible),
462
471
source_branch.lock_write)
463
472
checkout = source_branch.create_checkout('c')
464
473
self.assertEqual(rev_id, checkout.last_revision())
475
def test_set_revision_history(self):
476
tree = self.make_branch_and_tree('a')
477
tree.commit('a commit', rev_id='rev1')
479
br.set_revision_history(["rev1"])
480
self.assertEquals(br.revision_history(), ["rev1"])
481
br.set_revision_history([])
482
self.assertEquals(br.revision_history(), [])
467
485
class ChrootedTests(TestCaseWithBranch):
468
486
"""A support class that provides readonly urls outside the local namespace.
552
570
self.assertEqual(['lw', 'ul'], branch._calls)
555
class TestBranchTransaction(TestCaseWithBranch):
558
super(TestBranchTransaction, self).setUp()
561
def test_default_get_transaction(self):
562
"""branch.get_transaction on a new branch should give a PassThrough."""
563
self.failUnless(isinstance(self.get_branch().get_transaction(),
564
transactions.PassThroughTransaction))
566
def test__set_new_transaction(self):
567
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
569
def test__set_over_existing_transaction_raises(self):
570
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
571
self.assertRaises(errors.LockError,
572
self.get_branch()._set_transaction,
573
transactions.ReadOnlyTransaction())
575
def test_finish_no_transaction_raises(self):
576
self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
578
def test_finish_readonly_transaction_works(self):
579
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
580
self.get_branch()._finish_transaction()
581
self.assertEqual(None, self.get_branch().control_files._transaction)
583
def test_unlock_calls_finish(self):
584
self.get_branch().lock_read()
585
transaction = InstrumentedTransaction()
586
self.get_branch().control_files._transaction = transaction
587
self.get_branch().unlock()
588
self.assertEqual(['finish'], transaction.calls)
590
def test_lock_read_acquires_ro_transaction(self):
591
self.get_branch().lock_read()
592
self.failUnless(isinstance(self.get_branch().get_transaction(),
593
transactions.ReadOnlyTransaction))
594
self.get_branch().unlock()
596
def test_lock_write_acquires_write_transaction(self):
597
self.get_branch().lock_write()
598
# cannot use get_transaction as its magic
599
self.failUnless(isinstance(self.get_branch().control_files._transaction,
600
transactions.WriteTransaction))
601
self.get_branch().unlock()
604
573
class TestBranchPushLocations(TestCaseWithBranch):
606
575
def test_get_push_location_unset(self):
625
594
class TestFormat(TestCaseWithBranch):
626
595
"""Tests for the format itself."""
597
def test_get_reference(self):
598
"""get_reference on all regular branches should return None."""
599
if not self.branch_format.is_supported():
600
# unsupported formats are not loopback testable
601
# because the default open will not open them and
602
# they may not be initializable.
604
made_branch = self.make_branch('.')
605
self.assertEqual(None,
606
made_branch._format.get_reference(made_branch.bzrdir))
628
608
def test_format_initialize_find_open(self):
629
609
# loopback test to check the current format initializes to itself.
630
610
if not self.branch_format.is_supported():