1
# Copyright (C) 2005, 2006, 2007, 2009, 2010 Robey Pointer <robey@lag.net>, Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for branches bound to an sftp branch."""
28
from bzrlib.tests import test_server
29
from bzrlib.transport import memory
32
class BoundSFTPBranch(tests.TestCaseWithTransport):
35
tests.TestCaseWithTransport.setUp(self)
36
self.vfs_transport_factory = memory.MemoryServer
37
if self.transport_server is test_server.LocalURLServer:
38
self.transport_server = None
40
def create_branches(self):
41
self.build_tree(['base/', 'base/a', 'base/b'])
42
format = controldir.format_registry.make_bzrdir('knit')
44
wt_base = controldir.ControlDir.create_standalone_workingtree(
45
self.get_url('base'), format=format)
46
except errors.NotLocalUrl:
47
raise tests.TestSkipped('Not a local URL')
49
b_base = wt_base.branch
53
wt_base.commit('first', rev_id='r@b-1')
55
wt_child = b_base.bzrdir.sprout('child').open_workingtree()
56
self.sftp_base = branch.Branch.open(self.get_url('base'))
57
wt_child.branch.bind(self.sftp_base)
58
# check the branch histories are ready for using in tests.
59
self.assertEqual(['r@b-1'], b_base.revision_history())
60
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
61
return b_base, wt_child
63
def test_simple_binding(self):
64
self.build_tree(['base/', 'base/a', 'base/b', 'child/'])
66
wt_base = controldir.ControlDir.create_standalone_workingtree(
68
except errors.NotLocalUrl:
69
raise tests.TestSkipped('Not a local URL')
73
wt_base.commit('first', rev_id='r@b-1')
75
b_base = wt_base.branch
76
# manually make a branch we can bind, because the default format
77
# may not be bindable-from, and we want to test the side effects etc
79
format = controldir.format_registry.make_bzrdir('knit')
80
b_child = controldir.ControlDir.create_branch_convenience(
81
'child', format=format)
82
self.assertEqual(None, b_child.get_bound_location())
83
self.assertEqual(None, b_child.get_master_branch())
85
sftp_b_base = branch.Branch.open(self.get_url('base'))
86
b_child.bind(sftp_b_base)
87
self.assertEqual(sftp_b_base.base, b_child.get_bound_location())
88
# the bind must not have given b_child history:
89
self.assertEqual([], b_child.revision_history())
90
# we should be able to update the branch at this point:
91
self.assertEqual(None, b_child.update())
92
# and now there must be history.
93
self.assertEqual(['r@b-1'], b_child.revision_history())
94
# this line is more of a working tree test line, but - what the hey,
96
b_child.bzrdir.open_workingtree().update()
97
self.assertPathExists('child/a')
98
self.assertPathExists('child/b')
101
self.assertEqual(None, b_child.get_bound_location())
103
def test_bound_commit(self):
104
b_base, wt_child = self.create_branches()
106
with open('child/a', 'wb') as f: f.write('new contents\n')
107
wt_child.commit('modified a', rev_id='r@c-2')
109
self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
110
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
112
def test_bound_commit_fails_when_out_of_date(self):
113
# Make sure commit fails if out of date.
114
b_base, wt_child = self.create_branches()
116
with open('base/a', 'wb') as f: f.write('new base contents\n')
117
b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')
119
with open('child/b', 'wb') as f: f.write('new b child contents\n')
120
self.assertRaises(errors.BoundBranchOutOfDate,
121
wt_child.commit, 'child', rev_id='r@c-2')
123
sftp_b_base = branch.Branch.open(self.get_url('base'))
125
# This is all that cmd_update does
126
wt_child.pull(sftp_b_base, overwrite=False)
128
wt_child.commit('child', rev_id='r@c-3')
130
self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
131
wt_child.branch.revision_history())
132
self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
133
b_base.revision_history())
134
self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
135
sftp_b_base.revision_history())
137
def test_double_binding(self):
138
b_base, wt_child = self.create_branches()
140
wt_child2 = wt_child.branch.create_checkout('child2')
142
with open('child2/a', 'wb') as f: f.write('new contents\n')
143
self.assertRaises(errors.CommitToDoubleBoundBranch,
144
wt_child2.commit, 'child2', rev_id='r@d-2')
146
def test_unbinding(self):
147
b_base, wt_child = self.create_branches()
149
# TestCaseWithSFTPServer only allows you to connect one time
150
# to the SFTP server. So we have to create a connection and
151
# keep it around, so that it can be reused
152
__unused_t = self.get_transport()
154
wt_base = b_base.bzrdir.open_workingtree()
155
with open('base/a', 'wb') as f: f.write('new base contents\n')
156
wt_base.commit('base', rev_id='r@b-2')
158
with open('child/b', 'wb') as f: f.write('new b child contents\n')
159
self.assertRaises(errors.BoundBranchOutOfDate,
160
wt_child.commit, 'child', rev_id='r@c-2')
161
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
162
wt_child.branch.unbind()
163
wt_child.commit('child', rev_id='r@c-2')
164
self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
165
self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
167
sftp_b_base = branch.Branch.open(self.get_url('base'))
168
self.assertRaises(errors.DivergedBranches,
169
wt_child.branch.bind, sftp_b_base)
171
def test_commit_remote_bound(self):
172
# Make sure it is detected if the current base is bound during the
173
# objects lifetime, when the child goes to commit.
174
b_base, wt_child = self.create_branches()
176
b_base.bzrdir.sprout('newbase')
178
sftp_b_base = branch.Branch.open(self.get_url('base'))
179
sftp_b_newbase = branch.Branch.open(self.get_url('newbase'))
181
sftp_b_base.bind(sftp_b_newbase)
183
with open('child/a', 'wb') as f: f.write('new contents\n')
184
self.assertRaises(errors.CommitToDoubleBoundBranch,
185
wt_child.commit, 'failure', rev_id='r@c-2')
187
self.assertEqual(['r@b-1'], b_base.revision_history())
188
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
189
self.assertEqual(['r@b-1'], sftp_b_newbase.revision_history())
191
def test_bind_diverged(self):
192
b_base, wt_child = self.create_branches()
194
wt_child.branch.unbind()
195
with open('child/a', 'ab') as f: f.write('child contents\n')
196
wt_child_rev = wt_child.commit('child', rev_id='r@c-2')
198
self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
199
self.assertEqual(['r@b-1'], b_base.revision_history())
201
with open('base/b', 'ab') as f: f.write('base contents\n')
202
b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')
203
self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
205
sftp_b_base = branch.Branch.open(self.get_url('base'))
207
self.assertRaises(errors.DivergedBranches,
208
wt_child.branch.bind, sftp_b_base)
210
wt_child.merge_from_branch(sftp_b_base)
211
self.assertEqual([wt_child_rev, 'r@b-2'], wt_child.get_parent_ids())
212
wt_child.commit('merged', rev_id='r@c-3')
214
# After a merge, trying to bind again should succeed but not push the
216
wt_child.branch.bind(sftp_b_base)
218
self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
219
self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3'],
220
wt_child.branch.revision_history())
222
def test_bind_parent_ahead_preserves_parent(self):
223
b_base, wt_child = self.create_branches()
225
wt_child.branch.unbind()
227
with open('a', 'ab') as f: f.write('base changes\n')
228
wt_base = b_base.bzrdir.open_workingtree()
229
wt_base.commit('base', rev_id='r@b-2')
230
self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
231
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
233
sftp_b_base = branch.Branch.open(self.get_url('base'))
234
wt_child.branch.bind(sftp_b_base)
236
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
238
wt_child.branch.unbind()
240
# Check and make sure it also works if parent is ahead multiple
241
wt_base.commit('base 3', rev_id='r@b-3', allow_pointless=True)
242
wt_base.commit('base 4', rev_id='r@b-4', allow_pointless=True)
243
wt_base.commit('base 5', rev_id='r@b-5', allow_pointless=True)
245
self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
246
b_base.revision_history())
248
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
250
wt_child.branch.bind(sftp_b_base)
251
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
253
def test_bind_child_ahead_preserves_child(self):
254
b_base, wt_child = self.create_branches()
256
wt_child.branch.unbind()
258
wt_child.commit('child', rev_id='r@c-2', allow_pointless=True)
259
self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
260
self.assertEqual(['r@b-1'], b_base.revision_history())
262
sftp_b_base = branch.Branch.open(self.get_url('base'))
263
wt_child.branch.bind(sftp_b_base)
265
self.assertEqual(['r@b-1'], b_base.revision_history())
267
# Check and make sure it also works if child is ahead multiple
268
wt_child.branch.unbind()
269
wt_child.commit('child 3', rev_id='r@c-3', allow_pointless=True)
270
wt_child.commit('child 4', rev_id='r@c-4', allow_pointless=True)
271
wt_child.commit('child 5', rev_id='r@c-5', allow_pointless=True)
273
self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
274
wt_child.branch.revision_history())
275
self.assertEqual(['r@b-1'], b_base.revision_history())
277
wt_child.branch.bind(sftp_b_base)
278
self.assertEqual(['r@b-1'], b_base.revision_history())
280
def test_commit_after_merge(self):
281
b_base, wt_child = self.create_branches()
283
# We want merge to be able to be a local only
284
# operation, because it does not alter the branch data.
286
# But we can't fail afterwards
288
wt_other = wt_child.bzrdir.sprout('other').open_workingtree()
290
with open('other/c', 'wb') as f: f.write('file c\n')
292
wt_other.commit('adding c', rev_id='r@d-2')
294
self.assertFalse(wt_child.branch.repository.has_revision('r@d-2'))
295
self.assertFalse(b_base.repository.has_revision('r@d-2'))
297
wt_child.merge_from_branch(wt_other.branch)
299
self.assertPathExists('child/c')
300
self.assertEqual(['r@d-2'], wt_child.get_parent_ids()[1:])
301
self.assertTrue(wt_child.branch.repository.has_revision('r@d-2'))
302
self.assertFalse(b_base.repository.has_revision('r@d-2'))
304
# Commit should succeed, and cause merged revisions to
305
# be pushed into base
306
wt_child.commit('merge other', rev_id='r@c-2')
307
self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
308
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
309
self.assertTrue(b_base.repository.has_revision('r@d-2'))
311
def test_commit_fails(self):
312
b_base, wt_child = self.create_branches()
314
with open('a', 'ab') as f: f.write('child adds some text\n')
316
# this deletes the branch from memory
318
# and this moves it out of the way on disk
319
os.rename('base', 'hidden_base')
321
self.assertRaises(errors.BoundBranchConnectionFailure,
322
wt_child.commit, 'added text', rev_id='r@c-2')
324
# TODO: jam 20051231 We need invasive failure tests, so that we can show
325
# performance even when something fails.