1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branches bound to an sftp branch."""
23
from bzrlib.branch import Branch
24
from bzrlib.bzrdir import (BzrDir,
29
import bzrlib.errors as errors
30
from bzrlib.tests.test_sftp_transport import TestCaseWithSFTPServer, paramiko_loaded
33
class BoundSFTPBranch(TestCaseWithSFTPServer):
    """Tests for a branch bound to a master opened through self.get_url().

    Most tests start from create_branches(), which provides a 'base' branch
    and a 'child' working tree whose branch is bound to 'base'.  Revision ids
    follow the pattern 'r@<tree>-<n>': 'b' for commits made in base, 'c' for
    commits made in child and 'd' for commits made in any other tree.
    """

    def _write_file(self, path, contents, mode='wb'):
        """Write contents to path and close the file before returning.

        Closing explicitly (rather than relying on garbage collection of the
        file object) guarantees the data is on disk before the next commit.

        :param path: Path handed to open().
        :param contents: String to write.
        :param mode: Mode handed to open(); 'wb' replaces the file, 'ab'
            appends to it.
        """
        f = open(path, mode)
        try:
            f.write(contents)
        finally:
            f.close()

    def create_branches(self):
        """Create a 'base' branch and a 'child' branch bound to it.

        'base' is a standalone working tree created while BzrDirMetaFormat1
        is the default format; it holds the files a and b and one revision,
        'r@b-1'.  'child' is sprouted from 'base' and bound to the branch
        opened from self.get_url('base'), which is also stored as
        self.sftp_base.

        :return: (b_base, wt_child) -- the locally opened base branch and the
            working tree of the bound child.
        """
        self.build_tree(['base/', 'base/a', 'base/b'])
        old_format = BzrDirFormat.get_default_format()
        BzrDirFormat.set_default_format(BzrDirMetaFormat1())
        try:
            wt_base = BzrDir.create_standalone_workingtree('base')
        finally:
            # The default format is global state: always put it back, even
            # if creating the tree failed.
            BzrDirFormat.set_default_format(old_format)

        b_base = wt_base.branch

        # Version the files so that 'first' actually records them and the
        # tests' later edits to a and b are real changes.
        wt_base.add(['a', 'b'])
        wt_base.commit('first', rev_id='r@b-1')

        wt_child = b_base.bzrdir.sprout('child').open_workingtree()
        self.sftp_base = Branch.open(self.get_url('base'))
        wt_child.branch.bind(self.sftp_base)

        self.assertEqual(['r@b-1'], b_base.revision_history())
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())

        return b_base, wt_child

    def test_simple_binding(self):
        # Bind a fresh branch, update its tree from the master, then unbind.
        self.build_tree(['base/', 'base/a', 'base/b', 'child/'])
        wt_base = BzrDir.create_standalone_workingtree('base')

        # a and b must be versioned for update() to bring them into child.
        wt_base.add(['a', 'b'])
        wt_base.commit('first', rev_id='r@b-1')

        b_base = wt_base.branch
        old_format = BzrDirFormat.get_default_format()
        BzrDirFormat.set_default_format(BzrDirMetaFormat1())
        try:
            b_child = BzrDir.create_branch_convenience('child')
        finally:
            # Restore the global default format even on failure.
            BzrDirFormat.set_default_format(old_format)
        self.assertEqual(None, b_child.get_bound_location())
        self.assertEqual(None, b_child.get_master_branch())

        sftp_b_base = Branch.open(self.get_url('base'))
        b_child.bind(sftp_b_base)
        self.assertEqual(sftp_b_base.base, b_child.get_bound_location())
        # this line is more of a working tree test line, but - what the hey.
        b_child.bzrdir.open_workingtree().update()
        self.failUnlessExists('child/a')
        self.failUnlessExists('child/b')

        # The bound location can only go back to None once the branch has
        # been unbound.
        b_child.unbind()
        self.assertEqual(None, b_child.get_bound_location())

    def test_bound_commit(self):
        # A commit in the bound child must also appear in base.
        b_base, wt_child = self.create_branches()

        self._write_file('child/a', 'new contents\n')
        wt_child.commit('modified a', rev_id='r@c-2')

        self.assertEqual(['r@b-1', 'r@c-2'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())

    def test_bound_fail(self):
        # Make sure commit fails if out of date.
        b_base, wt_child = self.create_branches()

        self._write_file('base/a', 'new base contents\n')
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')

        self._write_file('child/b', 'new b child contents\n')
        self.assertRaises(errors.BoundBranchOutOfDate,
                          wt_child.commit, 'child', rev_id='r@c-2')

        sftp_b_base = Branch.open(self.get_url('base'))

        # This is all that cmd_update does
        wt_child.pull(sftp_b_base, overwrite=False)

        wt_child.commit('child', rev_id='r@c-3')

        expected = ['r@b-1', 'r@b-2', 'r@c-3']
        self.assertEqual(expected, wt_child.branch.revision_history())
        self.assertEqual(expected, b_base.revision_history())
        self.assertEqual(expected, sftp_b_base.revision_history())

    def test_double_binding(self):
        # child2 is bound to child, which is itself bound to base; a commit
        # in child2 must be refused.
        b_base, wt_child = self.create_branches()

        wt_child2 = wt_child.bzrdir.sprout('child2').open_workingtree()

        wt_child2.branch.bind(wt_child.branch)

        self._write_file('child2/a', 'new contents\n')
        self.assertRaises(errors.CommitToDoubleBoundBranch,
                          wt_child2.commit, 'child2', rev_id='r@d-2')

    def test_unbinding(self):
        # Once unbound, the child commits locally; re-binding afterwards is
        # refused because child and base have diverged.
        from bzrlib.transport import get_transport
        b_base, wt_child = self.create_branches()

        # TestCaseWithSFTPServer only allows you to connect one time
        # to the SFTP server. So we have to create a connection and
        # keep it around, so that it can be reused
        __unused_t = get_transport(self.get_url('.'))

        wt_base = b_base.bzrdir.open_workingtree()
        self._write_file('base/a', 'new base contents\n')
        wt_base.commit('base', rev_id='r@b-2')

        self._write_file('child/b', 'new b child contents\n')
        self.assertRaises(errors.BoundBranchOutOfDate,
                          wt_child.commit, 'child', rev_id='r@c-2')
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
        wt_child.branch.unbind()
        wt_child.commit('child', rev_id='r@c-2')
        self.assertEqual(['r@b-1', 'r@c-2'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())

        sftp_b_base = Branch.open(self.get_url('base'))
        self.assertRaises(errors.DivergedBranches,
                          wt_child.branch.bind, sftp_b_base)

    def test_commit_remote_bound(self):
        # Make sure it is detected if the current base
        # suddenly is bound when child goes to commit
        b_base, wt_child = self.create_branches()

        b_base.bzrdir.sprout('newbase')

        sftp_b_base = Branch.open(self.get_url('base'))
        sftp_b_newbase = Branch.open(self.get_url('newbase'))

        sftp_b_base.bind(sftp_b_newbase)

        self._write_file('child/a', 'new contents\n')
        self.assertRaises(errors.CommitToDoubleBoundBranch,
                          wt_child.commit, 'failure', rev_id='r@c-2')

        # The refused commit must not have reached any of the branches.
        self.assertEqual(['r@b-1'], b_base.revision_history())
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
        self.assertEqual(['r@b-1'], sftp_b_newbase.revision_history())

    def test_pull_updates_both(self):
        # Pulling into the bound child must update base as well.
        b_base, wt_child = self.create_branches()

        wt_newchild = b_base.bzrdir.sprout('newchild').open_workingtree()
        self._write_file('newchild/b', 'newchild b contents\n')
        wt_newchild.commit('newchild', rev_id='r@d-2')
        self.assertEqual(['r@b-1', 'r@d-2'],
                         wt_newchild.branch.revision_history())

        wt_child.pull(wt_newchild.branch)
        self.assertEqual(['r@b-1', 'r@d-2'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1', 'r@d-2'], b_base.revision_history())

    def test_bind_diverged(self):
        # Binding diverged branches fails until base has been merged into
        # child; after that, bind succeeds.
        from bzrlib.builtins import merge

        b_base, wt_child = self.create_branches()

        wt_child.branch.unbind()
        self._write_file('child/a', 'child contents\n', mode='ab')
        wt_child.commit('child', rev_id='r@c-2')

        self.assertEqual(['r@b-1', 'r@c-2'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1'], b_base.revision_history())

        self._write_file('base/b', 'base contents\n', mode='ab')
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())

        sftp_b_base = Branch.open(self.get_url('base'))

        self.assertRaises(errors.DivergedBranches,
                          wt_child.branch.bind, sftp_b_base)

        # TODO: jam 20051230 merge_inner doesn't set pending merges
        #       Is this on purpose?
        #       merge_inner also doesn't fetch any missing revisions
        #merge_inner(wt_child.branch, sftp_b_base.revision_tree('r@b-2'),
        #            wt_child.branch.revision_tree('r@b-1'))
        # TODO: jam 20051230 merge(..., (None, None), ...) seems to
        #       cause an infinite loop of some sort. It definitely doesn't
        #       work, you have to use list notation
        merge((sftp_b_base.base, 2), [None, None],
              this_dir=wt_child.branch.base)

        self.assertEqual(['r@b-2'], wt_child.pending_merges())
        wt_child.commit('merged', rev_id='r@c-3')

        # After a merge, trying to bind again should succeed
        # by pushing the new change to base
        wt_child.branch.bind(sftp_b_base)

        self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
                         b_base.revision_history())
        self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
                         wt_child.branch.revision_history())

    def test_bind_parent_ahead(self):
        # Binding to a master that is ahead brings the child up to date.
        b_base, wt_child = self.create_branches()

        wt_child.branch.unbind()

        # Modify the file inside the base tree (not a stray 'a' in the test
        # directory) so that the commit below records a real change.
        self._write_file('base/a', 'base changes\n', mode='ab')
        wt_base = b_base.bzrdir.open_workingtree()
        wt_base.commit('base', rev_id='r@b-2')
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())

        sftp_b_base = Branch.open(self.get_url('base'))
        wt_child.branch.bind(sftp_b_base)

        self.assertEqual(['r@b-1', 'r@b-2'],
                         wt_child.branch.revision_history())

        wt_child.branch.unbind()

        # Check and make sure it also works if parent is ahead multiple
        wt_base.commit('base 3', rev_id='r@b-3', allow_pointless=True)
        wt_base.commit('base 4', rev_id='r@b-4', allow_pointless=True)
        wt_base.commit('base 5', rev_id='r@b-5', allow_pointless=True)

        self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
                         b_base.revision_history())

        self.assertEqual(['r@b-1', 'r@b-2'],
                         wt_child.branch.revision_history())

        wt_child.branch.bind(sftp_b_base)
        self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
                         wt_child.branch.revision_history())

    def test_bind_child_ahead(self):
        # Binding a child that is ahead pushes its revisions to the master.
        b_base, wt_child = self.create_branches()

        wt_child.branch.unbind()

        wt_child.commit('child', rev_id='r@c-2', allow_pointless=True)
        self.assertEqual(['r@b-1', 'r@c-2'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1'], b_base.revision_history())

        sftp_b_base = Branch.open(self.get_url('base'))
        wt_child.branch.bind(sftp_b_base)

        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())

        # Check and make sure it also works if child is ahead multiple
        wt_child.branch.unbind()
        wt_child.commit('child 3', rev_id='r@c-3', allow_pointless=True)
        wt_child.commit('child 4', rev_id='r@c-4', allow_pointless=True)
        wt_child.commit('child 5', rev_id='r@c-5', allow_pointless=True)

        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())

        wt_child.branch.bind(sftp_b_base)
        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
                         b_base.revision_history())

    def test_commit_after_merge(self):
        # Merging into the bound child stays local; the following commit
        # carries the merged revision over to base.
        from bzrlib.builtins import merge

        b_base, wt_child = self.create_branches()

        # We want merge to be able to be a local only
        # operation, because it does not alter the branch data.

        # But we can't fail afterwards

        wt_other = wt_child.bzrdir.sprout('other').open_workingtree()

        self._write_file('other/c', 'file c\n')
        # c has to be versioned for the merge below to create child/c.
        wt_other.add(['c'])
        wt_other.commit('adding c', rev_id='r@d-2')

        self.failIf(wt_child.branch.repository.has_revision('r@d-2'))
        self.failIf(b_base.repository.has_revision('r@d-2'))

        # TODO: jam 20051230 merge_inner doesn't set pending merges
        #       Is this on purpose?
        #       merge_inner also doesn't fetch any missing revisions
        #merge_inner(wt_child.branch, b_other.revision_tree('r@d-2'),
        #            wt_child.branch.revision_tree('r@b-1'))
        merge((wt_other.branch.base, 2), [None, None],
              this_dir=wt_child.branch.base)

        self.failUnlessExists('child/c')
        self.assertEqual(['r@d-2'], wt_child.pending_merges())
        self.failUnless(wt_child.branch.repository.has_revision('r@d-2'))
        self.failIf(b_base.repository.has_revision('r@d-2'))

        # Commit should succeed, and cause merged revisions to
        # be pulled into base
        wt_child.commit('merge other', rev_id='r@c-2')
        self.assertEqual(['r@b-1', 'r@c-2'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
        self.failUnless(b_base.repository.has_revision('r@d-2'))

    def test_commit_fails(self):
        # A commit in the bound child must fail when base cannot be found
        # at its bound location.
        import os

        b_base, wt_child = self.create_branches()

        # Change the child's copy of a so there is something to commit.
        self._write_file('child/a', 'child adds some text\n', mode='ab')

        # After the rename nothing exists at the path 'base' any more.
        os.rename('base', 'hidden_base')

        self.assertRaises(errors.BoundBranchConnectionFailure,
                          wt_child.commit, 'added text', rev_id='r@c-2')

    def test_pull_fails(self):
        # A pull into the bound child must fail when base cannot be found
        # at its bound location.
        import os

        b_base, wt_child = self.create_branches()

        wt_other = wt_child.bzrdir.sprout('other').open_workingtree()
        self._write_file('other/a', 'new contents\n')
        wt_other.commit('changed a', rev_id='r@d-2')

        self.assertEqual(['r@b-1'], b_base.revision_history())
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
        self.assertEqual(['r@b-1', 'r@d-2'],
                         wt_other.branch.revision_history())

        # After the rename nothing exists at the path 'base' any more.
        os.rename('base', 'hidden_base')

        self.assertRaises(errors.BoundBranchConnectionFailure,
                          wt_child.pull, wt_other.branch)
360
# TODO: jam 20051231 We need invasive failure tests, so that we can show
361
# performance even when something fails.
364
if not paramiko_loaded: