1
# Copyright (C) 2005, 2007, 2008 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
"""Black-box tests for bzr push."""
33
from bzrlib.repofmt import knitrepo
34
from bzrlib.tests import http_server
35
from bzrlib.transport import memory
38
class TestPush(tests.TestCaseWithTransport):
40
def test_push_error_on_vfs_http(self):
    """Pushing a branch to a plain HTTP server fails cleanly."""
    # The push target is only reachable through a read-only web server.
    self.transport_readonly_server = http_server.HttpServer
    self.make_branch('source')
    public_url = self.get_readonly_url('target')
    # NOTE(review): the tail of this call was missing; the command and
    # working_dir are rebuilt from the locals prepared above (public_url
    # and the 'source' branch) -- confirm against upstream.
    self.run_bzr_error(['http does not support mkdir'],
                       ['push', public_url],
                       working_dir='source')
50
def test_push_remember(self):
    """Push changes from one branch to another and test push location."""
    transport = self.get_transport()
    tree_a = self.make_branch_and_tree('branch_a')
    branch_a = tree_a.branch
    # NOTE(review): the add() calls in this setup were not present in the
    # reviewed text; they are restored so each commit actually records the
    # file that was just built -- confirm against upstream.
    self.build_tree(['branch_a/a'])
    tree_a.add('a')
    tree_a.commit('commit a')
    tree_b = branch_a.bzrdir.sprout('branch_b').open_workingtree()
    branch_b = tree_b.branch
    tree_c = branch_a.bzrdir.sprout('branch_c').open_workingtree()
    branch_c = tree_c.branch
    # branch_a and branch_b now each gain one revision of their own, so
    # they diverge.
    self.build_tree(['branch_a/b'])
    tree_a.add('b')
    tree_a.commit('commit b')
    self.build_tree(['branch_b/c'])
    tree_b.add('c')
    tree_b.commit('commit c')
    # initial push location must be empty
    self.assertEqual(None, branch_b.get_push_location())

    # run_bzr returns an (out, err) pair; both failure cases below expect
    # exactly this pair.
    no_location = ('', 'bzr: ERROR: No push location known or specified.\n')

    # test push for failure without push location set
    out = self.run_bzr('push', working_dir='branch_a', retcode=3)
    self.assertEqual(out, no_location)

    # test not remembered if cannot actually push
    self.run_bzr('push path/which/doesnt/exist',
                 working_dir='branch_a', retcode=3)
    out = self.run_bzr('push', working_dir='branch_a', retcode=3)
    self.assertEqual(no_location, out)

    # test implicit --remember when no push location set, push fails
    out = self.run_bzr('push ../branch_b',
                       working_dir='branch_a', retcode=3)
    self.assertEqual(out,
                     ('', 'bzr: ERROR: These branches have diverged. '
                          'Try using "merge" and then "push".\n'))
    self.assertEqual(osutils.abspath(branch_a.get_push_location()),
                     osutils.abspath(branch_b.bzrdir.root_transport.base))

    # test implicit --remember after resolving previous failure
    uncommit.uncommit(branch=branch_b, tree=tree_b)
    transport.delete('branch_b/c')
    out, err = self.run_bzr('push', working_dir='branch_a')
    path = branch_a.get_push_location()
    self.assertEqual(out,
                     'Using saved push location: %s\n'
                     % urlutils.local_path_from_url(path))
    self.assertEqual(err,
                     'All changes applied successfully.\n'
                     'Pushed up to revision 2.\n')
    self.assertEqual(path,
                     branch_b.bzrdir.root_transport.base)
    # test explicit --remember
    self.run_bzr('push ../branch_c --remember', working_dir='branch_a')
    self.assertEqual(branch_a.get_push_location(),
                     branch_c.bzrdir.root_transport.base)
111
def test_push_without_tree(self):
    """bzr push from a branch that does not have a checkout should work."""
    self.make_branch('.')
    stdout, stderr = self.run_bzr('push pushed-location')
    self.assertEqual('', stdout)
    self.assertEqual('Created new branch.\n', stderr)
    # The new branch must be openable at the location we pushed to.
    pushed = branch.Branch.open('pushed-location')
    self.assertEndsWith(pushed.base, 'pushed-location/')
120
def test_push_new_branch_revision_count(self):
    """Push a branch that has revisions to a brand new location."""
    # bzr push of a branch with revisions to a new location; the only
    # message checked here is the 'Created new branch.' notice on stderr.
    t = self.make_branch_and_tree('tree')
    self.build_tree(['tree/file'])
    # NOTE(review): the add/commit below were not present in the reviewed
    # text; they are restored because the scenario is "a branch with
    # revisions" -- confirm the commit message against upstream.
    t.add('file')
    t.commit('commit 1')
    out, err = self.run_bzr('push -d tree pushed-to')
    self.assertEqual('', out)
    self.assertEqual('Created new branch.\n', err)
132
def test_push_only_pushes_history(self):
    """Knit branches should only push the history for the current revision.

    Two branches share one repository; after pushing branch 'b' the
    target repository must hold b's revision and none of a's.
    """
    # Named meta_format rather than 'format' so the builtin is not shadowed.
    meta_format = bzrdir.BzrDirMetaFormat1()
    meta_format.repository_format = knitrepo.RepositoryFormatKnit1()
    shared_repo = self.make_repository('repo', format=meta_format,
                                       shared=True)
    shared_repo.set_make_working_trees(True)

    def make_shared_tree(path):
        """Create a branch at repo/<path> and return its working tree."""
        shared_repo.bzrdir.root_transport.mkdir(path)
        shared_repo.bzrdir.create_branch_convenience('repo/' + path)
        return workingtree.WorkingTree.open('repo/' + path)

    # NOTE(review): the add() calls were not present in the reviewed text;
    # they are restored so the commits version the files built here --
    # confirm against upstream.
    tree_a = make_shared_tree('a')
    self.build_tree(['repo/a/file'])
    tree_a.add('file')
    tree_a.commit('commit a-1', rev_id='a-1')
    f = open('repo/a/file', 'ab')
    try:
        f.write('more stuff\n')
    finally:
        # Close (and thereby flush) before committing, so the second
        # commit sees the appended bytes and the handle is not leaked.
        f.close()
    tree_a.commit('commit a-2', rev_id='a-2')

    tree_b = make_shared_tree('b')
    self.build_tree(['repo/b/file'])
    tree_b.add('file')
    tree_b.commit('commit b-1', rev_id='b-1')

    self.assertTrue(shared_repo.has_revision('a-1'))
    self.assertTrue(shared_repo.has_revision('a-2'))
    self.assertTrue(shared_repo.has_revision('b-1'))

    # Now that we have a repository with shared files, make sure
    # that things aren't copied out by a 'push'
    self.run_bzr('push ../../push-b', working_dir='repo/b')
    pushed_tree = workingtree.WorkingTree.open('push-b')
    pushed_repo = pushed_tree.branch.repository
    self.assertFalse(pushed_repo.has_revision('a-1'))
    self.assertFalse(pushed_repo.has_revision('a-2'))
    self.assertTrue(pushed_repo.has_revision('b-1'))
170
def test_push_funky_id(self):
    """Push a tree whose file id contains characters such as <>%&;"'."""
    tree = self.make_branch_and_tree('tree')
    self.build_tree(['tree/filename'])
    funky_file_id = 'funky-chars<>%&;"\''
    tree.add('filename', funky_file_id)
    tree.commit('commit filename')
    self.run_bzr('push -d tree new-tree')
177
def test_push_dash_d(self):
    """Exercise 'push -d' first with plain paths, then with URLs."""
    tree = self.make_branch_and_tree('from')
    tree.commit(message='first commit', allow_pointless=True)
    # Source and target given as local paths.
    self.run_bzr('push -d from to-one')
    self.failUnlessExists('to-one')
    # Source and target given as URLs built from local paths.
    source_url = urlutils.local_path_to_url('from')
    target_url = urlutils.local_path_to_url('to-two')
    self.run_bzr('push -d %s %s' % (source_url, target_url))
    self.failUnlessExists('to-two')
187
def test_push_smart_non_stacked_streaming_acceptance(self):
    """Pin the number of smart-server calls for a plain push."""
    self.setup_smart_server_with_call_log()
    tree = self.make_branch_and_tree('from')
    tree.commit(message='first commit', allow_pointless=True)
    # Only calls made by the push itself should be counted.
    self.reset_smart_call_log()
    target_url = self.get_url('to-one')
    self.run_bzr(['push', target_url], working_dir='from')
    # This figure represents the amount of work to perform this use case.
    # It is entirely ok to reduce this number if a test fails due to
    # rpc_count being too low. If rpc_count increases, more network
    # roundtrips have become necessary for this use case. Please do not
    # adjust this number upwards without agreement from bzr's network
    # support maintainers.
    expected_call_count = 9
    self.assertLength(expected_call_count, self.hpss_calls)
200
def test_push_smart_stacked_streaming_acceptance(self):
    """Pin the number of smart-server calls for a stacked push."""
    self.setup_smart_server_with_call_log()
    parent_tree = self.make_branch_and_tree('parent', format='1.9')
    parent_tree.commit(message='first commit')
    local_tree = parent_tree.bzrdir.sprout('local').open_workingtree()
    local_tree.commit(message='local commit')
    # Only calls made by the push itself should be counted.
    self.reset_smart_call_log()
    push_args = ['push', '--stacked', '--stacked-on', '../parent',
                 self.get_url('public')]
    self.run_bzr(push_args, working_dir='local')
    # This figure represents the amount of work to perform this use case.
    # It is entirely ok to reduce this number if a test fails due to
    # rpc_count being too low. If rpc_count increases, more network
    # roundtrips have become necessary for this use case. Please do not
    # adjust this number upwards without agreement from bzr's network
    # support maintainers.
    expected_call_count = 14
    self.assertLength(expected_call_count, self.hpss_calls)
    published = branch.Branch.open('public')
    self.assertEndsWith(published.get_stacked_on_url(), '/parent')
218
def create_simple_tree(self):
    """Build and return a one-file, one-revision working tree at 'tree'.

    The tree holds file 'a' (file id 'a-id') committed as revision 'r1'.

    :return: The working tree. Callers in this class compare its
        last_revision() with what arrives at the push target, so the
        tree must be returned rather than dropped.
    """
    tree = self.make_branch_and_tree('tree')
    self.build_tree(['tree/a'])
    tree.add(['a'], ['a-id'])
    tree.commit('one', rev_id='r1')
    return tree
225
def test_push_create_prefix(self):
    """'bzr push --create-prefix' will create leading directories."""
    tree = self.create_simple_tree()

    # Without --create-prefix the missing parent directory is an error.
    # NOTE(review): the trailing arguments of both calls were missing;
    # the command is taken from the expected error text and
    # working_dir='tree' from the tree created above -- confirm.
    self.run_bzr_error(['Parent directory of ../new/tree does not exist'],
                       'push ../new/tree',
                       working_dir='tree')
    self.run_bzr('push ../new/tree --create-prefix',
                 working_dir='tree')
    new_tree = workingtree.WorkingTree.open('new/tree')
    self.assertEqual(tree.last_revision(), new_tree.last_revision())
    self.failUnlessExists('new/tree/a')
238
def test_push_use_existing(self):
    """'bzr push --use-existing-dir' can push into an existing dir.

    By default, 'bzr push' will not use an existing, non-versioned dir.
    """
    tree = self.create_simple_tree()
    self.build_tree(['target/'])

    # A plain push must refuse the pre-existing directory and point the
    # user at the option that overrides the refusal.
    self.run_bzr_error(['Target directory ../target already exists',
                        'Supply --use-existing-dir',
                       ],
                       'push ../target', working_dir='tree')

    # NOTE(review): the end of this call was missing; working_dir='tree'
    # matches the failing push just above -- confirm.
    self.run_bzr('push --use-existing-dir ../target',
                 working_dir='tree')

    new_tree = workingtree.WorkingTree.open('target')
    self.assertEqual(tree.last_revision(), new_tree.last_revision())
    # The push should have created target/a
    self.failUnlessExists('target/a')
259
def test_push_onto_repo(self):
    """We should be able to 'bzr push' into an existing bzrdir."""
    tree = self.create_simple_tree()
    # Only the on-disk shared repository matters; the object is not used.
    self.make_repository('repo', shared=True)

    # NOTE(review): the end of this call was missing; working_dir='tree'
    # is the tree created above -- confirm.
    self.run_bzr('push ../repo',
                 working_dir='tree')

    # Pushing onto an existing bzrdir creates the branch there but, as
    # asserted here, does not give it a working tree.
    self.assertRaises(errors.NoWorkingTree,
                      workingtree.WorkingTree.open, 'repo')
    new_branch = branch.Branch.open('repo')
    self.assertEqual(tree.last_revision(), new_branch.last_revision())
275
def test_push_onto_just_bzrdir(self):
    """We don't handle when the target is just a bzrdir.

    Because you shouldn't be able to create *just* a bzrdir in the wild.
    """
    # TODO: jam 20070109 Maybe it would be better to create the repository
    #       at this point.
    # Neither return value is needed: only the on-disk tree and the bare
    # control directory matter for this scenario.
    self.create_simple_tree()
    self.make_bzrdir('dir')

    # NOTE(review): the trailing arguments of this call were missing; the
    # command is taken from the expected error text and working_dir from
    # the tree created above -- confirm.
    self.run_bzr_error(['At ../dir you have a valid .bzr control'],
                       'push ../dir',
                       working_dir='tree')
289
def test_push_with_revisionspec(self):
    """We should be able to push a revision older than the tip."""
    tree_from = self.make_branch_and_tree('from')
    tree_from.commit("One.", rev_id="from-1")
    tree_from.commit("Two.", rev_id="from-2")

    self.run_bzr('push -r1 ../to', working_dir='from')

    # Only the first revision may have arrived at the target.
    tree_to = workingtree.WorkingTree.open('to')
    repo_to = tree_to.branch.repository
    self.assertTrue(repo_to.has_revision('from-1'))
    self.assertFalse(repo_to.has_revision('from-2'))
    self.assertEqual(tree_to.branch.last_revision_info()[1], 'from-1')

    # A revision range is rejected: push wants exactly one revision.
    self.run_bzr_error(
        ['bzr: ERROR: bzr push --revision '
         'takes exactly one revision identifier\n'],
        'push -r0..2 ../to', working_dir='from')
308
def create_trunk_and_feature_branch(self):
    """Create a mainline tree plus a feature branch that extends it.

    The mainline lives at 'target' with one commit; the feature branch
    lives at 'branch', has pulled the mainline, has the mainline set as
    its parent, and carries one extra commit.

    :return: A (trunk_tree, branch_tree) tuple of working trees.
    """
    # We have a mainline
    # NOTE(review): the closing argument of both make_branch_and_tree
    # calls was missing; format='1.9' is assumed because the other
    # stacking tests in this file use it -- confirm against upstream.
    trunk_tree = self.make_branch_and_tree('target',
                                           format='1.9')
    trunk_tree.commit('mainline')
    # and a branch from it
    branch_tree = self.make_branch_and_tree('branch',
                                            format='1.9')
    branch_tree.pull(trunk_tree.branch)
    branch_tree.branch.set_parent(trunk_tree.branch.base)
    # with some work on it
    branch_tree.commit('moar work plz')
    return trunk_tree, branch_tree
322
def assertPublished(self, branch_revid, stacked_on):
    """Check that the 'published' branch is stacked and holds our work.

    :param branch_revid: Revision id that must be present in the
        published branch's repository.
    :param stacked_on: Expected stacked-on URL of the published branch.
    """
    published = branch.Branch.open('published')
    # Stacking must point at the expected location ...
    actual_stacked_on = published.get_stacked_on_url()
    self.assertEqual(stacked_on, actual_stacked_on)
    # ... and the pushed revision must be reachable from the branch.
    has_work = published.repository.has_revision(branch_revid)
    self.assertTrue(has_work)
330
def test_push_new_branch_stacked_on(self):
    """Pushing a new branch with --stacked-on creates a stacked branch."""
    trunk_tree, branch_tree = self.create_trunk_and_feature_branch()
    # Publish the feature branch, naming the mainline explicitly.
    push_cmd = ['push', '--stacked-on', trunk_tree.branch.base,
                self.get_url('published')]
    out, err = self.run_bzr(push_cmd, working_dir='branch')
    self.assertEqual('', out)
    expected_err = ('Created new stacked branch referring to %s.\n'
                    % trunk_tree.branch.base)
    self.assertEqual(expected_err, err)
    self.assertPublished(branch_tree.last_revision(),
                         trunk_tree.branch.base)
342
def test_push_new_branch_stacked_uses_parent_when_no_public_url(self):
    """When the parent has no public url the parent is used as-is."""
    trunk_tree, branch_tree = self.create_trunk_and_feature_branch()
    # A bare --stacked push has to work out the stacked-on location
    # itself.
    push_cmd = ['push', '--stacked', self.get_url('published')]
    out, err = self.run_bzr(push_cmd, working_dir='branch')
    self.assertEqual('', out)
    expected_err = ('Created new stacked branch referring to %s.\n'
                    % trunk_tree.branch.base)
    self.assertEqual(expected_err, err)
    self.assertPublished(branch_tree.last_revision(),
                         trunk_tree.branch.base)
355
def test_push_new_branch_stacked_uses_parent_public(self):
    """Pushing a new branch with --stacked creates a stacked branch."""
    trunk_tree, branch_tree = self.create_trunk_and_feature_branch()
    # Publish a copy of the trunk through a read-only web server and
    # record that URL as the trunk's public branch.
    self.transport_readonly_server = http_server.HttpServer
    public_copy = self.make_branch('public_trunk', format='1.9')
    public_copy.pull(trunk_tree.branch)
    public_url = self.get_readonly_url('public_trunk')
    trunk_tree.branch.set_public_branch(public_url)
    # A bare --stacked push should now pick the public location.
    push_cmd = ['push', '--stacked', self.get_url('published')]
    out, err = self.run_bzr(push_cmd, working_dir='branch')
    self.assertEqual('', out)
    expected_err = ('Created new stacked branch referring to %s.\n'
                    % public_url)
    self.assertEqual(expected_err, err)
    self.assertPublished(branch_tree.last_revision(), public_url)
373
def test_push_new_branch_stacked_no_parent(self):
    """Pushing with --stacked and no parent branch errors."""
    # Only the on-disk tree is needed. Leaving the result unbound avoids a
    # local called 'branch' hiding the 'branch' module that other tests in
    # this file call (branch.Branch.open).
    self.make_branch_and_tree('branch', format='1.9')
    # now we do a stacked push, which should fail as the place to refer to
    # cannot be determined.
    out, err = self.run_bzr_error(
        ['Could not determine branch to refer to\\.'],
        ['push', '--stacked', self.get_url('published')],
        working_dir='branch')
    self.assertEqual('', out)
    # The failed push must not leave anything behind at the target.
    self.assertFalse(self.get_transport('published').has('.'))
384
def test_push_notifies_default_stacking(self):
    """Push reports on stderr when it uses the default stacking branch."""
    self.make_branch('stack_on', format='1.6')
    control_config = self.make_bzrdir('.').get_config()
    control_config.set_default_stack_on('stack_on')
    self.make_branch('from', format='1.6')
    out, err = self.run_bzr('push -d from to')
    expected_notice = 'Using default stacking branch stack_on at .*'
    self.assertContainsRe(err, expected_notice)
392
def test_push_stacks_with_default_stacking_if_target_is_stackable(self):
    """A pack-0.92 source pushed under a 1.6 stacking policy is stacked."""
    self.make_branch('stack_on', format='1.6')
    control_config = self.make_bzrdir('.').get_config()
    control_config.set_default_stack_on('stack_on')
    self.make_branch('from', format='pack-0.92')
    out, err = self.run_bzr('push -d from to')
    pushed = branch.Branch.open('to')
    self.assertEqual('../stack_on', pushed.get_stacked_on_url())
400
def test_push_does_not_change_format_with_default_if_target_cannot(self):
    """With a pack-0.92 stacking target the pushed branch is unstackable."""
    self.make_branch('stack_on', format='pack-0.92')
    control_config = self.make_bzrdir('.').get_config()
    control_config.set_default_stack_on('stack_on')
    self.make_branch('from', format='pack-0.92')
    out, err = self.run_bzr('push -d from to')
    pushed = branch.Branch.open('to')
    # Asking for the stacked-on URL must fail for this branch format.
    self.assertRaises(errors.UnstackableBranchFormat,
                      pushed.get_stacked_on_url)
408
def test_push_doesnt_create_broken_branch(self):
    """Pushing a new standalone branch works even when there's a default
    stacking policy at the destination.

    The new branch will preserve the repo format (even if it isn't the
    default for the branch), and will be stacked when the repo format
    allows (which means that the branch format isn't necessarily
    preserved).
    """
    self.make_repository('repo', shared=True, format='1.6')
    builder = self.make_branch_builder('repo/local', format='pack-0.92')
    builder.start_series()
    builder.build_snapshot('rev-1', None, [
        ('add', ('', 'root-id', 'directory', '')),
        ('add', ('filename', 'f-id', 'file', 'content\n'))])
    builder.build_snapshot('rev-2', ['rev-1'], [])
    builder.build_snapshot('rev-3', ['rev-2'],
        [('modify', ('f-id', 'new-content\n'))])
    builder.finish_series()
    # The result is not needed here. The call is kept in case it has side
    # effects, but it is left unbound so no local named 'branch' hides the
    # 'branch' module used elsewhere in this file.
    builder.get_branch()
    # Push rev-1 to "trunk", so that we can stack on it.
    self.run_bzr('push -d repo/local trunk -r 1')
    # Set a default stacking policy so that new branches will
    # automatically stack on trunk.
    self.make_bzrdir('.').get_config().set_default_stack_on('trunk')
    # Push rev-2 to a new branch "remote". It will be stacked on "trunk".
    out, err = self.run_bzr('push -d repo/local remote -r 2')
    self.assertContainsRe(
        err, 'Using default stacking branch trunk at .*')
    # Push rev-3 onto "remote". If "remote" not stacked and is missing the
    # fulltext record for f-id @ rev-1, then this will fail.
    out, err = self.run_bzr('push -d repo/local remote -r 3')
440
def test_push_verbose_shows_log(self):
    """'push -v' shows the log of the revisions that each push sends."""
    tree = self.make_branch_and_tree('source')
    # NOTE(review): the two commits were not present in the reviewed text.
    # They are restored because the assertions look for 'rev1' and 'rev2'
    # in the push output and nothing else could produce them -- confirm.
    tree.commit('rev1')
    out, err = self.run_bzr('push -v -d source target')
    # initial push contains log
    self.assertContainsRe(out, 'rev1')
    tree.commit('rev2')
    out, err = self.run_bzr('push -v -d source target')
    # subsequent push contains log
    self.assertContainsRe(out, 'rev2')
    # subsequent log is accurate
    self.assertNotContainsRe(out, 'rev1')
454
class RedirectingMemoryTransport(memory.MemoryTransport):
    """Memory transport whose mkdir redirects inside two magic directories."""

    def mkdir(self, relpath, mode=None):
        """Create relpath, unless the transport sits in a magic directory.

        When the transport's current directory is '/source/' the request
        is redirected to '../target'. When it is '/infinite-loop/' it is
        redirected back to '../infinite-loop'. Anywhere else the call is
        passed to the parent class.

        :raises errors.RedirectRequested: in the two magic directories.
        """
        # NOTE(review): the last argument of both RedirectRequested calls
        # was missing; is_permanent=True is restored from the upstream
        # bzrlib test -- confirm.
        if self._cwd == '/source/':
            raise errors.RedirectRequested(self.abspath(relpath),
                                           self.abspath('../target'),
                                           is_permanent=True)
        elif self._cwd == '/infinite-loop/':
            raise errors.RedirectRequested(self.abspath(relpath),
                                           self.abspath('../infinite-loop'),
                                           is_permanent=True)
        else:
            return super(RedirectingMemoryTransport, self).mkdir(
                relpath, mode)

    def _redirected_to(self, source, target):
        """Return a transport for target, i.e. follow the redirection."""
        # We do accept redirections
        return transport.get_transport(target)
475
class RedirectingMemoryServer(memory.MemoryServer):
    """Memory server that hands out RedirectingMemoryTransport objects."""

    def setUp(self):
        """Reset the shared store and register this server's URL scheme."""
        self._dirs = {'/': None}
        # _memory_factory copies _files and _locks onto every transport,
        # so they have to exist before the scheme is registered.
        self._files = {}
        self._locks = {}
        # id(self) keeps the scheme unique per server instance.
        self._scheme = 'redirecting-memory+%s:///' % id(self)
        transport.register_transport(self._scheme, self._memory_factory)

    def _memory_factory(self, url):
        """Build a redirecting transport that shares this server's store."""
        result = RedirectingMemoryTransport(url)
        result._dirs = self._dirs
        result._files = self._files
        result._locks = self._locks
        return result

    def tearDown(self):
        """Unregister the URL scheme registered by setUp."""
        transport.unregister_transport(self._scheme, self._memory_factory)
495
class TestPushRedirect(tests.TestCaseWithTransport):
    """Push through a transport that answers mkdir with redirections."""

    def setUp(self):
        """Start the redirecting server and create the tree to push."""
        tests.TestCaseWithTransport.setUp(self)
        self.memory_server = RedirectingMemoryServer()
        self.memory_server.setUp()
        self.addCleanup(self.memory_server.tearDown)

        # Make the branch and tree that we'll be pushing.
        t = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file'])
        # NOTE(review): the add/commit were not present in the reviewed
        # text; they are restored so the pushed branch has a revision to
        # compare -- confirm the commit message against upstream.
        t.add('file')
        t.commit('commit 1')

    def test_push_redirects_on_mkdir(self):
        """If the push requires a mkdir, push respects redirect requests.

        This is added primarily to handle lp:/ URI support, so that users
        can push to new branches by specifying lp:/ URIs.
        """
        destination_url = self.memory_server.get_url() + 'source'
        self.run_bzr(['push', '-d', 'tree', destination_url])

        # The push was aimed at 'source' but must have landed in 'target'.
        local_revision = branch.Branch.open('tree').last_revision()
        remote_revision = branch.Branch.open(
            self.memory_server.get_url() + 'target').last_revision()
        self.assertEqual(remote_revision, local_revision)

    def test_push_gracefully_handles_too_many_redirects(self):
        """Push fails gracefully if the mkdir generates a large number of
        redirects.
        """
        destination_url = self.memory_server.get_url() + 'infinite-loop'
        out, err = self.run_bzr_error(
            ['Too many redirections trying to make %s\\.\n'
             % re.escape(destination_url)],
            ['push', '-d', 'tree', destination_url], retcode=3)
        self.assertEqual('', out)