1
# Copyright (C) 2005, 2006, 2007, 2009, 2010 Robey Pointer <robey@lag.net>, Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for branches bound to an sftp branch."""
28
from bzrlib.tests import test_server
29
from bzrlib.tests.matchers import RevisionHistoryMatches
30
from bzrlib.transport import memory
33
class BoundSFTPBranch(tests.TestCaseWithTransport):
    """Tests for binding, unbinding and committing through a bound branch.

    The tests create a 'base' branch and a 'child' branch on disk, reopen
    'base' through ``self.get_url('base')`` and bind the child to that
    reopened branch.
    """

    # NOTE(review): this class reached review with its indentation flattened
    # and several short lines absent.  Every line tagged "reconstructed"
    # below was inferred from the surrounding code (an ``except`` needs its
    # ``try``, a truncated call needs its last argument, ...) and should be
    # confirmed against version control.

    def setUp(self):  # reconstructed: the ``def`` line itself was absent
        """Select the in-memory server as the VFS transport factory."""
        tests.TestCaseWithTransport.setUp(self)
        self.vfs_transport_factory = memory.MemoryServer
        if self.transport_server is test_server.LocalURLServer:
            # NOTE(review): presumably clears the LocalURLServer default so
            # the VFS factory chosen above is used -- confirm against
            # TestCaseWithTransport.
            self.transport_server = None

    def create_branches(self):
        """Create a 'base' branch and a 'child' branch bound to it.

        'base' receives a single commit, 'r@b-1'.  'child' is sprouted from
        it and bound to 'base' as reopened through ``self.get_url('base')``;
        the reopened branch is kept on ``self.sftp_base``.

        :return: ``(b_base, wt_child)`` -- the base branch and the working
            tree of the child.
        :raises tests.TestSkipped: if creating the base tree raises
            ``errors.NotLocalUrl``.
        """
        self.build_tree(['base/', 'base/a', 'base/b'])
        knit_format = controldir.format_registry.make_bzrdir('knit')
        try:  # reconstructed
            wt_base = controldir.ControlDir.create_standalone_workingtree(
                self.get_url('base'), format=knit_format)
        except errors.NotLocalUrl:
            raise tests.TestSkipped('Not a local URL')

        b_base = wt_base.branch

        wt_base.add('a')  # reconstructed
        wt_base.add('b')  # reconstructed
        wt_base.commit('first', rev_id='r@b-1')

        wt_child = b_base.bzrdir.sprout('child').open_workingtree()
        self.sftp_base = branch.Branch.open(self.get_url('base'))
        wt_child.branch.bind(self.sftp_base)
        # check the branch histories are ready for using in tests.
        # assertThat takes the object under test first, then the matcher.
        self.assertThat(b_base, RevisionHistoryMatches(['r@b-1']))
        self.assertThat(wt_child.branch, RevisionHistoryMatches(['r@b-1']))
        return b_base, wt_child

    def test_simple_binding(self):
        """Bind an empty branch, update it, then unbind it again."""
        self.build_tree(['base/', 'base/a', 'base/b', 'child/'])
        try:  # reconstructed
            wt_base = controldir.ControlDir.create_standalone_workingtree(
                self.get_url('base'))  # reconstructed argument
        except errors.NotLocalUrl:
            raise tests.TestSkipped('Not a local URL')

        wt_base.add('a')  # reconstructed
        wt_base.add('b')  # reconstructed
        wt_base.commit('first', rev_id='r@b-1')

        b_base = wt_base.branch
        # manually make a branch we can bind, because the default format
        # may not be bindable-from, and we want to test the side effects etc
        knit_format = controldir.format_registry.make_bzrdir('knit')
        b_child = controldir.ControlDir.create_branch_convenience(
            'child', format=knit_format)
        self.assertEqual(None, b_child.get_bound_location())
        self.assertEqual(None, b_child.get_master_branch())

        sftp_b_base = branch.Branch.open(self.get_url('base'))
        b_child.bind(sftp_b_base)
        self.assertEqual(sftp_b_base.base, b_child.get_bound_location())
        # the bind must not have given b_child history:
        self.assertThat(b_child, RevisionHistoryMatches([]))
        # we should be able to update the branch at this point:
        self.assertEqual(None, b_child.update())
        # and now there must be history.
        self.assertThat(b_child, RevisionHistoryMatches(['r@b-1']))
        # this line is more of a working tree test line, but - what the hey,
        b_child.bzrdir.open_workingtree().update()
        self.assertPathExists('child/a')
        self.assertPathExists('child/b')

        b_child.unbind()  # reconstructed: needed for the assertion below
        self.assertEqual(None, b_child.get_bound_location())

    def test_bound_commit(self):
        """A commit in the bound child appears in child and base history."""
        b_base, wt_child = self.create_branches()

        with open('child/a', 'wb') as f:
            f.write('new contents\n')
        wt_child.commit('modified a', rev_id='r@c-2')

        expected = RevisionHistoryMatches(['r@b-1', 'r@c-2'])
        self.assertThat(wt_child.branch, expected)
        self.assertThat(b_base, expected)

    def test_bound_commit_fails_when_out_of_date(self):
        """Commit raises BoundBranchOutOfDate until the child is pulled."""
        # Make sure commit fails if out of date.
        b_base, wt_child = self.create_branches()

        with open('base/a', 'wb') as f:
            f.write('new base contents\n')
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')

        with open('child/b', 'wb') as f:
            f.write('new b child contents\n')
        self.assertRaises(errors.BoundBranchOutOfDate,
                          wt_child.commit, 'child', rev_id='r@c-2')

        sftp_b_base = branch.Branch.open(self.get_url('base'))

        # This is all that cmd_update does
        wt_child.pull(sftp_b_base, overwrite=False)

        wt_child.commit('child', rev_id='r@c-3')

        expected = RevisionHistoryMatches(['r@b-1', 'r@b-2', 'r@c-3'])
        # reconstructed: the three branches being checked were on absent
        # continuation lines; these are the three branches in scope.
        self.assertThat(wt_child.branch, expected)
        self.assertThat(b_base, expected)
        self.assertThat(sftp_b_base, expected)

    def test_double_binding(self):
        """Committing in a checkout of the bound child is refused."""
        b_base, wt_child = self.create_branches()

        wt_child2 = wt_child.branch.create_checkout('child2')

        with open('child2/a', 'wb') as f:
            f.write('new contents\n')
        self.assertRaises(errors.CommitToDoubleBoundBranch,
                          wt_child2.commit, 'child2', rev_id='r@d-2')

    def test_unbinding(self):
        """After unbind the child commits locally and cannot rebind."""
        b_base, wt_child = self.create_branches()

        # TestCaseWithSFTPServer only allows you to connect one time
        # to the SFTP server. So we have to create a connection and
        # keep it around, so that it can be reused
        _unused_transport = self.get_transport()

        wt_base = b_base.bzrdir.open_workingtree()
        with open('base/a', 'wb') as f:
            f.write('new base contents\n')
        wt_base.commit('base', rev_id='r@b-2')

        with open('child/b', 'wb') as f:
            f.write('new b child contents\n')
        self.assertRaises(errors.BoundBranchOutOfDate,
                          wt_child.commit, 'child', rev_id='r@c-2')
        self.assertThat(wt_child.branch, RevisionHistoryMatches(['r@b-1']))
        wt_child.branch.unbind()
        wt_child.commit('child', rev_id='r@c-2')
        self.assertThat(
            wt_child.branch,
            RevisionHistoryMatches(['r@b-1', 'r@c-2']))
        self.assertThat(b_base, RevisionHistoryMatches(['r@b-1', 'r@b-2']))

        # Base and child have now diverged, so binding must be refused.
        sftp_b_base = branch.Branch.open(self.get_url('base'))
        self.assertRaises(errors.DivergedBranches,
                          wt_child.branch.bind, sftp_b_base)

    def test_commit_remote_bound(self):
        """Commit fails when the base itself has become bound."""
        # Make sure it is detected if the current base is bound during the
        # objects lifetime, when the child goes to commit.
        b_base, wt_child = self.create_branches()

        b_base.bzrdir.sprout('newbase')

        sftp_b_base = branch.Branch.open(self.get_url('base'))
        sftp_b_newbase = branch.Branch.open(self.get_url('newbase'))

        sftp_b_base.bind(sftp_b_newbase)

        with open('child/a', 'wb') as f:
            f.write('new contents\n')
        self.assertRaises(errors.CommitToDoubleBoundBranch,
                          wt_child.commit, 'failure', rev_id='r@c-2')

        # The refused commit must not have changed any of the histories.
        self.assertThat(b_base, RevisionHistoryMatches(['r@b-1']))
        self.assertThat(wt_child.branch, RevisionHistoryMatches(['r@b-1']))
        self.assertThat(sftp_b_newbase, RevisionHistoryMatches(['r@b-1']))

    def test_bind_diverged(self):
        """Binding diverged branches fails until the child merges base."""
        b_base, wt_child = self.create_branches()

        wt_child.branch.unbind()
        with open('child/a', 'ab') as f:
            f.write('child contents\n')
        wt_child_rev = wt_child.commit('child', rev_id='r@c-2')

        self.assertThat(
            wt_child.branch,  # reconstructed
            RevisionHistoryMatches(['r@b-1', 'r@c-2']))
        self.assertThat(b_base, RevisionHistoryMatches(['r@b-1']))

        with open('base/b', 'ab') as f:
            f.write('base contents\n')
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')
        self.assertThat(b_base, RevisionHistoryMatches(['r@b-1', 'r@b-2']))

        sftp_b_base = branch.Branch.open(self.get_url('base'))

        self.assertRaises(errors.DivergedBranches,
                          wt_child.branch.bind, sftp_b_base)

        wt_child.merge_from_branch(sftp_b_base)
        self.assertEqual([wt_child_rev, 'r@b-2'], wt_child.get_parent_ids())
        wt_child.commit('merged', rev_id='r@c-3')

        # After a merge, trying to bind again should succeed but not push
        # the new change to the base.
        wt_child.branch.bind(sftp_b_base)

        self.assertThat(b_base, RevisionHistoryMatches(['r@b-1', 'r@b-2']))
        self.assertThat(
            wt_child.branch,  # reconstructed
            RevisionHistoryMatches(['r@b-1', 'r@c-2', 'r@c-3']))

    def test_bind_parent_ahead_preserves_parent(self):
        """Binding to a base that is ahead leaves the child history alone."""
        b_base, wt_child = self.create_branches()

        wt_child.branch.unbind()

        with open('a', 'ab') as f:
            f.write('base changes\n')
        wt_base = b_base.bzrdir.open_workingtree()
        wt_base.commit('base', rev_id='r@b-2')
        self.assertThat(b_base, RevisionHistoryMatches(['r@b-1', 'r@b-2']))
        self.assertThat(wt_child.branch, RevisionHistoryMatches(['r@b-1']))

        sftp_b_base = branch.Branch.open(self.get_url('base'))
        wt_child.branch.bind(sftp_b_base)

        self.assertThat(wt_child.branch, RevisionHistoryMatches(['r@b-1']))

        wt_child.branch.unbind()

        # Check and make sure it also works if parent is ahead by multiple
        # revisions.
        wt_base.commit('base 3', rev_id='r@b-3', allow_pointless=True)
        wt_base.commit('base 4', rev_id='r@b-4', allow_pointless=True)
        wt_base.commit('base 5', rev_id='r@b-5', allow_pointless=True)

        self.assertThat(
            b_base,  # reconstructed
            RevisionHistoryMatches(
                ['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5']))

        self.assertThat(wt_child.branch, RevisionHistoryMatches(['r@b-1']))

        wt_child.branch.bind(sftp_b_base)
        self.assertThat(wt_child.branch, RevisionHistoryMatches(['r@b-1']))

    def test_bind_child_ahead_preserves_child(self):
        """Binding a child that is ahead leaves the base history alone."""
        b_base, wt_child = self.create_branches()

        wt_child.branch.unbind()

        wt_child.commit('child', rev_id='r@c-2', allow_pointless=True)
        self.assertThat(
            wt_child.branch,
            RevisionHistoryMatches(['r@b-1', 'r@c-2']))
        self.assertThat(b_base, RevisionHistoryMatches(['r@b-1']))

        sftp_b_base = branch.Branch.open(self.get_url('base'))
        wt_child.branch.bind(sftp_b_base)

        self.assertThat(b_base, RevisionHistoryMatches(['r@b-1']))

        # Check and make sure it also works if child is ahead by multiple
        # revisions.
        wt_child.branch.unbind()
        wt_child.commit('child 3', rev_id='r@c-3', allow_pointless=True)
        wt_child.commit('child 4', rev_id='r@c-4', allow_pointless=True)
        wt_child.commit('child 5', rev_id='r@c-5', allow_pointless=True)

        self.assertThat(
            wt_child.branch,  # reconstructed
            RevisionHistoryMatches(
                ['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5']))
        self.assertThat(b_base, RevisionHistoryMatches(['r@b-1']))

        wt_child.branch.bind(sftp_b_base)
        self.assertThat(b_base, RevisionHistoryMatches(['r@b-1']))

    def test_commit_after_merge(self):
        """Committing a merge in the child also gives base the revisions."""
        b_base, wt_child = self.create_branches()

        # We want merge to be able to be a local only
        # operation, because it does not alter the branch data.

        # But we can't fail afterwards

        wt_other = wt_child.bzrdir.sprout('other').open_workingtree()

        with open('other/c', 'wb') as f:
            f.write('file c\n')
        wt_other.add('c')  # reconstructed
        wt_other.commit('adding c', rev_id='r@d-2')

        self.assertFalse(wt_child.branch.repository.has_revision('r@d-2'))
        self.assertFalse(b_base.repository.has_revision('r@d-2'))

        wt_child.merge_from_branch(wt_other.branch)

        # The merge fetched r@d-2 into the child only; base is untouched.
        self.assertPathExists('child/c')
        self.assertEqual(['r@d-2'], wt_child.get_parent_ids()[1:])
        self.assertTrue(wt_child.branch.repository.has_revision('r@d-2'))
        self.assertFalse(b_base.repository.has_revision('r@d-2'))

        # Commit should succeed, and cause merged revisions to
        # be pushed into base
        wt_child.commit('merge other', rev_id='r@c-2')
        expected = RevisionHistoryMatches(['r@b-1', 'r@c-2'])
        self.assertThat(wt_child.branch, expected)
        self.assertThat(b_base, expected)
        self.assertTrue(b_base.repository.has_revision('r@d-2'))

    def test_commit_fails(self):
        """Commit raises BoundBranchConnectionFailure if base is moved."""
        b_base, wt_child = self.create_branches()

        with open('a', 'ab') as f:
            f.write('child adds some text\n')

        # this deletes the branch from memory
        del b_base  # reconstructed
        # and this moves it out of the way on disk
        os.rename('base', 'hidden_base')

        self.assertRaises(errors.BoundBranchConnectionFailure,
                          wt_child.commit, 'added text', rev_id='r@c-2')
328
# TODO: jam 20051231 We need invasive failure tests, so that we can show
329
# performance even when something fails.