~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

Merge from jam-storage.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
from bzrlib.selftest import InTempDir
18
 
 
19
 
 
20
 
 
21
 
class TestAppendRevisions(InTempDir):
22
 
    """Test appending more than one revision"""
23
 
    def runTest(self):
24
 
        from bzrlib.branch import Branch
25
 
        br = Branch(".", init=True)
 
17
import os
 
18
import sys
 
19
 
 
20
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
 
21
from bzrlib.commit import commit
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.osutils import getcwd
 
26
from bzrlib.tests import TestCase, TestCaseInTempDir
 
27
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
 
28
from bzrlib.trace import mutter
 
29
import bzrlib.transactions as transactions
 
30
from bzrlib.revision import NULL_REVISION
 
31
 
 
32
# TODO: Make a branch using basis branch, and check that it 
 
33
# doesn't request any files that could have been avoided, by 
 
34
# hooking into the Transport.
 
35
 
 
36
class TestBranch(TestCaseInTempDir):
 
37
 
 
38
    def test_append_revisions(self):
 
39
        """Test appending more than one revision"""
 
40
        br = Branch.initialize(u".")
26
41
        br.append_revision("rev1")
27
42
        self.assertEquals(br.revision_history(), ["rev1",])
28
43
        br.append_revision("rev2", "rev3")
29
44
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
30
 
        
31
 
 
32
 
 
33
 
TEST_CLASSES = [
34
 
    TestAppendRevisions,
35
 
    ]
 
45
 
 
46
    def test_fetch_revisions(self):
 
47
        """Test fetch-revision operation."""
 
48
        from bzrlib.fetch import Fetcher
 
49
        os.mkdir('b1')
 
50
        os.mkdir('b2')
 
51
        b1 = Branch.initialize('b1')
 
52
        b2 = Branch.initialize('b2')
 
53
        file('b1/foo', 'w').write('hello')
 
54
        b1.working_tree().add(['foo'], ['foo-id'])
 
55
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
 
56
 
 
57
        mutter('start fetch')
 
58
        f = Fetcher(from_branch=b1, to_branch=b2)
 
59
        eq = self.assertEquals
 
60
        eq(f.count_copied, 1)
 
61
        eq(f.last_revision, 'revision-1')
 
62
 
 
63
        rev = b2.repository.get_revision('revision-1')
 
64
        tree = b2.repository.revision_tree('revision-1')
 
65
        eq(tree.get_file_text('foo-id'), 'hello')
 
66
 
 
67
    def test_revision_tree(self):
 
68
        b1 = Branch.initialize(u'.')
 
69
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
 
70
        tree = b1.repository.revision_tree('revision-1')
 
71
        tree = b1.repository.revision_tree(None)
 
72
        self.assertEqual(len(tree.list_files()), 0)
 
73
        tree = b1.repository.revision_tree(NULL_REVISION)
 
74
        self.assertEqual(len(tree.list_files()), 0)
 
75
 
 
76
    def get_unbalanced_branch_pair(self):
 
77
        """Return two branches, a and b, with one file in a."""
 
78
        os.mkdir('a')
 
79
        br_a = Branch.initialize("a")
 
80
        file('a/b', 'wb').write('b')
 
81
        br_a.working_tree().add('b')
 
82
        commit(br_a, "silly commit", rev_id='A')
 
83
        os.mkdir('b')
 
84
        br_b = Branch.initialize("b")
 
85
        return br_a, br_b
 
86
 
 
87
    def get_balanced_branch_pair(self):
 
88
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
 
89
        br_a, br_b = self.get_unbalanced_branch_pair()
 
90
        br_a.push_stores(br_b)
 
91
        return br_a, br_b
 
92
 
 
93
    def test_push_stores(self):
 
94
        """Copy the stores from one branch to another"""
 
95
        br_a, br_b = self.get_unbalanced_branch_pair()
 
96
        # ensure the revision is missing.
 
97
        self.assertRaises(NoSuchRevision, br_b.repository.get_revision, 
 
98
                          br_a.revision_history()[0])
 
99
        br_a.push_stores(br_b)
 
100
        # check that b now has all the data from a's first commit.
 
101
        rev = br_b.repository.get_revision(br_a.revision_history()[0])
 
102
        tree = br_b.repository.revision_tree(br_a.revision_history()[0])
 
103
        for file_id in tree:
 
104
            if tree.inventory[file_id].kind == "file":
 
105
                tree.get_file(file_id).read()
 
106
        return br_a, br_b
 
107
 
 
108
    def test_clone_branch(self):
 
109
        """Copy the stores from one branch to another"""
 
110
        br_a, br_b = self.get_balanced_branch_pair()
 
111
        commit(br_b, "silly commit")
 
112
        os.mkdir('c')
 
113
        br_c = br_a.clone('c', basis_branch=br_b)
 
114
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
 
115
 
 
116
    def test_clone_partial(self):
 
117
        """Copy only part of the history of a branch."""
 
118
        self.build_tree(['a/', 'a/one'])
 
119
        br_a = Branch.initialize('a')
 
120
        br_a.working_tree().add(['one'])
 
121
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
 
122
        self.build_tree(['a/two'])
 
123
        br_a.working_tree().add(['two'])
 
124
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
 
125
        br_b = br_a.clone('b', revision='u@d-1')
 
126
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
127
        self.assertTrue(os.path.exists('b/one'))
 
128
        self.assertFalse(os.path.exists('b/two'))
 
129
        
 
130
    def test_record_initial_ghost_merge(self):
 
131
        """A pending merge with no revision present is still a merge."""
 
132
        branch = Branch.initialize(u'.')
 
133
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
 
134
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
 
135
        rev = branch.repository.get_revision(branch.last_revision())
 
136
        self.assertEqual(len(rev.parent_ids), 1)
 
137
        # parent_sha1s is not populated now, WTF. rbc 20051003
 
138
        self.assertEqual(len(rev.parent_sha1s), 0)
 
139
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
 
140
 
 
141
    def test_bad_revision(self):
 
142
        branch = Branch.initialize('.')
 
143
        self.assertRaises(errors.InvalidRevisionId, 
 
144
                          branch.repository.get_revision, None)
 
145
 
 
146
# TODO 20051003 RBC:
 
147
# compare the gpg-to-sign info for a commit with a ghost and 
 
148
#     an identical tree without a ghost
 
149
# fetch missing should rewrite the TOC of weaves to list newly available parents.
 
150
        
 
151
    def test_pending_merges(self):
 
152
        """Tracking pending-merged revisions."""
 
153
        b = Branch.initialize(u'.')
 
154
        wt = b.working_tree()
 
155
        self.assertEquals(wt.pending_merges(), [])
 
156
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
157
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
158
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
159
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
160
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
 
161
        self.assertEquals(wt.pending_merges(),
 
162
                          ['foo@azkhazan-123123-abcabc',
 
163
                           'wibble@fofof--20050401--1928390812'])
 
164
        b.working_tree().commit("commit from base with two merges")
 
165
        rev = b.repository.get_revision(b.revision_history()[0])
 
166
        self.assertEquals(len(rev.parent_ids), 2)
 
167
        self.assertEquals(rev.parent_ids[0],
 
168
                          'foo@azkhazan-123123-abcabc')
 
169
        self.assertEquals(rev.parent_ids[1],
 
170
                           'wibble@fofof--20050401--1928390812')
 
171
        # list should be cleared when we do a commit
 
172
        self.assertEquals(wt.pending_merges(), [])
 
173
 
 
174
    def test_sign_existing_revision(self):
 
175
        branch = Branch.initialize(u'.')
 
176
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
 
177
        from bzrlib.testament import Testament
 
178
        strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
 
179
        branch.repository.sign_revision('A', strategy)
 
180
        self.assertEqual(Testament.from_revision(branch.repository, 
 
181
                         'A').as_short_text(),
 
182
                         branch.repository.revision_store.get('A', 
 
183
                         'sig').read())
 
184
 
 
185
    def test_store_signature(self):
 
186
        branch = Branch.initialize('.')
 
187
        branch.repository.store_revision_signature(
 
188
            bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
 
189
        self.assertEqual('FOO', 
 
190
                         branch.repository.revision_store.get('A', 
 
191
                         'sig').read())
 
192
 
 
193
    def test__escape(self):
 
194
        branch = Branch.initialize('.')
 
195
        self.assertEqual('.bzr/%25', branch.control_files._escape('%'))
 
196
        
 
197
    def test__escape_empty(self):
 
198
        branch = Branch.initialize('.')
 
199
        self.assertEqual('', branch.control_files._escape(''))
 
200
 
 
201
    def test_nicks(self):
 
202
        """Branch nicknames"""
 
203
        os.mkdir('bzr.dev')
 
204
        branch = Branch.initialize('bzr.dev')
 
205
        self.assertEqual(branch.nick, 'bzr.dev')
 
206
        os.rename('bzr.dev', 'bzr.ab')
 
207
        branch = Branch.open('bzr.ab')
 
208
        self.assertEqual(branch.nick, 'bzr.ab')
 
209
        branch.nick = "Aaron's branch"
 
210
        branch.nick = "Aaron's branch"
 
211
        self.failUnlessExists(branch.control_files.controlfilename("branch.conf"))
 
212
        self.assertEqual(branch.nick, "Aaron's branch")
 
213
        os.rename('bzr.ab', 'integration')
 
214
        branch = Branch.open('integration')
 
215
        self.assertEqual(branch.nick, "Aaron's branch")
 
216
        branch.nick = u"\u1234"
 
217
        self.assertEqual(branch.nick, u"\u1234")
 
218
 
 
219
    def test_commit_nicks(self):
 
220
        """Nicknames are committed to the revision"""
 
221
        os.mkdir('bzr.dev')
 
222
        branch = Branch.initialize('bzr.dev')
 
223
        branch.nick = "My happy branch"
 
224
        branch.working_tree().commit('My commit respect da nick.')
 
225
        committed = branch.repository.get_revision(branch.last_revision())
 
226
        self.assertEqual(committed.properties["branch-nick"], 
 
227
                         "My happy branch")
 
228
 
 
229
 
 
230
class TestRemote(TestCaseWithWebserver):
 
231
 
 
232
    def test_open_containing(self):
 
233
        self.assertRaises(NotBranchError, Branch.open_containing,
 
234
                          self.get_remote_url(''))
 
235
        self.assertRaises(NotBranchError, Branch.open_containing,
 
236
                          self.get_remote_url('g/p/q'))
 
237
        b = Branch.initialize(u'.')
 
238
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
 
239
        self.assertEqual('', relpath)
 
240
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
 
241
        self.assertEqual('g/p/q', relpath)
 
242
        
 
243
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
244
#         >>> from bzrlib.commit import commit
 
245
#         >>> bzrlib.trace.silent = True
 
246
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
247
#         >>> br1.working_tree().add('foo')
 
248
#         >>> br1.working_tree().add('bar')
 
249
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
250
#         >>> br2 = ScratchBranch()
 
251
#         >>> br2.update_revisions(br1)
 
252
#         Added 2 texts.
 
253
#         Added 1 inventories.
 
254
#         Added 1 revisions.
 
255
#         >>> br2.revision_history()
 
256
#         [u'REVISION-ID-1']
 
257
#         >>> br2.update_revisions(br1)
 
258
#         Added 0 revisions.
 
259
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
260
#         True
 
261
 
 
262
class InstrumentedTransaction(object):
 
263
 
 
264
    def finish(self):
 
265
        self.calls.append('finish')
 
266
 
 
267
    def __init__(self):
 
268
        self.calls = []
 
269
 
 
270
 
 
271
class TestDecorator(object):
 
272
 
 
273
    def __init__(self):
 
274
        self._calls = []
 
275
 
 
276
    def lock_read(self):
 
277
        self._calls.append('lr')
 
278
 
 
279
    def lock_write(self):
 
280
        self._calls.append('lw')
 
281
 
 
282
    def unlock(self):
 
283
        self._calls.append('ul')
 
284
 
 
285
    @needs_read_lock
 
286
    def do_with_read(self):
 
287
        return 1
 
288
 
 
289
    @needs_read_lock
 
290
    def except_with_read(self):
 
291
        raise RuntimeError
 
292
 
 
293
    @needs_write_lock
 
294
    def do_with_write(self):
 
295
        return 2
 
296
 
 
297
    @needs_write_lock
 
298
    def except_with_write(self):
 
299
        raise RuntimeError
 
300
 
 
301
 
 
302
class TestDecorators(TestCase):
 
303
 
 
304
    def test_needs_read_lock(self):
 
305
        branch = TestDecorator()
 
306
        self.assertEqual(1, branch.do_with_read())
 
307
        self.assertEqual(['lr', 'ul'], branch._calls)
 
308
 
 
309
    def test_excepts_in_read_lock(self):
 
310
        branch = TestDecorator()
 
311
        self.assertRaises(RuntimeError, branch.except_with_read)
 
312
        self.assertEqual(['lr', 'ul'], branch._calls)
 
313
 
 
314
    def test_needs_write_lock(self):
 
315
        branch = TestDecorator()
 
316
        self.assertEqual(2, branch.do_with_write())
 
317
        self.assertEqual(['lw', 'ul'], branch._calls)
 
318
 
 
319
    def test_excepts_in_write_lock(self):
 
320
        branch = TestDecorator()
 
321
        self.assertRaises(RuntimeError, branch.except_with_write)
 
322
        self.assertEqual(['lw', 'ul'], branch._calls)
 
323
 
 
324
 
 
325
class TestBranchTransaction(TestCaseInTempDir):
 
326
 
 
327
    def setUp(self):
 
328
        super(TestBranchTransaction, self).setUp()
 
329
        self.branch = Branch.initialize(u'.')
 
330
        
 
331
    def test_default_get_transaction(self):
 
332
        """branch.get_transaction on a new branch should give a PassThrough."""
 
333
        self.failUnless(isinstance(self.branch.get_transaction(),
 
334
                                   transactions.PassThroughTransaction))
 
335
 
 
336
    def test__set_new_transaction(self):
 
337
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
338
 
 
339
    def test__set_over_existing_transaction_raises(self):
 
340
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
341
        self.assertRaises(errors.LockError,
 
342
                          self.branch._set_transaction,
 
343
                          transactions.ReadOnlyTransaction())
 
344
 
 
345
    def test_finish_no_transaction_raises(self):
 
346
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
 
347
 
 
348
    def test_finish_readonly_transaction_works(self):
 
349
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
350
        self.branch._finish_transaction()
 
351
        self.assertEqual(None, self.branch.control_files._transaction)
 
352
 
 
353
    def test_unlock_calls_finish(self):
 
354
        self.branch.lock_read()
 
355
        transaction = InstrumentedTransaction()
 
356
        self.branch.control_files._transaction = transaction
 
357
        self.branch.unlock()
 
358
        self.assertEqual(['finish'], transaction.calls)
 
359
 
 
360
    def test_lock_read_acquires_ro_transaction(self):
 
361
        self.branch.lock_read()
 
362
        self.failUnless(isinstance(self.branch.get_transaction(),
 
363
                                   transactions.ReadOnlyTransaction))
 
364
        self.branch.unlock()
 
365
        
 
366
    def test_lock_write_acquires_passthrough_transaction(self):
 
367
        self.branch.lock_write()
 
368
        # cannot use get_transaction as its magic
 
369
        self.failUnless(isinstance(self.branch.control_files._transaction,
 
370
                                   transactions.PassThroughTransaction))
 
371
        self.branch.unlock()
 
372
 
 
373
 
 
374
class TestBranchPushLocations(TestCaseInTempDir):
 
375
 
 
376
    def setUp(self):
 
377
        super(TestBranchPushLocations, self).setUp()
 
378
        self.branch = Branch.initialize(u'.')
 
379
        
 
380
    def test_get_push_location_unset(self):
 
381
        self.assertEqual(None, self.branch.get_push_location())
 
382
 
 
383
    def test_get_push_location_exact(self):
 
384
        from bzrlib.config import (branches_config_filename,
 
385
                                   ensure_config_dir_exists)
 
386
        ensure_config_dir_exists()
 
387
        fn = branches_config_filename()
 
388
        print >> open(fn, 'wt'), ("[%s]\n"
 
389
                                  "push_location=foo" %
 
390
                                  getcwd())
 
391
        self.assertEqual("foo", self.branch.get_push_location())
 
392
 
 
393
    def test_set_push_location(self):
 
394
        from bzrlib.config import (branches_config_filename,
 
395
                                   ensure_config_dir_exists)
 
396
        ensure_config_dir_exists()
 
397
        fn = branches_config_filename()
 
398
        self.branch.set_push_location('foo')
 
399
        self.assertFileEqual("[%s]\n"
 
400
                             "push_location = foo" % getcwd(),
 
401
                             fn)
 
402
 
 
403
    # TODO RBC 20051029 test getting a push location from a branch in a 
 
404
    # recursive section - that is, it appends the branch name.