~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_branch/test_bound_sftp.py.THIS

  • Committer: Patch Queue Manager
  • Date: 2012-04-02 02:36:45 UTC
  • mfrom: (6471.1.7 iter-child-entries)
  • Revision ID: pqm@pqm.ubuntu.com-20120402023645-28dkcb0awh3i1flc
(jelmer) Add Tree.iter_child_entries(). (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2009, 2010 Robey Pointer <robey@lag.net>, Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for branches bound to an sftp branch."""
18
 
 
19
 
 
20
 
import os
21
 
 
22
 
from bzrlib import (
23
 
    branch,
24
 
    controldir,
25
 
    errors,
26
 
    tests,
27
 
    )
28
 
from bzrlib.tests import test_server
29
 
from bzrlib.tests.matchers import RevisionHistoryMatches
30
 
from bzrlib.transport import memory
31
 
 
32
 
 
33
 
class BoundSFTPBranch(tests.TestCaseWithTransport):
34
 
 
35
 
    def setUp(self):
36
 
        tests.TestCaseWithTransport.setUp(self)
37
 
        self.vfs_transport_factory = memory.MemoryServer
38
 
        if self.transport_server is test_server.LocalURLServer:
39
 
            self.transport_server = None
40
 
 
41
 
    def create_branches(self):
42
 
        self.build_tree(['base/', 'base/a', 'base/b'])
43
 
        format = controldir.format_registry.make_bzrdir('knit')
44
 
        try:
45
 
            wt_base = controldir.ControlDir.create_standalone_workingtree(
46
 
                self.get_url('base'), format=format)
47
 
        except errors.NotLocalUrl:
48
 
            raise tests.TestSkipped('Not a local URL')
49
 
 
50
 
        b_base = wt_base.branch
51
 
 
52
 
        wt_base.add('a')
53
 
        wt_base.add('b')
54
 
        wt_base.commit('first', rev_id='r@b-1')
55
 
 
56
 
        wt_child = b_base.bzrdir.sprout('child').open_workingtree()
57
 
        self.sftp_base = branch.Branch.open(self.get_url('base'))
58
 
        wt_child.branch.bind(self.sftp_base)
59
 
        # check the branch histories are ready for using in tests.
60
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), b_base)
61
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), wt_child.branch)
62
 
        return b_base, wt_child
63
 
 
64
 
    def test_simple_binding(self):
65
 
        self.build_tree(['base/', 'base/a', 'base/b', 'child/'])
66
 
        try:
67
 
            wt_base = controldir.ControlDir.create_standalone_workingtree(
68
 
                self.get_url('base'))
69
 
        except errors.NotLocalUrl:
70
 
            raise tests.TestSkipped('Not a local URL')
71
 
 
72
 
        wt_base.add('a')
73
 
        wt_base.add('b')
74
 
        wt_base.commit('first', rev_id='r@b-1')
75
 
 
76
 
        b_base = wt_base.branch
77
 
        # manually make a branch we can bind, because the default format
78
 
        # may not be bindable-from, and we want to test the side effects etc
79
 
        # of bondage.
80
 
        format = controldir.format_registry.make_bzrdir('knit')
81
 
        b_child = controldir.ControlDir.create_branch_convenience(
82
 
            'child', format=format)
83
 
        self.assertEqual(None, b_child.get_bound_location())
84
 
        self.assertEqual(None, b_child.get_master_branch())
85
 
 
86
 
        sftp_b_base = branch.Branch.open(self.get_url('base'))
87
 
        b_child.bind(sftp_b_base)
88
 
        self.assertEqual(sftp_b_base.base, b_child.get_bound_location())
89
 
        # the bind must not have given b_child history:
90
 
        self.assertThat(RevisionHistoryMatches([]), b_child)
91
 
        # we should be able to update the branch at this point:
92
 
        self.assertEqual(None, b_child.update())
93
 
        # and now there must be history.
94
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), b_child)
95
 
        # this line is more of a working tree test line, but - what the hey,
96
 
        # it has work to do.
97
 
        b_child.bzrdir.open_workingtree().update()
98
 
        self.assertPathExists('child/a')
99
 
        self.assertPathExists('child/b')
100
 
 
101
 
        b_child.unbind()
102
 
        self.assertEqual(None, b_child.get_bound_location())
103
 
 
104
 
    def test_bound_commit(self):
105
 
        b_base, wt_child = self.create_branches()
106
 
 
107
 
        with open('child/a', 'wb') as f: f.write('new contents\n')
108
 
        wt_child.commit('modified a', rev_id='r@c-2')
109
 
 
110
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@c-2']), wt_child.branch)
111
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@c-2']), b_base)
112
 
 
113
 
    def test_bound_commit_fails_when_out_of_date(self):
114
 
        # Make sure commit fails if out of date.
115
 
        b_base, wt_child = self.create_branches()
116
 
 
117
 
        with open('base/a', 'wb') as f: f.write('new base contents\n')
118
 
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')
119
 
 
120
 
        with open('child/b', 'wb') as f: f.write('new b child contents\n')
121
 
        self.assertRaises(errors.BoundBranchOutOfDate,
122
 
                wt_child.commit, 'child', rev_id='r@c-2')
123
 
 
124
 
        sftp_b_base = branch.Branch.open(self.get_url('base'))
125
 
 
126
 
        # This is all that cmd_update does
127
 
        wt_child.pull(sftp_b_base, overwrite=False)
128
 
 
129
 
        wt_child.commit('child', rev_id='r@c-3')
130
 
 
131
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@b-2', 'r@c-3']),
132
 
                wt_child.branch)
133
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@b-2', 'r@c-3']),
134
 
                b_base)
135
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@b-2', 'r@c-3']),
136
 
                sftp_b_base)
137
 
 
138
 
    def test_double_binding(self):
139
 
        b_base, wt_child = self.create_branches()
140
 
 
141
 
        wt_child2 = wt_child.branch.create_checkout('child2')
142
 
 
143
 
        with open('child2/a', 'wb') as f: f.write('new contents\n')
144
 
        self.assertRaises(errors.CommitToDoubleBoundBranch,
145
 
                wt_child2.commit, 'child2', rev_id='r@d-2')
146
 
 
147
 
    def test_unbinding(self):
148
 
        b_base, wt_child = self.create_branches()
149
 
 
150
 
        # TestCaseWithSFTPServer only allows you to connect one time
151
 
        # to the SFTP server. So we have to create a connection and
152
 
        # keep it around, so that it can be reused
153
 
        __unused_t = self.get_transport()
154
 
 
155
 
        wt_base = b_base.bzrdir.open_workingtree()
156
 
        with open('base/a', 'wb') as f: f.write('new base contents\n')
157
 
        wt_base.commit('base', rev_id='r@b-2')
158
 
 
159
 
        with open('child/b', 'wb') as f: f.write('new b child contents\n')
160
 
        self.assertRaises(errors.BoundBranchOutOfDate,
161
 
                wt_child.commit, 'child', rev_id='r@c-2')
162
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), wt_child.branch)
163
 
        wt_child.branch.unbind()
164
 
        wt_child.commit('child', rev_id='r@c-2')
165
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@c-2']), wt_child.branch)
166
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@b-2']), b_base)
167
 
 
168
 
        sftp_b_base = branch.Branch.open(self.get_url('base'))
169
 
        self.assertRaises(errors.DivergedBranches,
170
 
                wt_child.branch.bind, sftp_b_base)
171
 
 
172
 
    def test_commit_remote_bound(self):
173
 
        # Make sure it is detected if the current base is bound during the
174
 
        # objects lifetime, when the child goes to commit.
175
 
        b_base, wt_child = self.create_branches()
176
 
 
177
 
        b_base.bzrdir.sprout('newbase')
178
 
 
179
 
        sftp_b_base = branch.Branch.open(self.get_url('base'))
180
 
        sftp_b_newbase = branch.Branch.open(self.get_url('newbase'))
181
 
 
182
 
        sftp_b_base.bind(sftp_b_newbase)
183
 
 
184
 
        with open('child/a', 'wb') as f: f.write('new contents\n')
185
 
        self.assertRaises(errors.CommitToDoubleBoundBranch,
186
 
                wt_child.commit, 'failure', rev_id='r@c-2')
187
 
 
188
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), b_base)
189
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), wt_child.branch)
190
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), sftp_b_newbase)
191
 
 
192
 
    def test_bind_diverged(self):
193
 
        b_base, wt_child = self.create_branches()
194
 
 
195
 
        wt_child.branch.unbind()
196
 
        with open('child/a', 'ab') as f: f.write('child contents\n')
197
 
        wt_child_rev = wt_child.commit('child', rev_id='r@c-2')
198
 
 
199
 
        self.assertEqual(RevisionHistoryMatches(['r@b-1', 'r@c-2']),
200
 
            wt_child.branch)
201
 
        self.assertEqual(RevisionHistoryMatches(['r@b-1']), b_base)
202
 
 
203
 
        with open('base/b', 'ab') as f: f.write('base contents\n')
204
 
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')
205
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@b-2']), b_base)
206
 
 
207
 
        sftp_b_base = branch.Branch.open(self.get_url('base'))
208
 
 
209
 
        self.assertRaises(errors.DivergedBranches,
210
 
                wt_child.branch.bind, sftp_b_base)
211
 
 
212
 
        wt_child.merge_from_branch(sftp_b_base)
213
 
        self.assertEqual([wt_child_rev, 'r@b-2'], wt_child.get_parent_ids())
214
 
        wt_child.commit('merged', rev_id='r@c-3')
215
 
 
216
 
        # After a merge, trying to bind again should succeed but not push the
217
 
        # new change.
218
 
        wt_child.branch.bind(sftp_b_base)
219
 
 
220
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@b-2']), b_base)
221
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@c-2', 'r@c-3']),
222
 
            wt_child.branch)
223
 
 
224
 
    def test_bind_parent_ahead_preserves_parent(self):
225
 
        b_base, wt_child = self.create_branches()
226
 
 
227
 
        wt_child.branch.unbind()
228
 
 
229
 
        with open('a', 'ab') as f: f.write('base changes\n')
230
 
        wt_base = b_base.bzrdir.open_workingtree()
231
 
        wt_base.commit('base', rev_id='r@b-2')
232
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@b-2']), b_base)
233
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), wt_child.branch)
234
 
 
235
 
        sftp_b_base = branch.Branch.open(self.get_url('base'))
236
 
        wt_child.branch.bind(sftp_b_base)
237
 
 
238
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), wt_child.branch)
239
 
 
240
 
        wt_child.branch.unbind()
241
 
 
242
 
        # Check and make sure it also works if parent is ahead multiple
243
 
        wt_base.commit('base 3', rev_id='r@b-3', allow_pointless=True)
244
 
        wt_base.commit('base 4', rev_id='r@b-4', allow_pointless=True)
245
 
        wt_base.commit('base 5', rev_id='r@b-5', allow_pointless=True)
246
 
 
247
 
        self.assertThat(
248
 
            RevisionHistoryMatches(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5']),
249
 
            b_base)
250
 
 
251
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), wt_child.branch)
252
 
 
253
 
        wt_child.branch.bind(sftp_b_base)
254
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), wt_child.branch)
255
 
 
256
 
    def test_bind_child_ahead_preserves_child(self):
257
 
        b_base, wt_child = self.create_branches()
258
 
 
259
 
        wt_child.branch.unbind()
260
 
 
261
 
        wt_child.commit('child', rev_id='r@c-2', allow_pointless=True)
262
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@c-2']), wt_child.branch)
263
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), b_base)
264
 
 
265
 
        sftp_b_base = branch.Branch.open(self.get_url('base'))
266
 
        wt_child.branch.bind(sftp_b_base)
267
 
 
268
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), b_base)
269
 
 
270
 
        # Check and make sure it also works if child is ahead multiple
271
 
        wt_child.branch.unbind()
272
 
        wt_child.commit('child 3', rev_id='r@c-3', allow_pointless=True)
273
 
        wt_child.commit('child 4', rev_id='r@c-4', allow_pointless=True)
274
 
        wt_child.commit('child 5', rev_id='r@c-5', allow_pointless=True)
275
 
 
276
 
        self.assertThat(
277
 
            RevisionHistoryMatches(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5']),
278
 
            wt_child.branch)
279
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), b_base)
280
 
 
281
 
        wt_child.branch.bind(sftp_b_base)
282
 
        self.assertThat(RevisionHistoryMatches(['r@b-1']), b_base)
283
 
 
284
 
    def test_commit_after_merge(self):
285
 
        b_base, wt_child = self.create_branches()
286
 
 
287
 
        # We want merge to be able to be a local only
288
 
        # operation, because it does not alter the branch data.
289
 
 
290
 
        # But we can't fail afterwards
291
 
 
292
 
        wt_other = wt_child.bzrdir.sprout('other').open_workingtree()
293
 
 
294
 
        with open('other/c', 'wb') as f: f.write('file c\n')
295
 
        wt_other.add('c')
296
 
        wt_other.commit('adding c', rev_id='r@d-2')
297
 
 
298
 
        self.assertFalse(wt_child.branch.repository.has_revision('r@d-2'))
299
 
        self.assertFalse(b_base.repository.has_revision('r@d-2'))
300
 
 
301
 
        wt_child.merge_from_branch(wt_other.branch)
302
 
 
303
 
        self.assertPathExists('child/c')
304
 
        self.assertEqual(['r@d-2'], wt_child.get_parent_ids()[1:])
305
 
        self.assertTrue(wt_child.branch.repository.has_revision('r@d-2'))
306
 
        self.assertFalse(b_base.repository.has_revision('r@d-2'))
307
 
 
308
 
        # Commit should succeed, and cause merged revisions to
309
 
        # be pushed into base
310
 
        wt_child.commit('merge other', rev_id='r@c-2')
311
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@c-2']), wt_child.branch)
312
 
        self.assertThat(RevisionHistoryMatches(['r@b-1', 'r@c-2']), b_base)
313
 
        self.assertTrue(b_base.repository.has_revision('r@d-2'))
314
 
 
315
 
    def test_commit_fails(self):
316
 
        b_base, wt_child = self.create_branches()
317
 
 
318
 
        with open('a', 'ab') as f: f.write('child adds some text\n')
319
 
 
320
 
        # this deletes the branch from memory
321
 
        del b_base
322
 
        # and this moves it out of the way on disk
323
 
        os.rename('base', 'hidden_base')
324
 
 
325
 
        self.assertRaises(errors.BoundBranchConnectionFailure,
326
 
                wt_child.commit, 'added text', rev_id='r@c-2')
327
 
 
328
 
    # TODO: jam 20051231 We need invasive failure tests, so that we can show
329
 
    #       performance even when something fails.
330
 
 
331