14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
from bzrlib.branch import Branch
19
from bzrlib.clone import copy_branch
20
from bzrlib.commit import commit
21
import bzrlib.errors as errors
22
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
17
23
from bzrlib.selftest import TestCaseInTempDir
20
class TestAppendRevisions(TestCaseInTempDir):
21
"""Test appending more than one revision"""
24
from bzrlib.trace import mutter
25
import bzrlib.transactions as transactions
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
28
class TestBranch(TestCaseInTempDir):
22
30
def test_append_revisions(self):
    """Test appending more than one revision.

    Appends a single revision id, then two ids in one call, and checks
    revision_history() after each step.
    """
    # Create the branch exactly once, through the same Branch.initialize
    # factory every other test in this file uses.
    br = Branch.initialize(".")
    br.append_revision("rev1")
    self.assertEquals(br.revision_history(), ["rev1"])
    # Several ids are passed in a single append_revision call here.
    br.append_revision("rev2", "rev3")
    self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
38
def test_fetch_revisions(self):
    """Test fetch-revision operation.

    Commits one file in branch 'b1', builds a Fetcher from b1 to b2, and
    checks that the revision and the file text can then be read from b2.
    """
    from bzrlib.fetch import Fetcher
    b1 = Branch.initialize('b1')
    b2 = Branch.initialize('b2')
    # Close the handle explicitly instead of relying on garbage collection
    # to flush the content before it is added and committed.
    foo = open(os.path.join('b1', 'foo'), 'w')
    try:
        foo.write('hello')
    finally:
        foo.close()
    b1.add(['foo'], ['foo-id'])
    b1.commit('lala!', rev_id='revision-1', allow_pointless=False)

    # No further call is made on the Fetcher before the checks below.
    f = Fetcher(from_branch=b1, to_branch=b2)
    eq = self.assertEquals
    eq(f.last_revision, 'revision-1')

    # Called only for the lookup itself; the returned object is not inspected.
    b2.get_revision('revision-1')
    tree = b2.revision_tree('revision-1')
    eq(tree.get_file_text('foo-id'), 'hello')
59
def test_push_stores(self):
    """Copy the stores from one branch to another.

    Returns the (br_a, br_b) pair; test_copy_branch unpacks this result.
    """
    br_a = Branch.initialize("a")
    # Close the handle explicitly so the content is on disk before commit.
    handle = open('a/b', 'wb')
    try:
        handle.write('b')
    finally:
        handle.close()
    commit(br_a, "silly commit")

    br_b = Branch.initialize("b")
    first_rev = br_a.revision_history()[0]
    # br_b must not know the revision before the stores are pushed.
    self.assertRaises(NoSuchRevision, br_b.get_revision, first_rev)
    br_a.push_stores(br_b)
    # Called only for the lookup itself; the returned object is not inspected.
    br_b.get_revision(first_rev)
    tree = br_b.revision_tree(first_rev)
    # NOTE(review): assumes iterating tree.inventory yields file ids --
    # confirm against bzrlib.inventory before relying on this loop.
    for file_id in tree.inventory:
        if tree.inventory[file_id].kind == "file":
            tree.get_file(file_id).read()
    return br_a, br_b
79
def test_copy_branch(self):
    """Copy branch a into 'c', passing branch b as the basis branch."""
    source, basis = self.test_push_stores()
    commit(basis, "silly commit")
    clone = copy_branch(source, 'c', basis_branch=basis)
    # The copy must carry the same history as the branch it was made from.
    self.assertEqual(source.revision_history(), clone.revision_history())
    ## # basis branches currently disabled for weave format
    ## self.assertFalse(basis.last_revision() in clone.revision_history())
    ## clone.get_revision(basis.last_revision())
90
def test_copy_partial(self):
    """Copy only part of the history of a branch."""
    first_rev = 'u@d-1'
    self.build_tree(['a/', 'a/one'])
    source = Branch.initialize('a')
    source.commit('commit one', rev_id=first_rev)
    self.build_tree(['a/two'])
    source.commit('commit two', rev_id='u@d-2')
    partial = copy_branch(source, 'b', revision=first_rev)
    self.assertEqual(partial.last_revision(), first_rev)
    # 'one' is expected in the copy, 'two' is not.
    for path, check in (('b/one', self.assertTrue),
                        ('b/two', self.assertFalse)):
        check(os.path.exists(path))
105
def test_record_initial_ghost_merge(self):
    """A pending merge with no revision present is still a merge."""
    ghost_id = 'non:existent@rev--ision--0--2'
    br = Branch.initialize('.')
    br.add_pending_merge(ghost_id)
    br.commit('pretend to merge nonexistent-revision', rev_id='first')
    committed = br.get_revision(br.last_revision())
    # The ghost must be recorded as the single parent of the commit.
    self.assertEqual(len(committed.parent_ids), 1)
    # parent_sha1s is not populated now, WTF. rbc 20051003
    self.assertEqual(len(committed.parent_sha1s), 0)
    self.assertEqual(committed.parent_ids[0], ghost_id)
117
# compare the gpg-to-sign info for a commit with a ghost and
118
# an identical tree without a ghost
119
# fetch missing should rewrite the TOC of weaves to list newly available parents.
121
def test_pending_merges(self):
    """Tracking pending-merged revisions."""
    first_merge = 'foo@azkhazan-123123-abcabc'
    second_merge = 'wibble@fofof--20050401--1928390812'
    eq = self.assertEquals
    branch = Branch.initialize('.')

    eq(branch.pending_merges(), [])
    branch.add_pending_merge(first_merge)
    eq(branch.pending_merges(), [first_merge])
    # Adding the same id a second time is expected to leave the list as is.
    branch.add_pending_merge(first_merge)
    eq(branch.pending_merges(), [first_merge])
    branch.add_pending_merge(second_merge)
    eq(branch.pending_merges(), [first_merge, second_merge])
    branch.commit("commit from base with two merges")
    committed = branch.get_revision(branch.revision_history()[0])
    eq(len(committed.parent_ids), 2)
    eq(committed.parent_ids[0], first_merge)
    eq(committed.parent_ids[1], second_merge)
    # list should be cleared when we do a commit
    eq(branch.pending_merges(), [])
145
class TestRemote(TestCaseWithWebserver):
    """Checks Branch.open_containing on URLs built by get_remote_url."""

    def test_open_containing(self):
        """open_containing raises NotBranchError until '.' holds a branch."""
        probe_paths = ('', 'g/p/q')
        # Before a branch is initialized, every probe must be rejected.
        for path in probe_paths:
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_remote_url(path))
        b = Branch.initialize('.')
        # After initialization the same probes must open without raising.
        for path in probe_paths:
            Branch.open_containing(self.get_remote_url(path))
31
156
# TODO: rewrite this as a regular unittest, without relying on the displayed output
32
157
# >>> from bzrlib.commit import commit
33
158
# >>> bzrlib.trace.silent = True
46
171
# Added 0 revisions.
47
172
# >>> br1.text_store.total_size() == br2.text_store.total_size()
175
class InstrumentedTransaction(object):
    """Transaction stand-in that records the calls made on it.

    ``calls`` holds the names of the methods invoked, in call order, so a
    test can compare it against the sequence it expects.
    """

    def __init__(self):
        # Fresh list per instance so recordings never leak between tests.
        self.calls = []

    def finish(self):
        """Record that finish was called."""
        self.calls.append('finish')
184
class TestBranchTransaction(TestCaseInTempDir):
187
def setUp(self):
    """Run the base-class setUp, then create the branch under test in '.'."""
    super(TestBranchTransaction, self).setUp()
    self.branch = Branch.initialize('.')
190
def test_default_get_transaction(self):
    """branch.get_transaction on a new branch should give a PassThrough."""
    current = self.branch.get_transaction()
    expected_type = transactions.PassThroughTransaction
    self.failUnless(isinstance(current, expected_type))
195
def test__set_new_transaction(self):
    """_set_transaction with a ReadOnlyTransaction completes without raising."""
    read_only = transactions.ReadOnlyTransaction()
    self.branch._set_transaction(read_only)
198
def test__set_over_existing_transaction_raises(self):
    """A second _set_transaction while one is already set raises LockError."""
    make_txn = transactions.ReadOnlyTransaction
    self.branch._set_transaction(make_txn())
    replacement = make_txn()
    self.assertRaises(errors.LockError,
                      self.branch._set_transaction, replacement)
204
def test_finish_no_transaction_raises(self):
    """_finish_transaction raises LockError when nothing was set first."""
    finish = self.branch._finish_transaction
    self.assertRaises(errors.LockError, finish)
207
def test_finish_readonly_transaction_works(self):
    """Finishing a ReadOnlyTransaction leaves branch._transaction as None."""
    branch = self.branch
    branch._set_transaction(transactions.ReadOnlyTransaction())
    branch._finish_transaction()
    self.assertEqual(None, branch._transaction)
212
def test_unlock_calls_finish(self):
    """Unlocking the branch must call finish() on its current transaction."""
    self.branch.lock_read()
    transaction = InstrumentedTransaction()
    # Swap in the recording transaction while the read lock is held.
    self.branch._transaction = transaction
    # Releasing the lock is the step under test: it is what should
    # invoke finish() on the transaction installed above.
    self.branch.unlock()
    self.assertEqual(['finish'], transaction.calls)
219
def test_lock_read_acquires_ro_transaction(self):
    """lock_read should install a ReadOnlyTransaction on the branch."""
    self.branch.lock_read()
    try:
        self.failUnless(isinstance(self.branch.get_transaction(),
                                   transactions.ReadOnlyTransaction))
    finally:
        # Release the read lock even when the assertion above fails, so
        # the branch is not left locked for the rest of the run.
        self.branch.unlock()
225
def test_lock_write_acquires_passthrough_transaction(self):
226
self.branch.lock_write()
227
# cannot use get_transaction as it's magic
228
self.failUnless(isinstance(self.branch._transaction,
229
transactions.PassThroughTransaction))