~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

  • Committer: Robert Collins
  • Date: 2005-10-16 14:51:21 UTC
  • mto: This revision was merged to the branch mainline in revision 1459.
  • Revision ID: robertc@lifelesslap.robertcollins.net-20051016145121-01852573f8c63daa
replace __contains__ calls in stores with has_id

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
 
17
import os
 
18
from bzrlib.branch import Branch
 
19
from bzrlib.clone import copy_branch
 
20
from bzrlib.commit import commit
 
21
import bzrlib.errors as errors
 
22
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
17
23
from bzrlib.selftest import TestCaseInTempDir
18
 
 
19
 
 
20
 
class TestAppendRevisions(TestCaseInTempDir):
21
 
    """Test appending more than one revision"""
 
24
from bzrlib.trace import mutter
 
25
import bzrlib.transactions as transactions
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
 
27
 
 
28
class TestBranch(TestCaseInTempDir):
 
29
 
22
30
    def test_append_revisions(self):
23
 
        from bzrlib.branch import Branch
24
 
        br = Branch(".", init=True)
 
31
        """Test appending more than one revision"""
 
32
        br = Branch.initialize(".")
25
33
        br.append_revision("rev1")
26
34
        self.assertEquals(br.revision_history(), ["rev1",])
27
35
        br.append_revision("rev2", "rev3")
28
36
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
29
37
 
30
 
 
 
38
    def test_fetch_revisions(self):
 
39
        """Test fetch-revision operation."""
 
40
        from bzrlib.fetch import Fetcher
 
41
        os.mkdir('b1')
 
42
        os.mkdir('b2')
 
43
        b1 = Branch.initialize('b1')
 
44
        b2 = Branch.initialize('b2')
 
45
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
 
46
        b1.add(['foo'], ['foo-id'])
 
47
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
48
 
 
49
        mutter('start fetch')
 
50
        f = Fetcher(from_branch=b1, to_branch=b2)
 
51
        eq = self.assertEquals
 
52
        eq(f.count_copied, 1)
 
53
        eq(f.last_revision, 'revision-1')
 
54
 
 
55
        rev = b2.get_revision('revision-1')
 
56
        tree = b2.revision_tree('revision-1')
 
57
        eq(tree.get_file_text('foo-id'), 'hello')
 
58
 
 
59
    def test_push_stores(self):
 
60
        """Copy the stores from one branch to another"""
 
61
        os.mkdir('a')
 
62
        br_a = Branch.initialize("a")
 
63
        file('a/b', 'wb').write('b')
 
64
        br_a.add('b')
 
65
        commit(br_a, "silly commit")
 
66
 
 
67
        os.mkdir('b')
 
68
        br_b = Branch.initialize("b")
 
69
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
70
                          br_a.revision_history()[0])
 
71
        br_a.push_stores(br_b)
 
72
        rev = br_b.get_revision(br_a.revision_history()[0])
 
73
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
74
        for file_id in tree:
 
75
            if tree.inventory[file_id].kind == "file":
 
76
                tree.get_file(file_id).read()
 
77
        return br_a, br_b
 
78
 
 
79
    def test_copy_branch(self):
 
80
        """Copy the stores from one branch to another"""
 
81
        br_a, br_b = self.test_push_stores()
 
82
        commit(br_b, "silly commit")
 
83
        os.mkdir('c')
 
84
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
85
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
 
86
        ## # basis branches currently disabled for weave format
 
87
        ## self.assertFalse(br_b.last_revision() in br_c.revision_history())
 
88
        ## br_c.get_revision(br_b.last_revision())
 
89
 
 
90
    def test_copy_partial(self):
 
91
        """Copy only part of the history of a branch."""
 
92
        self.build_tree(['a/', 'a/one'])
 
93
        br_a = Branch.initialize('a')
 
94
        br_a.add(['one'])
 
95
        br_a.commit('commit one', rev_id='u@d-1')
 
96
        self.build_tree(['a/two'])
 
97
        br_a.add(['two'])
 
98
        br_a.commit('commit two', rev_id='u@d-2')
 
99
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
100
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
101
        self.assertTrue(os.path.exists('b/one'))
 
102
        self.assertFalse(os.path.exists('b/two'))
 
103
        
 
104
 
 
105
    def test_record_initial_ghost_merge(self):
 
106
        """A pending merge with no revision present is still a merge."""
 
107
        branch = Branch.initialize('.')
 
108
        branch.add_pending_merge('non:existent@rev--ision--0--2')
 
109
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
 
110
        rev = branch.get_revision(branch.last_revision())
 
111
        self.assertEqual(len(rev.parent_ids), 1)
 
112
        # parent_sha1s is not populated now, WTF. rbc 20051003
 
113
        self.assertEqual(len(rev.parent_sha1s), 0)
 
114
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
 
115
 
 
116
# TODO 20051003 RBC:
 
117
# compare the gpg-to-sign info for a commit with a ghost and 
 
118
#     an identical tree without a ghost
 
119
# fetch missing should rewrite the TOC of weaves to list newly available parents.
 
120
        
 
121
    def test_pending_merges(self):
 
122
        """Tracking pending-merged revisions."""
 
123
        b = Branch.initialize('.')
 
124
 
 
125
        self.assertEquals(b.pending_merges(), [])
 
126
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
127
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
128
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
129
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
130
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
 
131
        self.assertEquals(b.pending_merges(),
 
132
                          ['foo@azkhazan-123123-abcabc',
 
133
                           'wibble@fofof--20050401--1928390812'])
 
134
        b.commit("commit from base with two merges")
 
135
        rev = b.get_revision(b.revision_history()[0])
 
136
        self.assertEquals(len(rev.parent_ids), 2)
 
137
        self.assertEquals(rev.parent_ids[0],
 
138
                          'foo@azkhazan-123123-abcabc')
 
139
        self.assertEquals(rev.parent_ids[1],
 
140
                           'wibble@fofof--20050401--1928390812')
 
141
        # list should be cleared when we do a commit
 
142
        self.assertEquals(b.pending_merges(), [])
 
143
 
 
144
 
 
145
class TestRemote(TestCaseWithWebserver):
    """Checks Branch.open_containing against URLs from get_remote_url."""

    def test_open_containing(self):
        """open_containing raises NotBranchError until a branch is created."""
        relpaths = ('', 'g/p/q')

        # No branch yet: both the root URL and a nested URL are rejected.
        for relpath in relpaths:
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_remote_url(relpath))

        # Keep the new branch bound to a local, as the original code did.
        branch = Branch.initialize('.')

        # With a branch in place, both URLs must open without raising.
        for relpath in relpaths:
            Branch.open_containing(self.get_remote_url(relpath))
 
155
        
31
156
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
32
157
#         >>> from bzrlib.commit import commit
33
158
#         >>> bzrlib.trace.silent = True
46
171
#         Added 0 revisions.
47
172
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
48
173
#         True
 
174
 
 
175
class InstrumentedTransaction(object):
    """Stand-in transaction that records the calls made on it."""

    def __init__(self):
        # Names of the methods invoked so far, in call order.
        self.calls = list()

    def finish(self):
        """Note that finish() was called."""
        log = self.calls
        log.append('finish')
 
182
 
 
183
 
 
184
class TestBranchTransaction(TestCaseInTempDir):
    """Transaction handling on a branch initialized in the test directory."""

    def setUp(self):
        """Create a fresh branch in the current directory for each test."""
        super(TestBranchTransaction, self).setUp()
        self.branch = Branch.initialize('.')

    def test_default_get_transaction(self):
        """branch.get_transaction on a new branch should give a PassThrough."""
        self.failUnless(isinstance(self.branch.get_transaction(),
                                   transactions.PassThroughTransaction))

    def test__set_new_transaction(self):
        """Setting a transaction when none is active must not raise."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())

    def test__set_over_existing_transaction_raises(self):
        """Setting a second transaction over an active one raises LockError."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.assertRaises(errors.LockError,
                          self.branch._set_transaction,
                          transactions.ReadOnlyTransaction())

    def test_finish_no_transaction_raises(self):
        """Finishing when no transaction was set raises LockError."""
        self.assertRaises(errors.LockError, self.branch._finish_transaction)

    def test_finish_readonly_transaction_works(self):
        """Finishing a read-only transaction resets _transaction to None."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.branch._finish_transaction()
        self.assertEqual(None, self.branch._transaction)

    def test_unlock_calls_finish(self):
        """unlock() calls finish() on the transaction that is installed."""
        self.branch.lock_read()
        transaction = InstrumentedTransaction()
        self.branch._transaction = transaction
        # unlock() is the call under test, so it is deliberately not wrapped
        # in try/finally here.
        self.branch.unlock()
        self.assertEqual(['finish'], transaction.calls)

    def test_lock_read_acquires_ro_transaction(self):
        """lock_read() makes get_transaction return a ReadOnlyTransaction."""
        self.branch.lock_read()
        try:
            self.failUnless(isinstance(self.branch.get_transaction(),
                                       transactions.ReadOnlyTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()

    def test_lock_write_acquires_passthrough_transaction(self):
        """lock_write() installs a PassThroughTransaction on the branch."""
        self.branch.lock_write()
        try:
            # Inspect the private attribute directly: the original author
            # notes that get_transaction cannot be used here because it is
            # "magic".
            self.failUnless(isinstance(self.branch._transaction,
                                       transactions.PassThroughTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()