~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_repository/test_fetch.py

  • Committer: Aaron Bentley
  • Date: 2008-10-16 21:27:10 UTC
  • mfrom: (0.15.26 unshelve)
  • mto: (0.16.74 shelf-ui)
  • mto: This revision was merged to the branch mainline in revision 3820.
  • Revision ID: aaron@aaronbentley.com-20081016212710-h9av3nhk76dvmsv5
Merge with unshelve

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for fetch between repositories of the same type."""
 
18
 
 
19
from bzrlib import (
 
20
    bzrdir,
 
21
    errors,
 
22
    gpg,
 
23
    repository,
 
24
    )
 
25
from bzrlib.tests import TestSkipped
 
26
from bzrlib.tests.per_repository import TestCaseWithRepository
 
27
from bzrlib.transport import get_transport
 
28
 
 
29
 
 
30
class TestFetchSameRepository(TestCaseWithRepository):
 
31
 
 
32
    def test_fetch(self):
 
33
        # smoke test fetch to ensure that the convenience function works.
 
34
        # it is defined as a convenience function with the underlying 
 
35
        # functionality provided by an InterRepository
 
36
        tree_a = self.make_branch_and_tree('a')
 
37
        self.build_tree(['a/foo'])
 
38
        tree_a.add('foo', 'file1')
 
39
        tree_a.commit('rev1', rev_id='rev1')
 
40
        # fetch with a default limit (grab everything)
 
41
        repo = self.make_repository('b')
 
42
        if (tree_a.branch.repository.supports_rich_root() and not
 
43
            repo.supports_rich_root()):
 
44
            raise TestSkipped('Cannot fetch from model2 to model1')
 
45
        repo.fetch(tree_a.branch.repository,
 
46
                   revision_id=None)
 
47
                   ## pb=bzrlib.progress.DummyProgress())
 
48
 
 
49
    def test_fetch_knit3(self):
 
50
        # create a repository of the sort we are testing.
 
51
        tree_a = self.make_branch_and_tree('a')
 
52
        self.build_tree(['a/foo'])
 
53
        tree_a.add('foo', 'file1')
 
54
        tree_a.commit('rev1', rev_id='rev1')
 
55
        # create a knit-3 based format to fetch into
 
56
        f = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
 
57
        try:
 
58
            format = tree_a.branch.repository._format
 
59
            format.check_conversion_target(f.repository_format)
 
60
            # if we cannot convert data to knit3, skip the test.
 
61
        except errors.BadConversionTarget, e:
 
62
            raise TestSkipped(str(e))
 
63
        self.get_transport().mkdir('b')
 
64
        b_bzrdir = f.initialize(self.get_url('b'))
 
65
        knit3_repo = b_bzrdir.create_repository()
 
66
        # fetch with a default limit (grab everything)
 
67
        knit3_repo.fetch(tree_a.branch.repository, revision_id=None)
 
68
        # Reopen to avoid any in-memory caching - ensure its reading from
 
69
        # disk.
 
70
        knit3_repo = b_bzrdir.open_repository()
 
71
        rev1_tree = knit3_repo.revision_tree('rev1')
 
72
        rev1_tree.lock_read()
 
73
        try:
 
74
            lines = rev1_tree.get_file_lines(rev1_tree.get_root_id())
 
75
        finally:
 
76
            rev1_tree.unlock()
 
77
        self.assertEqual([], lines)
 
78
        b_branch = b_bzrdir.create_branch()
 
79
        b_branch.pull(tree_a.branch)
 
80
        try:
 
81
            tree_b = b_bzrdir.create_workingtree()
 
82
        except errors.NotLocalUrl:
 
83
            raise TestSkipped("cannot make working tree with transport %r"
 
84
                              % b_bzrdir.transport)
 
85
        tree_b.commit('no change', rev_id='rev2')
 
86
        rev2_tree = knit3_repo.revision_tree('rev2')
 
87
        self.assertEqual('rev1', rev2_tree.inventory.root.revision)
 
88
 
 
89
    def test_fetch_all_from_self(self):
 
90
        tree = self.make_branch_and_tree('.')
 
91
        rev_id = tree.commit('one')
 
92
        # This needs to be a new copy of the repository, if this changes, the
 
93
        # test needs to be rewritten
 
94
        repo = tree.branch.repository.bzrdir.open_repository()
 
95
        # This fetch should be a no-op see bug #158333
 
96
        tree.branch.repository.fetch(repo, None)
 
97
 
 
98
    def test_fetch_from_self(self):
 
99
        tree = self.make_branch_and_tree('.')
 
100
        rev_id = tree.commit('one')
 
101
        repo = tree.branch.repository.bzrdir.open_repository()
 
102
        # This fetch should be a no-op see bug #158333
 
103
        tree.branch.repository.fetch(repo, rev_id)
 
104
 
 
105
    def test_fetch_missing_from_self(self):
 
106
        tree = self.make_branch_and_tree('.')
 
107
        rev_id = tree.commit('one')
 
108
        # Even though the fetch() is a NO-OP it should assert the revision id
 
109
        # is present
 
110
        repo = tree.branch.repository.bzrdir.open_repository()
 
111
        self.assertRaises(errors.NoSuchRevision, tree.branch.repository.fetch,
 
112
                          repo, 'no-such-revision')
 
113
 
 
114
    def makeARepoWithSignatures(self):
 
115
        wt = self.make_branch_and_tree('a-repo-with-sigs')
 
116
        wt.commit('rev1', allow_pointless=True, rev_id='rev1')
 
117
        repo = wt.branch.repository
 
118
        repo.lock_write()
 
119
        repo.start_write_group()
 
120
        repo.sign_revision('rev1', gpg.LoopbackGPGStrategy(None))
 
121
        repo.commit_write_group()
 
122
        repo.unlock()
 
123
        return repo
 
124
 
 
125
    def test_fetch_copies_signatures(self):
 
126
        source_repo = self.makeARepoWithSignatures()
 
127
        target_repo = self.make_repository('target')
 
128
        target_repo.fetch(source_repo, revision_id=None)
 
129
        self.assertEqual(
 
130
            source_repo.get_signature_text('rev1'),
 
131
            target_repo.get_signature_text('rev1'))
 
132
 
 
133
    def make_repository_with_one_revision(self):
 
134
        wt = self.make_branch_and_tree('source')
 
135
        wt.commit('rev1', allow_pointless=True, rev_id='rev1')
 
136
        return wt.branch.repository
 
137
 
 
138
    def test_fetch_revision_already_exists(self):
 
139
        # Make a repository with one revision.
 
140
        source_repo = self.make_repository_with_one_revision()
 
141
        # Fetch that revision into a second repository.
 
142
        target_repo = self.make_repository('target')
 
143
        target_repo.fetch(source_repo, revision_id='rev1')
 
144
        # Now fetch again; there will be nothing to do.  This should work
 
145
        # without causing any errors.
 
146
        target_repo.fetch(source_repo, revision_id='rev1')
 
147
 
 
148
    def test_fetch_all_same_revisions_twice(self):
 
149
        # Blind-fetching all the same revisions twice should succeed and be a
 
150
        # no-op the second time.
 
151
        repo = self.make_repository('repo')
 
152
        tree = self.make_branch_and_tree('tree')
 
153
        revision_id = tree.commit('test')
 
154
        repo.fetch(tree.branch.repository)
 
155
        repo.fetch(tree.branch.repository)