~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_bound_sftp.py

Merge from bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for branches bound to an sftp branch."""
 
18
 
 
19
 
 
20
import os
 
21
 
 
22
 
 
23
from bzrlib.branch import Branch
 
24
from bzrlib.bzrdir import (BzrDir,
 
25
                           BzrDirFormat,
 
26
                           BzrDirFormat6,
 
27
                           BzrDirMetaFormat1,
 
28
                           )
 
29
import bzrlib.errors as errors
 
30
from bzrlib.tests.test_sftp_transport import TestCaseWithSFTPServer, paramiko_loaded
 
31
 
 
32
 
 
33
class BoundSFTPBranch(TestCaseWithSFTPServer):
 
34
 
 
35
    def create_branches(self):
 
36
        self.build_tree(['base/', 'base/a', 'base/b'])
 
37
        old_format = BzrDirFormat.get_default_format()
 
38
        BzrDirFormat.set_default_format(BzrDirMetaFormat1())
 
39
        try:
 
40
            wt_base = BzrDir.create_standalone_workingtree('base')
 
41
        finally:
 
42
            BzrDirFormat.set_default_format(old_format)
 
43
    
 
44
        b_base = wt_base.branch
 
45
 
 
46
        wt_base.add('a')
 
47
        wt_base.add('b')
 
48
        wt_base.commit('first', rev_id='r@b-1')
 
49
 
 
50
        wt_child = b_base.bzrdir.sprout('child').open_workingtree()
 
51
        self.sftp_base = Branch.open(self.get_url('base'))
 
52
        wt_child.branch.bind(self.sftp_base)
 
53
 
 
54
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
55
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
56
 
 
57
        return b_base, wt_child
 
58
 
 
59
    def test_simple_binding(self):
 
60
        self.build_tree(['base/', 'base/a', 'base/b', 'child/'])
 
61
        wt_base = BzrDir.create_standalone_workingtree('base')
 
62
 
 
63
        wt_base.add('a')
 
64
        wt_base.add('b')
 
65
        wt_base.commit('first', rev_id='r@b-1')
 
66
 
 
67
        b_base = wt_base.branch
 
68
        old_format = BzrDirFormat.get_default_format()
 
69
        BzrDirFormat.set_default_format(BzrDirMetaFormat1())
 
70
        try:
 
71
            b_child = BzrDir.create_branch_convenience('child')
 
72
        finally:
 
73
            BzrDirFormat.set_default_format(old_format)
 
74
        self.assertEqual(None, b_child.get_bound_location())
 
75
        self.assertEqual(None, b_child.get_master_branch())
 
76
 
 
77
        sftp_b_base = Branch.open(self.get_url('base'))
 
78
        b_child.bind(sftp_b_base)
 
79
        self.assertEqual(sftp_b_base.base, b_child.get_bound_location())
 
80
        # this line is more of a working tree test line, but - what the hey.
 
81
        b_child.bzrdir.open_workingtree().update()
 
82
        self.failUnlessExists('child/a')
 
83
        self.failUnlessExists('child/b')
 
84
 
 
85
        b_child.unbind()
 
86
        self.assertEqual(None, b_child.get_bound_location())
 
87
 
 
88
    def test_bound_commit(self):
 
89
        b_base, wt_child = self.create_branches()
 
90
 
 
91
        open('child/a', 'wb').write('new contents\n')
 
92
        wt_child.commit('modified a', rev_id='r@c-2')
 
93
 
 
94
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
 
95
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
 
96
 
 
97
    def test_bound_fail(self):
 
98
        # Make sure commit fails if out of date.
 
99
        b_base, wt_child = self.create_branches()
 
100
 
 
101
        open('base/a', 'wb').write('new base contents\n')
 
102
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')
 
103
 
 
104
        open('child/b', 'wb').write('new b child contents\n')
 
105
        self.assertRaises(errors.BoundBranchOutOfDate,
 
106
                wt_child.commit, 'child', rev_id='r@c-2')
 
107
 
 
108
        sftp_b_base = Branch.open(self.get_url('base'))
 
109
 
 
110
        # This is all that cmd_update does
 
111
        wt_child.pull(sftp_b_base, overwrite=False)
 
112
 
 
113
        wt_child.commit('child', rev_id='r@c-3')
 
114
 
 
115
        self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
 
116
                wt_child.branch.revision_history())
 
117
        self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
 
118
                b_base.revision_history())
 
119
        self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
 
120
                sftp_b_base.revision_history())
 
121
 
 
122
    def test_double_binding(self):
 
123
        b_base, wt_child = self.create_branches()
 
124
 
 
125
        wt_child2 = wt_child.bzrdir.sprout('child2').open_workingtree()
 
126
 
 
127
        wt_child2.branch.bind(wt_child.branch)
 
128
 
 
129
        open('child2/a', 'wb').write('new contents\n')
 
130
        self.assertRaises(errors.CommitToDoubleBoundBranch,
 
131
                wt_child2.commit, 'child2', rev_id='r@d-2')
 
132
 
 
133
    def test_unbinding(self):
 
134
        from bzrlib.transport import get_transport
 
135
        b_base, wt_child = self.create_branches()
 
136
 
 
137
        # TestCaseWithSFTPServer only allows you to connect one time
 
138
        # to the SFTP server. So we have to create a connection and
 
139
        # keep it around, so that it can be reused
 
140
        __unused_t = get_transport(self.get_url('.'))
 
141
 
 
142
        wt_base = b_base.bzrdir.open_workingtree()
 
143
        open('base/a', 'wb').write('new base contents\n')
 
144
        wt_base.commit('base', rev_id='r@b-2')
 
145
 
 
146
        open('child/b', 'wb').write('new b child contents\n')
 
147
        self.assertRaises(errors.BoundBranchOutOfDate,
 
148
                wt_child.commit, 'child', rev_id='r@c-2')
 
149
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
150
        wt_child.branch.unbind()
 
151
        wt_child.commit('child', rev_id='r@c-2')
 
152
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
 
153
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
 
154
 
 
155
        sftp_b_base = Branch.open(self.get_url('base'))
 
156
        self.assertRaises(errors.DivergedBranches,
 
157
                wt_child.branch.bind, sftp_b_base)
 
158
 
 
159
    def test_commit_remote_bound(self):
 
160
        # Make sure it is detected if the current base
 
161
        # suddenly is bound when child goes to commit
 
162
        b_base, wt_child = self.create_branches()
 
163
 
 
164
        b_base.bzrdir.sprout('newbase')
 
165
 
 
166
        sftp_b_base = Branch.open(self.get_url('base'))
 
167
        sftp_b_newbase = Branch.open(self.get_url('newbase'))
 
168
 
 
169
        sftp_b_base.bind(sftp_b_newbase)
 
170
 
 
171
        open('child/a', 'wb').write('new contents\n')
 
172
        self.assertRaises(errors.CommitToDoubleBoundBranch,
 
173
                wt_child.commit, 'failure', rev_id='r@c-2')
 
174
 
 
175
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
176
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
177
        self.assertEqual(['r@b-1'], sftp_b_newbase.revision_history())
 
178
 
 
179
    def test_pull_updates_both(self):
 
180
        b_base, wt_child = self.create_branches()
 
181
 
 
182
        wt_newchild = b_base.bzrdir.sprout('newchild').open_workingtree()
 
183
        open('newchild/b', 'wb').write('newchild b contents\n')
 
184
        wt_newchild.commit('newchild', rev_id='r@d-2')
 
185
        self.assertEqual(['r@b-1', 'r@d-2'], wt_newchild.branch.revision_history())
 
186
 
 
187
        wt_child.pull(wt_newchild.branch)
 
188
        self.assertEqual(['r@b-1', 'r@d-2'], wt_child.branch.revision_history())
 
189
        self.assertEqual(['r@b-1', 'r@d-2'], b_base.revision_history())
 
190
 
 
191
    def test_bind_diverged(self):
 
192
        from bzrlib.builtins import merge
 
193
 
 
194
        b_base, wt_child = self.create_branches()
 
195
 
 
196
        wt_child.branch.unbind()
 
197
        open('child/a', 'ab').write('child contents\n')
 
198
        wt_child.commit('child', rev_id='r@c-2')
 
199
 
 
200
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
 
201
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
202
 
 
203
        open('base/b', 'ab').write('base contents\n')
 
204
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')
 
205
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
 
206
 
 
207
        sftp_b_base = Branch.open(self.get_url('base'))
 
208
 
 
209
        self.assertRaises(errors.DivergedBranches,
 
210
                wt_child.branch.bind, sftp_b_base)
 
211
 
 
212
        # TODO: jam 20051230 merge_inner doesn't set pending merges
 
213
        #       Is this on purpose?
 
214
        #       merge_inner also doesn't fetch any missing revisions
 
215
        #merge_inner(wt_child.branch, sftp_b_base.revision_tree('r@b-2'), 
 
216
        #        wt_child.branch.revision_tree('r@b-1'))
 
217
        # TODO: jam 20051230 merge(..., (None, None), ...) seems to
 
218
        #       cause an infinite loop of some sort. It definitely doesn't
 
219
        #       work, you have to use list notation
 
220
        merge((sftp_b_base.base, 2), [None, None], this_dir=wt_child.branch.base)
 
221
 
 
222
        self.assertEqual(['r@b-2'], wt_child.pending_merges())
 
223
        wt_child.commit('merged', rev_id='r@c-3')
 
224
 
 
225
        # After a merge, trying to bind again should succeed
 
226
        # by pushing the new change to base
 
227
        wt_child.branch.bind(sftp_b_base)
 
228
 
 
229
        self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
 
230
                b_base.revision_history())
 
231
        self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
 
232
                wt_child.branch.revision_history())
 
233
 
 
234
    def test_bind_parent_ahead(self):
 
235
        b_base, wt_child = self.create_branches()
 
236
 
 
237
        wt_child.branch.unbind()
 
238
 
 
239
        open('a', 'ab').write('base changes\n')
 
240
        wt_base = b_base.bzrdir.open_workingtree()
 
241
        wt_base.commit('base', rev_id='r@b-2')
 
242
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
 
243
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
244
 
 
245
        sftp_b_base = Branch.open(self.get_url('base'))
 
246
        wt_child.branch.bind(sftp_b_base)
 
247
 
 
248
        self.assertEqual(['r@b-1', 'r@b-2'], wt_child.branch.revision_history())
 
249
 
 
250
        wt_child.branch.unbind()
 
251
 
 
252
        # Check and make sure it also works if parent is ahead multiple
 
253
        wt_base.commit('base 3', rev_id='r@b-3', allow_pointless=True)
 
254
        wt_base.commit('base 4', rev_id='r@b-4', allow_pointless=True)
 
255
        wt_base.commit('base 5', rev_id='r@b-5', allow_pointless=True)
 
256
 
 
257
        self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
 
258
                b_base.revision_history())
 
259
 
 
260
        self.assertEqual(['r@b-1', 'r@b-2'], wt_child.branch.revision_history())
 
261
 
 
262
        wt_child.branch.bind(sftp_b_base)
 
263
        self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
 
264
                wt_child.branch.revision_history())
 
265
 
 
266
    def test_bind_child_ahead(self):
 
267
        b_base, wt_child = self.create_branches()
 
268
 
 
269
        wt_child.branch.unbind()
 
270
 
 
271
        wt_child.commit('child', rev_id='r@c-2', allow_pointless=True)
 
272
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
 
273
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
274
 
 
275
        sftp_b_base = Branch.open(self.get_url('base'))
 
276
        wt_child.branch.bind(sftp_b_base)
 
277
 
 
278
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
 
279
 
 
280
        # Check and make sure it also works if child is ahead multiple
 
281
        wt_child.branch.unbind()
 
282
        wt_child.commit('child 3', rev_id='r@c-3', allow_pointless=True)
 
283
        wt_child.commit('child 4', rev_id='r@c-4', allow_pointless=True)
 
284
        wt_child.commit('child 5', rev_id='r@c-5', allow_pointless=True)
 
285
 
 
286
        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
 
287
                wt_child.branch.revision_history())
 
288
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
 
289
 
 
290
        wt_child.branch.bind(sftp_b_base)
 
291
        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
 
292
                b_base.revision_history())
 
293
 
 
294
    def test_commit_after_merge(self):
 
295
        from bzrlib.builtins import merge
 
296
 
 
297
        b_base, wt_child = self.create_branches()
 
298
 
 
299
        # We want merge to be able to be a local only
 
300
        # operation, because it does not alter the branch data.
 
301
 
 
302
        # But we can't fail afterwards
 
303
 
 
304
        wt_other = wt_child.bzrdir.sprout('other').open_workingtree()
 
305
 
 
306
        open('other/c', 'wb').write('file c\n')
 
307
        wt_other.add('c')
 
308
        wt_other.commit('adding c', rev_id='r@d-2')
 
309
 
 
310
        self.failIf(wt_child.branch.repository.has_revision('r@d-2'))
 
311
        self.failIf(b_base.repository.has_revision('r@d-2'))
 
312
 
 
313
        # TODO: jam 20051230 merge_inner doesn't set pending merges
 
314
        #       Is this on purpose?
 
315
        #       merge_inner also doesn't fetch any missing revisions
 
316
        #merge_inner(wt_child.branch, b_other.revision_tree('r@d-2'),
 
317
        #        wt_child.branch.revision_tree('r@b-1'))
 
318
        merge((wt_other.branch.base, 2), [None, None], this_dir=wt_child.branch.base)
 
319
 
 
320
        self.failUnlessExists('child/c')
 
321
        self.assertEqual(['r@d-2'], wt_child.pending_merges())
 
322
        self.failUnless(wt_child.branch.repository.has_revision('r@d-2'))
 
323
        self.failIf(b_base.repository.has_revision('r@d-2'))
 
324
 
 
325
        # Commit should succeed, and cause merged revisions to
 
326
        # be pulled into base
 
327
        wt_child.commit('merge other', rev_id='r@c-2')
 
328
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
 
329
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
 
330
        self.failUnless(b_base.repository.has_revision('r@d-2'))
 
331
 
 
332
    def test_commit_fails(self):
 
333
        b_base, wt_child = self.create_branches()
 
334
 
 
335
        open('a', 'ab').write('child adds some text\n')
 
336
 
 
337
        del b_base
 
338
        os.rename('base', 'hidden_base')
 
339
 
 
340
        self.assertRaises(errors.BoundBranchConnectionFailure,
 
341
                wt_child.commit, 'added text', rev_id='r@c-2')
 
342
 
 
343
    def test_pull_fails(self):
 
344
        b_base, wt_child = self.create_branches()
 
345
 
 
346
        wt_other = wt_child.bzrdir.sprout('other').open_workingtree()
 
347
        open('other/a', 'wb').write('new contents\n')
 
348
        wt_other.commit('changed a', rev_id='r@d-2')
 
349
 
 
350
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
351
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
352
        self.assertEqual(['r@b-1', 'r@d-2'], wt_other.branch.revision_history())
 
353
 
 
354
        del b_base
 
355
        os.rename('base', 'hidden_base')
 
356
 
 
357
        self.assertRaises(errors.BoundBranchConnectionFailure,
 
358
                wt_child.pull, wt_other.branch)
 
359
 
 
360
    # TODO: jam 20051231 We need invasive failure tests, so that we can show
 
361
    #       performance even when something fails.
 
362
 
 
363
 
 
364
if not paramiko_loaded:
 
365
    del BoundSFTPBranch
 
366