# Copyright (C) 2005 by Canonical Ltd

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os

import bzrlib.errors
from bzrlib.selftest.testrevision import make_branches
from bzrlib.trace import mutter
from bzrlib.branch import Branch
from bzrlib.fetch import greedy_fetch
from bzrlib.selftest import TestCaseInTempDir
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver


def has_revision(branch, revision_id):
    """Report whether ``branch`` can supply the stored XML for a revision.

    :param branch: object exposing ``get_revision_xml_file(revision_id)``.
    :param revision_id: identifier of the revision to look up.
    :return: True when the lookup completes without raising, False when it
        raises ``bzrlib.errors.NoSuchRevision``.  Any other exception
        propagates to the caller.
    """
    try:
        # Only the lookup itself is guarded; the returned file object is
        # not needed, success of the call is the whole answer.
        branch.get_revision_xml_file(revision_id)
    except bzrlib.errors.NoSuchRevision:
        return False
    return True


def fetch_steps(self, br_a, br_b, writable_a):
    """A foreign test method for testing fetch locally and remotely.

    The checks are written with ``self.assert*`` rather than bare ``assert``
    statements: several of them wrap ``greedy_fetch`` calls whose side
    effects later steps rely on, and bare asserts are removed entirely when
    Python runs with ``-O``.

    :param self: the running test case; supplies the ``assert*`` helpers.
    :param br_a: first branch, used as a fetch source and for revision
        lookups (callers may pass a remote view of the branch).
    :param br_b: second branch, used both as fetch target and fetch source.
    :param writable_a: branch passed to ``greedy_fetch`` as the target when
        fetching from ``br_b``; callers pass the local counterpart of
        ``br_a`` here.
    """
    def new_branch(name):
        # NOTE(review): the first statement of this helper was lost from the
        # text under review; creating the directory before initialising the
        # branch is a reconstruction -- confirm against version control.
        os.mkdir(name)
        return Branch.initialize(name)

    self.assertFalse(has_revision(br_b, br_a.revision_history()[3]))
    self.assertTrue(has_revision(br_b, br_a.revision_history()[2]))
    self.assertEqual(len(br_b.revision_history()), 7)
    self.assertEqual(
        greedy_fetch(br_b, br_a, br_a.revision_history()[2])[0], 0)

    # greedy_fetch is not supposed to alter the revision history
    self.assertEqual(len(br_b.revision_history()), 7)
    self.assertFalse(has_revision(br_b, br_a.revision_history()[3]))

    self.assertEqual(len(br_b.revision_history()), 7)
    self.assertEqual(
        greedy_fetch(br_b, br_a, br_a.revision_history()[3])[0], 1)
    self.assertTrue(has_revision(br_b, br_a.revision_history()[3]))
    self.assertFalse(has_revision(br_a, br_b.revision_history()[6]))
    self.assertTrue(has_revision(br_a, br_b.revision_history()[5]))

    # When a non-branch ancestor is missing, it should be unlisted...
    # as it's not referenced from the inventory weave.
    br_b4 = new_branch('br_4')
    count, failures = greedy_fetch(br_b4, br_b)
    self.assertEqual(count, 7)
    self.assertEqual(failures, [])

    self.assertEqual(greedy_fetch(writable_a, br_b)[0], 1)
    self.assertTrue(has_revision(br_a, br_b.revision_history()[3]))
    self.assertTrue(has_revision(br_a, br_b.revision_history()[4]))

    br_b2 = new_branch('br_b2')
    self.assertEqual(greedy_fetch(br_b2, br_b)[0], 7)
    self.assertTrue(has_revision(br_b2, br_b.revision_history()[4]))
    self.assertTrue(has_revision(br_b2, br_a.revision_history()[2]))
    self.assertFalse(has_revision(br_b2, br_a.revision_history()[3]))

    br_a2 = new_branch('br_a2')
    self.assertEqual(greedy_fetch(br_a2, br_a)[0], 9)
    self.assertTrue(has_revision(br_a2, br_b.revision_history()[4]))
    self.assertTrue(has_revision(br_a2, br_a.revision_history()[3]))
    self.assertTrue(has_revision(br_a2, br_a.revision_history()[2]))

    br_a3 = new_branch('br_a3')
    self.assertEqual(greedy_fetch(br_a3, br_a2)[0], 0)
    for revno in range(4):
        self.assertFalse(
            has_revision(br_a3, br_a.revision_history()[revno]))
    self.assertEqual(
        greedy_fetch(br_a3, br_a2, br_a.revision_history()[2])[0], 3)
    fetched = greedy_fetch(br_a3, br_a2, br_a.revision_history()[3])[0]
    self.assertEqual(fetched, 3, "fetched %d instead of 3" % fetched)

    # InstallFailed should be raised if the branch is missing the revision
    # that was requested.
    # NOTE(review): the trailing arguments of this call were lost from the
    # text under review; the source branch and the non-existent revision id
    # below are a reconstruction -- confirm against version control.
    self.assertRaises(bzrlib.errors.InstallFailed, greedy_fetch, br_a3,
                      br_a2, 'pizza')

    # InstallFailed should be raised if the branch is missing a revision
    # from its own revision history
    br_a2.append_revision('a-b-c')
    # NOTE(review): trailing argument reconstructed; fetching from br_a2
    # with no explicit revision matches the comment above -- confirm.
    self.assertRaises(bzrlib.errors.InstallFailed, greedy_fetch, br_a3,
                      br_a2)


class TestFetch(TestCaseInTempDir):
    """Run the shared ``fetch_steps`` scenario directly on two branches."""

    def test_fetch(self):
        """Build the fixture branches and drive ``fetch_steps`` with them."""
        #highest indices a: 5, b: 7
        br_a, br_b = make_branches()
        # br_a is passed twice: once as the branch to read from and once as
        # the writable target for the reverse fetch.
        fetch_steps(self, br_a, br_b, br_a)


class TestHttpFetch(TestCaseWithWebserver):
    """Fetch tests that read the source branch through the test web server.

    Web server log lines are collected in ``self.weblogs`` so tests can
    check which files were requested.
    """

    def setUp(self):
        """Prepare the log collector, then run the base-class set-up."""
        # NOTE(review): the ``def`` line and the ``weblogs`` initialisation
        # were lost from the text under review and are reconstructed here.
        # The list is created before the base setUp so that log() can
        # already append to it if it is called during set-up.
        self.weblogs = []
        super(TestHttpFetch, self).setUp()

    def test_fetch(self):
        """Run ``fetch_steps`` with branch A opened via its remote URL."""
        #highest indices a: 5, b: 7
        br_a, br_b = make_branches()
        br_rem_a = Branch.open(self.get_remote_url(br_a._transport.base))
        # The remote view is the fetch source; the local br_a is handed over
        # separately as the writable target.
        fetch_steps(self, br_rem_a, br_b, br_a)

    def log(self, *args):
        """Capture web server log messages for introspection."""
        super(TestHttpFetch, self).log(*args)
        if args[0].startswith("webserver"):
            self.weblogs.append(args[0])

    def test_weaves_are_retrieved_once(self):
        """Fetching two revisions of one file requests its weave only once."""
        self.build_tree(("source/", "source/file", "target/"))
        branch = Branch.initialize("source")
        branch.add(["file"], ["id"])
        branch.commit("added file")
        # Write and close the file explicitly so the new content is on disk
        # before the second commit, instead of relying on the file object
        # being garbage-collected.
        source_file = open("source/file", 'w')
        try:
            source_file.write("blah\n")
        finally:
            source_file.close()
        branch.commit("changed file")
        target = Branch.initialize("target/")
        source = Branch.open(self.get_remote_url("source/"))
        source.weave_store.enable_cache = False
        self.assertEqual(greedy_fetch(target, source), (2, []))
        weave_suffix = 'weaves/id.weave HTTP/1.1" 200 -'
        # NOTE(review): the opening of this assertion was lost from the text
        # under review; the expected count of 1 follows from the test name
        # -- confirm against version control.
        weave_hits = [line for line in self.weblogs
                      if line.endswith(weave_suffix)]
        self.assertEqual(1, len(weave_hits))