14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from StringIO import StringIO
17
from cStringIO import StringIO
19
19
from bzrlib.builtins import merge
20
20
from bzrlib.bzrdir import BzrDir
21
21
from bzrlib.bundle.apply_bundle import install_bundle, merge_bundle
22
from bzrlib.bundle.read_bundle import BundleTree, BundleReader
23
from bzrlib.bundle.serializer import write_bundle
22
from bzrlib.bundle.bundle_data import BundleTree
23
from bzrlib.bundle.serializer import write_bundle, read_bundle
24
24
from bzrlib.diff import internal_diff
25
from bzrlib.errors import BzrError, TestamentMismatch, NotABundle
25
from bzrlib.errors import BzrError, TestamentMismatch, NotABundle, BadBundle
26
26
from bzrlib.merge import Merge3Merger
27
27
from bzrlib.osutils import has_symlinks, sha_file
28
from bzrlib.tests import TestCaseInTempDir, TestCase, TestSkipped
28
from bzrlib.tests import (TestCaseInTempDir, TestCaseWithTransport,
29
TestCase, TestSkipped)
29
30
from bzrlib.transform import TreeTransform
30
31
from bzrlib.workingtree import WorkingTree
294
295
self.assertEqual(self.sorted_ids(btree), ['a', 'b', 'd', 'e'])
297
class CSetTester(TestCaseInTempDir):
299
def get_valid_bundle(self, base_rev_id, rev_id, checkout_dir=None,
301
"""Create a bundle from base_rev_id -> rev_id in built-in branch.
302
Make sure that the text generated is valid, and that it
303
can be applied against the base, and generate the same information.
305
:return: The in-memory bundle
307
from cStringIO import StringIO
298
class BundleTester(TestCaseInTempDir):
300
def create_bundle_text(self, base_rev_id, rev_id):
309
301
bundle_txt = StringIO()
310
302
rev_ids = write_bundle(self.b1.repository, rev_id, base_rev_id,
312
304
bundle_txt.seek(0)
313
305
self.assertEqual(bundle_txt.readline(),
314
'# Bazaar revision bundle v0.7\n')
306
'# Bazaar revision bundle v0.8\n')
315
307
self.assertEqual(bundle_txt.readline(), '#\n')
317
309
rev = self.b1.repository.get_revision(rev_id)
321
313
open(',,bundle', 'wb').write(bundle_txt.getvalue())
322
314
bundle_txt.seek(0)
315
return bundle_txt, rev_ids
317
def get_valid_bundle(self, base_rev_id, rev_id, checkout_dir=None):
318
"""Create a bundle from base_rev_id -> rev_id in built-in branch.
319
Make sure that the text generated is valid, and that it
320
can be applied against the base, and generate the same information.
322
:return: The in-memory bundle
324
bundle_txt, rev_ids = self.create_bundle_text(base_rev_id, rev_id)
323
326
# This should also validate the generated bundle
324
bundle = BundleReader(bundle_txt)
327
bundle = read_bundle(bundle_txt)
325
328
repository = self.b1.repository
326
for bundle_rev in bundle.info.real_revisions:
329
for bundle_rev in bundle.real_revisions:
327
330
# These really should have already been checked when we read the
328
331
# bundle, since it computes the sha1 hash for the revision, which
329
332
# only will match if everything is okay, but lets be explicit about
337
340
self.assertEqual(len(branch_rev.parent_ids),
338
341
len(bundle_rev.parent_ids))
339
342
self.assertEqual(rev_ids,
340
[r.revision_id for r in bundle.info.real_revisions])
343
[r.revision_id for r in bundle.real_revisions])
341
344
self.valid_apply_bundle(base_rev_id, bundle,
342
345
checkout_dir=checkout_dir)
350
353
:return: The in-memory bundle
352
from cStringIO import StringIO
354
bundle_txt = StringIO()
355
rev_ids = write_bundle(self.b1.repository, rev_id, base_rev_id,
358
open(',,bundle', 'wb').write(bundle_txt.getvalue())
355
bundle_txt, rev_ids = self.create_bundle_text(base_rev_id, rev_id)
360
356
new_text = bundle_txt.getvalue().replace('executable:no',
361
357
'executable:yes')
362
358
bundle_txt = StringIO(new_text)
363
bundle = BundleReader(bundle_txt)
359
bundle = read_bundle(bundle_txt)
364
360
self.valid_apply_bundle(base_rev_id, bundle)
367
363
def test_non_bundle(self):
368
self.assertRaises(NotABundle, BundleReader, StringIO('#!/bin/sh\n'))
364
self.assertRaises(NotABundle, read_bundle, StringIO('#!/bin/sh\n'))
366
def test_malformed(self):
367
self.assertRaises(BadBundle, read_bundle,
368
StringIO('# Bazaar revision bundle v'))
370
def test_crlf_bundle(self):
372
read_bundle(StringIO('# Bazaar revision bundle v0.7\r\n'))
374
# It is currently permitted for bundles with crlf line endings to
375
# make read_bundle raise a BadBundle, but this should be fixed.
376
# Anything else, especially NotABundle, is an error.
370
379
def get_checkout(self, rev_id, checkout_dir=None):
371
380
"""Get a new tree, with the specified revision in it.
386
395
assert isinstance(s.getvalue(), str), (
387
396
"Bundle isn't a bytestring:\n %s..." % repr(s.getvalue())[:40])
388
install_bundle(tree.branch.repository, BundleReader(s))
397
install_bundle(tree.branch.repository, read_bundle(s))
389
398
for ancestor in ancestors:
390
399
old = self.b1.repository.revision_tree(ancestor)
391
400
new = tree.branch.repository.revision_tree(ancestor)
407
def valid_apply_bundle(self, base_rev_id, reader, checkout_dir=None):
416
def valid_apply_bundle(self, base_rev_id, info, checkout_dir=None):
408
417
"""Get the base revision, apply the changes, and make
409
418
sure everything matches the builtin branch.
411
420
to_tree = self.get_checkout(base_rev_id, checkout_dir=checkout_dir)
412
421
repository = to_tree.branch.repository
413
422
self.assertIs(repository.has_revision(base_rev_id), True)
415
423
for rev in info.real_revisions:
416
424
self.assert_(not repository.has_revision(rev.revision_id),
417
425
'Revision {%s} present before applying bundle'
418
426
% rev.revision_id)
419
merge_bundle(reader, to_tree, True, Merge3Merger, False, False)
427
merge_bundle(info, to_tree, True, Merge3Merger, False, False)
421
429
for rev in info.real_revisions:
422
430
self.assert_(repository.has_revision(rev.revision_id),
454
462
# to_tree.get_file(fileid).read())
456
464
def test_bundle(self):
461
465
self.tree1 = BzrDir.create_standalone_workingtree('b1')
462
466
self.b1 = self.tree1.branch
464
open(pjoin('b1/one'), 'wb').write('one\n')
468
open('b1/one', 'wb').write('one\n')
465
469
self.tree1.add('one')
466
470
self.tree1.commit('add one', rev_id='a@cset-0-1')
468
472
bundle = self.get_valid_bundle(None, 'a@cset-0-1')
469
bundle = self.get_valid_bundle(None, 'a@cset-0-1',
470
message='With a specialized message')
473
# FIXME: The current write_bundle api no longer supports
474
# setting a custom summary message
475
# We should re-introduce the ability, and update
476
# the tests to make sure it works.
477
# bundle = self.get_valid_bundle(None, 'a@cset-0-1',
478
# message='With a specialized message')
472
480
# Make sure we can handle files with spaces, tabs, other
473
481
# bogus characters
664
672
bundle = self.get_valid_bundle('a@lmod-0-2a', 'a@lmod-0-4')
666
674
def test_hide_history(self):
670
675
self.tree1 = BzrDir.create_standalone_workingtree('b1')
671
676
self.b1 = self.tree1.branch
673
open(pjoin('b1/one'), 'wb').write('one\n')
678
open('b1/one', 'wb').write('one\n')
674
679
self.tree1.add('one')
675
680
self.tree1.commit('add file', rev_id='a@cset-0-1')
676
open(pjoin('b1/one'), 'wb').write('two\n')
681
open('b1/one', 'wb').write('two\n')
677
682
self.tree1.commit('modify', rev_id='a@cset-0-2')
678
open(pjoin('b1/one'), 'wb').write('three\n')
683
open('b1/one', 'wb').write('three\n')
679
684
self.tree1.commit('modify', rev_id='a@cset-0-3')
680
685
bundle_file = StringIO()
681
686
rev_ids = write_bundle(self.tree1.branch.repository, 'a@cset-0-3',
683
688
self.assertNotContainsRe(bundle_file.getvalue(), 'two')
684
689
self.assertContainsRe(bundle_file.getvalue(), 'one')
685
690
self.assertContainsRe(bundle_file.getvalue(), 'three')
693
class MungedBundleTester(TestCaseWithTransport):
695
def build_test_bundle(self):
696
wt = self.make_branch_and_tree('b1')
698
self.build_tree(['b1/one'])
700
wt.commit('add one', rev_id='a@cset-0-1')
701
self.build_tree(['b1/two'])
703
wt.commit('add two', rev_id='a@cset-0-2')
705
bundle_txt = StringIO()
706
rev_ids = write_bundle(wt.branch.repository, 'a@cset-0-2',
707
'a@cset-0-1', bundle_txt)
708
self.assertEqual(['a@cset-0-2'], rev_ids)
709
bundle_txt.seek(0, 0)
712
def check_valid(self, bundle):
713
"""Check that after whatever munging, the final object is valid."""
714
self.assertEqual(['a@cset-0-2'],
715
[r.revision_id for r in bundle.real_revisions])
717
def test_extra_whitespace(self):
718
bundle_txt = self.build_test_bundle()
720
# Seek to the end of the file
721
# Adding one extra newline used to give us
722
# TypeError: float() argument must be a string or a number
723
bundle_txt.seek(0, 2)
724
bundle_txt.write('\n')
727
bundle = read_bundle(bundle_txt)
728
self.check_valid(bundle)
730
def test_extra_whitespace_2(self):
731
bundle_txt = self.build_test_bundle()
733
# Seek to the end of the file
734
# Adding two extra newlines used to give us
735
# MalformedPatches: The first line of all patches should be ...
736
bundle_txt.seek(0, 2)
737
bundle_txt.write('\n\n')
740
bundle = read_bundle(bundle_txt)
741
self.check_valid(bundle)
743
def test_missing_trailing_whitespace(self):
744
bundle_txt = self.build_test_bundle()
746
# Remove a trailing newline, it shouldn't kill the parser
747
raw = bundle_txt.getvalue()
748
# The contents of the bundle don't have to be this, but this
749
# test is concerned with the exact case where the serializer
750
# creates a blank line at the end, and fails if that
752
self.assertEqual('\n\n', raw[-2:])
753
bundle_text = StringIO(raw[:-1])
755
bundle = read_bundle(bundle_txt)
756
self.check_valid(bundle)