1
# Copyright (C) 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for fetch between repositories of the same type."""
25
from bzrlib.tests import TestSkipped
26
from bzrlib.tests.per_repository import TestCaseWithRepository
27
from bzrlib.transport import get_transport
30
class TestFetchSameRepository(TestCaseWithRepository):
33
# smoke test fetch to ensure that the convenience function works.
34
# it is defined as a convenience function with the underlying
35
# functionality provided by an InterRepository
36
tree_a = self.make_branch_and_tree('a')
37
self.build_tree(['a/foo'])
38
tree_a.add('foo', 'file1')
39
tree_a.commit('rev1', rev_id='rev1')
40
# fetch with a default limit (grab everything)
41
repo = self.make_repository('b')
42
if (tree_a.branch.repository.supports_rich_root() and not
43
repo.supports_rich_root()):
44
raise TestSkipped('Cannot fetch from model2 to model1')
45
repo.fetch(tree_a.branch.repository,
47
## pb=bzrlib.progress.DummyProgress())
49
def test_fetch_knit3(self):
50
# create a repository of the sort we are testing.
51
tree_a = self.make_branch_and_tree('a')
52
self.build_tree(['a/foo'])
53
tree_a.add('foo', 'file1')
54
tree_a.commit('rev1', rev_id='rev1')
55
# create a knit-3 based format to fetch into
56
f = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
58
format = tree_a.branch.repository._format
59
format.check_conversion_target(f.repository_format)
60
# if we cannot convert data to knit3, skip the test.
61
except errors.BadConversionTarget, e:
62
raise TestSkipped(str(e))
63
self.get_transport().mkdir('b')
64
b_bzrdir = f.initialize(self.get_url('b'))
65
knit3_repo = b_bzrdir.create_repository()
66
# fetch with a default limit (grab everything)
67
knit3_repo.fetch(tree_a.branch.repository, revision_id=None)
68
# Reopen to avoid any in-memory caching - ensure its reading from
70
knit3_repo = b_bzrdir.open_repository()
71
rev1_tree = knit3_repo.revision_tree('rev1')
74
lines = rev1_tree.get_file_lines(rev1_tree.get_root_id())
77
self.assertEqual([], lines)
78
b_branch = b_bzrdir.create_branch()
79
b_branch.pull(tree_a.branch)
81
tree_b = b_bzrdir.create_workingtree()
82
except errors.NotLocalUrl:
83
raise TestSkipped("cannot make working tree with transport %r"
85
tree_b.commit('no change', rev_id='rev2')
86
rev2_tree = knit3_repo.revision_tree('rev2')
87
self.assertEqual('rev1', rev2_tree.inventory.root.revision)
89
def test_fetch_all_from_self(self):
90
tree = self.make_branch_and_tree('.')
91
rev_id = tree.commit('one')
92
# This needs to be a new copy of the repository, if this changes, the
93
# test needs to be rewritten
94
repo = tree.branch.repository.bzrdir.open_repository()
95
# This fetch should be a no-op see bug #158333
96
tree.branch.repository.fetch(repo, None)
98
def test_fetch_from_self(self):
99
tree = self.make_branch_and_tree('.')
100
rev_id = tree.commit('one')
101
repo = tree.branch.repository.bzrdir.open_repository()
102
# This fetch should be a no-op see bug #158333
103
tree.branch.repository.fetch(repo, rev_id)
105
def test_fetch_missing_from_self(self):
106
tree = self.make_branch_and_tree('.')
107
rev_id = tree.commit('one')
108
# Even though the fetch() is a NO-OP it should assert the revision id
110
repo = tree.branch.repository.bzrdir.open_repository()
111
self.assertRaises(errors.NoSuchRevision, tree.branch.repository.fetch,
112
repo, 'no-such-revision')
114
def makeARepoWithSignatures(self):
115
wt = self.make_branch_and_tree('a-repo-with-sigs')
116
wt.commit('rev1', allow_pointless=True, rev_id='rev1')
117
repo = wt.branch.repository
119
repo.start_write_group()
120
repo.sign_revision('rev1', gpg.LoopbackGPGStrategy(None))
121
repo.commit_write_group()
125
def test_fetch_copies_signatures(self):
126
source_repo = self.makeARepoWithSignatures()
127
target_repo = self.make_repository('target')
128
target_repo.fetch(source_repo, revision_id=None)
130
source_repo.get_signature_text('rev1'),
131
target_repo.get_signature_text('rev1'))
133
def make_repository_with_one_revision(self):
134
wt = self.make_branch_and_tree('source')
135
wt.commit('rev1', allow_pointless=True, rev_id='rev1')
136
return wt.branch.repository
138
def test_fetch_revision_already_exists(self):
139
# Make a repository with one revision.
140
source_repo = self.make_repository_with_one_revision()
141
# Fetch that revision into a second repository.
142
target_repo = self.make_repository('target')
143
target_repo.fetch(source_repo, revision_id='rev1')
144
# Now fetch again; there will be nothing to do. This should work
145
# without causing any errors.
146
target_repo.fetch(source_repo, revision_id='rev1')
148
def test_fetch_all_same_revisions_twice(self):
149
# Blind-fetching all the same revisions twice should succeed and be a
150
# no-op the second time.
151
repo = self.make_repository('repo')
152
tree = self.make_branch_and_tree('tree')
153
revision_id = tree.commit('test')
154
repo.fetch(tree.branch.repository)
155
repo.fetch(tree.branch.repository)