# Copyright (C) 2005, 2007, 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Black-box tests for bzr push."""

import os
import re

from bzrlib import (
    errors,
    transport,
    urlutils,
    )
from bzrlib.branch import Branch
from bzrlib.bzrdir import BzrDirMetaFormat1
from bzrlib.osutils import abspath
from bzrlib.repofmt.knitrepo import RepositoryFormatKnit1
from bzrlib.smart import client, server
from bzrlib.tests.blackbox import ExternalBase
from bzrlib.tests.http_server import HttpServer
from bzrlib.transport.memory import MemoryServer, MemoryTransport
from bzrlib.uncommit import uncommit
from bzrlib.urlutils import local_path_from_url
from bzrlib.workingtree import WorkingTree


class TestPush(ExternalBase):
    """Black-box tests that drive ``bzr push`` through ``run_bzr``."""

    # NOTE(review): this class was rebuilt from a listing that had lost its
    # indentation and a number of physical lines.  Statements tagged
    # "# restored" were re-inserted because the surrounding code needs them
    # (relative '../' paths, unbalanced calls, values read but never set).
    # Please confirm them against version control.  The listing may also have
    # collapsed repeated spaces inside string literals; expected-message
    # strings are reproduced exactly as they appeared.

    def test_push_remember(self):
        """Push changes from one branch to another and test push location."""
        # Named root_transport so it does not shadow the bzrlib.transport
        # module imported at file level.
        root_transport = self.get_transport()
        tree_a = self.make_branch_and_tree('branch_a')
        branch_a = tree_a.branch
        self.build_tree(['branch_a/a'])
        tree_a.add('a')  # restored
        tree_a.commit('commit a')
        tree_b = branch_a.bzrdir.sprout('branch_b').open_workingtree()
        branch_b = tree_b.branch
        tree_c = branch_a.bzrdir.sprout('branch_c').open_workingtree()
        branch_c = tree_c.branch
        self.build_tree(['branch_a/b'])
        tree_a.add('b')  # restored
        tree_a.commit('commit b')
        self.build_tree(['branch_b/c'])
        tree_b.add('c')  # restored
        tree_b.commit('commit c')
        # initial push location must be empty
        self.assertEqual(None, branch_b.get_push_location())

        # test push for failure without push location set
        # restored: every push below addresses its target as '../<name>', so
        # the commands have to run from inside branch_a.
        os.chdir('branch_a')
        out = self.run_bzr('push', retcode=3)
        self.assertEquals(out,
                ('', 'bzr: ERROR: No push location known or specified.\n'))

        # test not remembered if cannot actually push
        self.run_bzr('push ../path/which/doesnt/exist', retcode=3)
        out = self.run_bzr('push', retcode=3)
        self.assertEquals(
                ('', 'bzr: ERROR: No push location known or specified.\n'),
                out)

        # test implicit --remember when no push location set, push fails
        out = self.run_bzr('push ../branch_b', retcode=3)
        self.assertEquals(out,
                ('', 'bzr: ERROR: These branches have diverged. '
                     'Try using "merge" and then "push".\n'))
        self.assertEquals(abspath(branch_a.get_push_location()),
                          abspath(branch_b.bzrdir.root_transport.base))

        # test implicit --remember after resolving previous failure
        uncommit(branch=branch_b, tree=tree_b)
        root_transport.delete('branch_b/c')
        out, err = self.run_bzr('push')
        path = branch_a.get_push_location()
        self.assertEquals(out,
                          'Using saved push location: %s\n'
                          'Pushed up to revision 2.\n'
                          % local_path_from_url(path))
        # restored: the opening of this assertion was missing; err is the
        # only captured value not yet checked.
        self.assertEqual(err,
                         'All changes applied successfully.\n')
        self.assertEqual(path,
                         branch_b.bzrdir.root_transport.base)
        # test explicit --remember
        self.run_bzr('push ../branch_c --remember')
        self.assertEquals(branch_a.get_push_location(),
                          branch_c.bzrdir.root_transport.base)

    def test_push_without_tree(self):
        # bzr push from a branch that does not have a checkout should work.
        b = self.make_branch('.')
        out, err = self.run_bzr('push pushed-location')
        self.assertEqual('', out)
        self.assertEqual('Created new branch.\n', err)
        b2 = Branch.open('pushed-location')
        self.assertEndsWith(b2.base, 'pushed-location/')

    def test_push_new_branch_revision_count(self):
        # bzr push of a branch with revisions to a new location.
        # NOTE(review): the original comment (partly lost) said the output
        # should report a revision count; the assertions below only check for
        # the 'Created new branch.' message.
        t = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file'])
        t.add('file')  # restored
        t.commit('commit 1')  # restored
        # The lines around this call were lost; working_dir (used by other
        # tests in this class) runs the push from inside the tree.
        out, err = self.run_bzr('push pushed-to', working_dir='tree')
        self.assertEqual('', out)
        self.assertEqual('Created new branch.\n', err)

    def test_push_only_pushes_history(self):
        # Knit branches should only push the history for the current revision.
        bzrdir_format = BzrDirMetaFormat1()
        bzrdir_format.repository_format = RepositoryFormatKnit1()
        shared_repo = self.make_repository('repo', format=bzrdir_format,
                                           shared=True)
        shared_repo.set_make_working_trees(True)

        def make_shared_tree(path):
            # Create a branch plus working tree at repo/<path>.
            shared_repo.bzrdir.root_transport.mkdir(path)
            shared_repo.bzrdir.create_branch_convenience('repo/' + path)
            return WorkingTree.open('repo/' + path)

        tree_a = make_shared_tree('a')
        self.build_tree(['repo/a/file'])
        tree_a.add('file')  # restored
        tree_a.commit('commit a-1', rev_id='a-1')
        f = open('repo/a/file', 'ab')
        try:
            f.write('more stuff\n')
        finally:
            # Close before committing so the appended text is on disk.
            f.close()
        tree_a.commit('commit a-2', rev_id='a-2')

        tree_b = make_shared_tree('b')
        self.build_tree(['repo/b/file'])
        tree_b.add('file')  # restored
        tree_b.commit('commit b-1', rev_id='b-1')

        self.assertTrue(shared_repo.has_revision('a-1'))
        self.assertTrue(shared_repo.has_revision('a-2'))
        self.assertTrue(shared_repo.has_revision('b-1'))

        # Now that we have a repository with shared files, make sure
        # that things aren't copied out by a 'push'
        # restored: both the push and the WorkingTree.open() below use
        # '../../push-b', i.e. they run from inside repo/b.
        os.chdir('repo/b')
        self.run_bzr('push ../../push-b')
        pushed_tree = WorkingTree.open('../../push-b')
        pushed_repo = pushed_tree.branch.repository
        self.assertFalse(pushed_repo.has_revision('a-1'))
        self.assertFalse(pushed_repo.has_revision('a-2'))
        self.assertTrue(pushed_repo.has_revision('b-1'))

    def test_push_funky_id(self):
        """Push a tree containing a file id with unusual characters."""
        t = self.make_branch_and_tree('tree')
        # restored: 'filename' and '../new-tree' are relative to the tree.
        os.chdir('tree')
        self.build_tree(['filename'])
        t.add('filename', 'funky-chars<>%&;"\'')
        t.commit('commit filename')
        self.run_bzr('push ../new-tree')

    def test_push_dash_d(self):
        """'bzr push -d' accepts both local paths and URLs."""
        t = self.make_branch_and_tree('from')
        t.commit(allow_pointless=True,
                 message='first commit')
        self.run_bzr('push -d from to-one')
        self.failUnlessExists('to-one')
        self.run_bzr('push -d %s %s'
            % tuple(map(urlutils.local_path_to_url, ['from', 'to-two'])))
        self.failUnlessExists('to-two')

    def _reset_smart_call_log(self):
        """Forget the smart-server calls recorded so far."""
        # restored: the body was missing; hpss_calls is the list that the
        # hook appends to and that the tests measure with len().
        self.hpss_calls = []

    def _setup_smart_call_log(self):
        """Serve over the smart TCP server and record every client call."""
        self.transport_server = server.SmartTCPServer_for_testing
        self._reset_smart_call_log()

        def capture_hpss_call(params):
            self.hpss_calls.append(params)

        client._SmartClient.hooks.install_named_hook(
            'call', capture_hpss_call, None)

    def test_push_smart_non_stacked_streaming_acceptance(self):
        self._setup_smart_call_log()
        t = self.make_branch_and_tree('from')
        t.commit(allow_pointless=True, message='first commit')
        self._reset_smart_call_log()
        self.run_bzr(['push', self.get_url('to-one')], working_dir='from')
        rpc_count = len(self.hpss_calls)
        # This figure represents the amount of work to perform this use case.
        # It is entirely ok to reduce this number if a test fails due to
        # rpc_count being too low. If rpc_count increases, more network
        # roundtrips have become necessary for this use case. Please do not
        # adjust this number upwards without agreement from bzr's network
        # support maintainers.
        self.assertEqual(107, rpc_count)

    def test_push_smart_stacked_streaming_acceptance(self):
        self._setup_smart_call_log()
        parent = self.make_branch_and_tree('parent', format='1.9')
        parent.commit(message='first commit')
        local = parent.bzrdir.sprout('local').open_workingtree()
        local.commit(message='local commit')
        self._reset_smart_call_log()
        self.run_bzr(['push', '--stacked', '--stacked-on', '../parent',
            self.get_url('public')], working_dir='local')
        rpc_count = len(self.hpss_calls)
        # This figure represents the amount of work to perform this use case.
        # It is entirely ok to reduce this number if a test fails due to
        # rpc_count being too low. If rpc_count increases, more network
        # roundtrips have become necessary for this use case. Please do not
        # adjust this number upwards without agreement from bzr's network
        # support maintainers.
        self.assertEqual(132, rpc_count)
        remote = Branch.open('public')
        self.assertEndsWith(remote.get_stacked_on_url(), '/parent')

    def create_simple_tree(self):
        """Create and return a tree at 'tree' with one committed file 'a'."""
        tree = self.make_branch_and_tree('tree')
        self.build_tree(['tree/a'])
        tree.add(['a'], ['a-id'])
        tree.commit('one', rev_id='r1')
        # restored: every caller uses the return value.
        return tree

    def test_push_create_prefix(self):
        """'bzr push --create-prefix' will create leading directories."""
        tree = self.create_simple_tree()

        # restored: the command and working_dir arguments of both calls were
        # truncated; they follow the error text and the sibling tests.
        self.run_bzr_error(['Parent directory of ../new/tree does not exist'],
                           'push ../new/tree',
                           working_dir='tree')
        self.run_bzr('push ../new/tree --create-prefix',
                     working_dir='tree')
        new_tree = WorkingTree.open('new/tree')
        self.assertEqual(tree.last_revision(), new_tree.last_revision())
        self.failUnlessExists('new/tree/a')

    def test_push_use_existing(self):
        """'bzr push --use-existing-dir' can push into an existing dir.

        By default, 'bzr push' will not use an existing, non-versioned dir.
        """
        tree = self.create_simple_tree()
        self.build_tree(['target/'])

        self.run_bzr_error(['Target directory ../target already exists',
                            'Supply --use-existing-dir',
                           ],
                           'push ../target', working_dir='tree')

        self.run_bzr('push --use-existing-dir ../target',
                     working_dir='tree')  # restored argument

        new_tree = WorkingTree.open('target')
        self.assertEqual(tree.last_revision(), new_tree.last_revision())
        # The push should have created target/a
        self.failUnlessExists('target/a')

    def test_push_onto_repo(self):
        """We should be able to 'bzr push' into an existing bzrdir."""
        tree = self.create_simple_tree()
        repo = self.make_repository('repo', shared=True)

        self.run_bzr('push ../repo',
                     working_dir='tree')  # restored argument

        # Pushing onto an existing bzrdir will create a repository and
        # branch as needed, but will only create a working tree if there was
        # no BzrDir before.
        self.assertRaises(errors.NoWorkingTree, WorkingTree.open, 'repo')
        new_branch = Branch.open('repo')
        self.assertEqual(tree.last_revision(), new_branch.last_revision())

    def test_push_onto_just_bzrdir(self):
        """We don't handle when the target is just a bzrdir.

        Because you shouldn't be able to create *just* a bzrdir in the wild.
        """
        # TODO: jam 20070109 Maybe it would be better to create the repository
        #       (the remainder of this TODO was lost in the listing).
        tree = self.create_simple_tree()
        a_bzrdir = self.make_bzrdir('dir')

        # restored: command and working_dir were truncated; '../dir' matches
        # the path quoted in the expected error.
        self.run_bzr_error(['At ../dir you have a valid .bzr control'],
                           'push ../dir',
                           working_dir='tree')

    def test_push_with_revisionspec(self):
        """We should be able to push a revision older than the tip."""
        tree_from = self.make_branch_and_tree('from')
        tree_from.commit("One.", rev_id="from-1")
        tree_from.commit("Two.", rev_id="from-2")

        self.run_bzr('push -r1 ../to', working_dir='from')

        tree_to = WorkingTree.open('to')
        repo_to = tree_to.branch.repository
        self.assertTrue(repo_to.has_revision('from-1'))
        self.assertFalse(repo_to.has_revision('from-2'))
        self.assertEqual(tree_to.branch.last_revision_info()[1], 'from-1')

        # The expected message is passed as a one-element list of regexes
        # (as every other run_bzr_error call in this file does); a bare
        # string would be iterated one character at a time.
        self.run_bzr_error(
            ['bzr: ERROR: bzr push --revision takes one value\\.\n'],
            'push -r0..2 ../to', working_dir='from')

    def create_trunk_and_feature_branch(self):
        """Return (trunk_tree, branch_tree); branch has one extra commit."""
        # We have a mainline
        trunk_tree = self.make_branch_and_tree('target',
            format='development')
        trunk_tree.commit('mainline')
        # and a branch from it
        branch_tree = self.make_branch_and_tree('branch',
            format='development')
        branch_tree.pull(trunk_tree.branch)
        branch_tree.branch.set_parent(trunk_tree.branch.base)
        # with some work on it
        branch_tree.commit('moar work plz')
        return trunk_tree, branch_tree

    def assertPublished(self, branch_revid, stacked_on):
        """Assert that the branch 'published' has been published correctly."""
        published_branch = Branch.open('published')
        # The published branch refers to the mainline
        self.assertEqual(stacked_on, published_branch.get_stacked_on_url())
        # and the branch's work was pushed
        self.assertTrue(published_branch.repository.has_revision(branch_revid))

    def test_push_new_branch_stacked_on(self):
        """Pushing a new branch with --stacked-on creates a stacked branch."""
        trunk_tree, branch_tree = self.create_trunk_and_feature_branch()
        # we publish branch_tree with a reference to the mainline.
        out, err = self.run_bzr(['push', '--stacked-on', trunk_tree.branch.base,
            self.get_url('published')], working_dir='branch')
        self.assertEqual('', out)
        self.assertEqual('Created new stacked branch referring to %s.\n' %
            trunk_tree.branch.base, err)
        self.assertPublished(branch_tree.last_revision(),
            trunk_tree.branch.base)

    def test_push_new_branch_stacked_uses_parent_when_no_public_url(self):
        """When the parent has no public url the parent is used as-is."""
        trunk_tree, branch_tree = self.create_trunk_and_feature_branch()
        # now we do a stacked push, which should determine the public location
        # for us.
        out, err = self.run_bzr(['push', '--stacked',
            self.get_url('published')], working_dir='branch')
        self.assertEqual('', out)
        self.assertEqual('Created new stacked branch referring to %s.\n' %
            trunk_tree.branch.base, err)
        self.assertPublished(branch_tree.last_revision(), trunk_tree.branch.base)

    def test_push_new_branch_stacked_uses_parent_public(self):
        """Pushing a new branch with --stacked creates a stacked branch."""
        trunk_tree, branch_tree = self.create_trunk_and_feature_branch()
        # the trunk is published on a web server
        self.transport_readonly_server = HttpServer
        trunk_public = self.make_branch('public_trunk', format='development')
        trunk_public.pull(trunk_tree.branch)
        trunk_public_url = self.get_readonly_url('public_trunk')
        trunk_tree.branch.set_public_branch(trunk_public_url)
        # now we do a stacked push, which should determine the public location
        # for us.
        out, err = self.run_bzr(['push', '--stacked',
            self.get_url('published')], working_dir='branch')
        self.assertEqual('', out)
        self.assertEqual('Created new stacked branch referring to %s.\n' %
            trunk_public_url, err)
        self.assertPublished(branch_tree.last_revision(), trunk_public_url)

    def test_push_new_branch_stacked_no_parent(self):
        """Pushing with --stacked and no parent branch errors."""
        branch = self.make_branch_and_tree('branch', format='development')
        # now we do a stacked push, which should fail as the place to refer to
        # cannot be determined.
        out, err = self.run_bzr_error(
            ['Could not determine branch to refer to\\.'], ['push', '--stacked',
            self.get_url('published')], working_dir='branch')
        self.assertEqual('', out)
        self.assertFalse(self.get_transport('published').has('.'))

    def test_push_notifies_default_stacking(self):
        """Push reports when a default stacking branch is being used."""
        self.make_branch('stack_on', format='1.6')
        self.make_bzrdir('.').get_config().set_default_stack_on('stack_on')
        self.make_branch('from', format='1.6')
        out, err = self.run_bzr('push -d from to')
        self.assertContainsRe(err,
                              'Using default stacking branch stack_on at .*')

    def test_push_doesnt_create_broken_branch(self):
        """Pushing a new standalone branch works even when there's a default
        stacking policy at the destination.

        The new branch will preserve the repo format (even if it isn't the
        default for the branch), and will be stacked when the repo format
        allows (which means that the branch format isn't necessarily
        preserved).
        """
        self.make_repository('repo', shared=True, format='1.6')
        builder = self.make_branch_builder('repo/local', format='pack-0.92')
        builder.start_series()
        builder.build_snapshot('rev-1', None, [
            ('add', ('', 'root-id', 'directory', '')),
            ('add', ('filename', 'f-id', 'file', 'content\n'))])
        builder.build_snapshot('rev-2', ['rev-1'], [])
        builder.build_snapshot('rev-3', ['rev-2'],
            [('modify', ('f-id', 'new-content\n'))])
        builder.finish_series()
        branch = builder.get_branch()
        # Push rev-1 to "trunk", so that we can stack on it.
        self.run_bzr('push -d repo/local trunk -r 1')
        # Set a default stacking policy so that new branches will automatically
        # be stacked on trunk.
        self.make_bzrdir('.').get_config().set_default_stack_on('trunk')
        # Push rev-2 to a new branch "remote". It will be stacked on "trunk".
        out, err = self.run_bzr('push -d repo/local remote -r 2')
        self.assertContainsRe(
            err, 'Using default stacking branch trunk at .*')
        # Push rev-3 onto "remote". If "remote" not stacked and is missing the
        # fulltext record for f-id @ rev-1, then this will fail.
        out, err = self.run_bzr('push -d repo/local remote -r 3')

    def test_push_verbose_shows_log(self):
        """'bzr push -v' prints the log of the revisions it pushed."""
        tree = self.make_branch_and_tree('source')
        tree.commit('rev1')  # restored: the assertions look for this message
        out, err = self.run_bzr('push -v -d source target')
        # initial push contains log
        self.assertContainsRe(out, 'rev1')
        tree.commit('rev2')  # restored: the assertions look for this message
        out, err = self.run_bzr('push -v -d source target')
        # subsequent push contains log
        self.assertContainsRe(out, 'rev2')
        # subsequent log is accurate
        self.assertNotContainsRe(out, 'rev1')


class RedirectingMemoryTransport(MemoryTransport):
    """MemoryTransport whose mkdir() asks for a redirect in two directories.

    A mkdir() issued while the transport's cwd is '/source/' raises
    RedirectRequested pointing at '../target'; one issued from
    '/infinite-loop/' redirects back to '../infinite-loop'.  Any other
    location falls through to the normal MemoryTransport.mkdir().
    """

    def mkdir(self, relpath, mode=None):
        """Create relpath, or raise errors.RedirectRequested (see class)."""
        from bzrlib.trace import mutter
        # Log the resolved path.  The bare name 'abspath' would refer to the
        # function imported from bzrlib.osutils, not to a path.
        mutter('cwd: %r, rel: %r, abs: %r'
               % (self._cwd, relpath, self.abspath(relpath)))
        if self._cwd == '/source/':
            # NOTE(review): the third argument of both raises was lost in the
            # listing (only a trailing comma survived); is_permanent=True is
            # a reconstruction - confirm against version control.
            raise errors.RedirectRequested(self.abspath(relpath),
                                           self.abspath('../target'),
                                           is_permanent=True)
        elif self._cwd == '/infinite-loop/':
            raise errors.RedirectRequested(self.abspath(relpath),
                                           self.abspath('../infinite-loop'),
                                           is_permanent=True)
        else:
            return super(RedirectingMemoryTransport, self).mkdir(
                relpath, mode)

    def _redirected_to(self, source, target):
        # We do accept redirections
        return transport.get_transport(target)


class RedirectingMemoryServer(MemoryServer):
    """MemoryServer that hands out RedirectingMemoryTransport instances.

    NOTE(review): the 'def setUp' / 'def tearDown' headers, the _files and
    _locks initialisation and the factory's return statement were missing
    from the listing and have been restored; confirm against version control.
    """

    def setUp(self):
        """Create the shared state and register this server's URL scheme."""
        self._dirs = {'/': None}
        # _files and _locks are read by _memory_factory, so they must exist.
        self._files = {}
        self._locks = {}
        # id(self) gives each server instance its own scheme.
        self._scheme = 'redirecting-memory+%s:///' % id(self)
        transport.register_transport(self._scheme, self._memory_factory)

    def _memory_factory(self, url):
        """Return a transport for url that shares this server's state."""
        result = RedirectingMemoryTransport(url)
        result._dirs = self._dirs
        result._files = self._files
        result._locks = self._locks
        return result

    def tearDown(self):
        """Unregister the URL scheme registered by setUp()."""
        transport.unregister_transport(self._scheme, self._memory_factory)


class TestPushRedirect(ExternalBase):
    """Tests for how 'bzr push' reacts to redirects raised by mkdir()."""

    def setUp(self):
        """Start a RedirectingMemoryServer and build the tree to push."""
        ExternalBase.setUp(self)
        self.memory_server = RedirectingMemoryServer()
        self.memory_server.setUp()
        self.addCleanup(self.memory_server.tearDown)

        # Make the branch and tree that we'll be pushing.
        t = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file'])
        # restored: without a commit both ends would compare equal with no
        # revisions at all.  NOTE(review): confirm against version control.
        t.add('file')
        t.commit('commit 1')

    def test_push_redirects_on_mkdir(self):
        """If the push requires a mkdir, push respects redirect requests.

        This is added primarily to handle lp:/ URI support, so that users can
        push to new branches by specifying lp:/ URIs.
        """
        destination_url = self.memory_server.get_url() + 'source'
        self.run_bzr(['push', '-d', 'tree', destination_url])

        # The branch must have landed at the redirect target, not at 'source'.
        local_revision = Branch.open('tree').last_revision()
        remote_revision = Branch.open(
            self.memory_server.get_url() + 'target').last_revision()
        self.assertEqual(remote_revision, local_revision)

    def test_push_gracefully_handles_too_many_redirects(self):
        """Push fails gracefully if the mkdir generates a large number of
        redirects.
        """
        destination_url = self.memory_server.get_url() + 'infinite-loop'
        out, err = self.run_bzr_error(
            ['Too many redirections trying to make %s\\.\n'
             % re.escape(destination_url)],
            ['push', '-d', 'tree', destination_url], retcode=3)
        self.assertEqual('', out)