14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from StringIO import StringIO
17
from cStringIO import StringIO
19
19
from bzrlib.builtins import merge
20
20
from bzrlib.bzrdir import BzrDir
21
21
from bzrlib.bundle.apply_bundle import install_bundle, merge_bundle
22
from bzrlib.bundle.read_bundle import BundleTree, BundleReader
23
from bzrlib.bundle.serializer import write_bundle
22
from bzrlib.bundle.bundle_data import BundleTree
23
from bzrlib.bundle.serializer import write_bundle, read_bundle
24
24
from bzrlib.diff import internal_diff
25
25
from bzrlib.errors import BzrError, TestamentMismatch, NotABundle
26
26
from bzrlib.merge import Merge3Merger
27
27
from bzrlib.osutils import has_symlinks, sha_file
28
from bzrlib.tests import TestCaseInTempDir, TestCase, TestSkipped
28
from bzrlib.tests import (TestCaseInTempDir, TestCaseWithTransport,
29
TestCase, TestSkipped)
29
30
from bzrlib.transform import TreeTransform
30
31
from bzrlib.workingtree import WorkingTree
294
295
self.assertEqual(self.sorted_ids(btree), ['a', 'b', 'd', 'e'])
297
class CSetTester(TestCaseInTempDir):
299
def get_valid_bundle(self, base_rev_id, rev_id, checkout_dir=None,
301
"""Create a bundle from base_rev_id -> rev_id in built-in branch.
302
Make sure that the text generated is valid, and that it
303
can be applied against the base, and generate the same information.
305
:return: The in-memory bundle
307
from cStringIO import StringIO
298
class BundleTester(TestCaseInTempDir):
300
def create_bundle_text(self, base_rev_id, rev_id):
309
301
bundle_txt = StringIO()
310
302
rev_ids = write_bundle(self.b1.repository, rev_id, base_rev_id,
312
304
bundle_txt.seek(0)
313
305
self.assertEqual(bundle_txt.readline(),
314
'# Bazaar revision bundle v0.7\n')
306
'# Bazaar revision bundle v0.8\n')
315
307
self.assertEqual(bundle_txt.readline(), '#\n')
317
309
rev = self.b1.repository.get_revision(rev_id)
321
313
open(',,bundle', 'wb').write(bundle_txt.getvalue())
322
314
bundle_txt.seek(0)
315
return bundle_txt, rev_ids
317
def get_valid_bundle(self, base_rev_id, rev_id, checkout_dir=None):
318
"""Create a bundle from base_rev_id -> rev_id in built-in branch.
319
Make sure that the text generated is valid, and that it
320
can be applied against the base, and generate the same information.
322
:return: The in-memory bundle
324
bundle_txt, rev_ids = self.create_bundle_text(base_rev_id, rev_id)
323
326
# This should also validate the generated bundle
324
bundle = BundleReader(bundle_txt)
327
bundle = read_bundle(bundle_txt)
325
328
repository = self.b1.repository
326
for bundle_rev in bundle.info.real_revisions:
329
for bundle_rev in bundle.real_revisions:
327
330
# These really should have already been checked when we read the
328
331
# bundle, since it computes the sha1 hash for the revision, which
329
332
# only will match if everything is okay, but lets be explicit about
337
340
self.assertEqual(len(branch_rev.parent_ids),
338
341
len(bundle_rev.parent_ids))
339
342
self.assertEqual(rev_ids,
340
[r.revision_id for r in bundle.info.real_revisions])
343
[r.revision_id for r in bundle.real_revisions])
341
344
self.valid_apply_bundle(base_rev_id, bundle,
342
345
checkout_dir=checkout_dir)
350
353
:return: The in-memory bundle
352
from cStringIO import StringIO
354
bundle_txt = StringIO()
355
rev_ids = write_bundle(self.b1.repository, rev_id, base_rev_id,
358
open(',,bundle', 'wb').write(bundle_txt.getvalue())
355
bundle_txt, rev_ids = self.create_bundle_text(base_rev_id, rev_id)
360
356
new_text = bundle_txt.getvalue().replace('executable:no',
361
357
'executable:yes')
362
358
bundle_txt = StringIO(new_text)
363
bundle = BundleReader(bundle_txt)
359
bundle = read_bundle(bundle_txt)
364
360
self.valid_apply_bundle(base_rev_id, bundle)
367
363
def test_non_bundle(self):
368
self.assertRaises(NotABundle, BundleReader, StringIO('#!/bin/sh\n'))
364
self.assertRaises(NotABundle, read_bundle, StringIO('#!/bin/sh\n'))
370
366
def get_checkout(self, rev_id, checkout_dir=None):
371
367
"""Get a new tree, with the specified revision in it.
386
382
assert isinstance(s.getvalue(), str), (
387
383
"Bundle isn't a bytestring:\n %s..." % repr(s.getvalue())[:40])
388
install_bundle(tree.branch.repository, BundleReader(s))
384
install_bundle(tree.branch.repository, read_bundle(s))
389
385
for ancestor in ancestors:
390
386
old = self.b1.repository.revision_tree(ancestor)
391
387
new = tree.branch.repository.revision_tree(ancestor)
407
def valid_apply_bundle(self, base_rev_id, reader, checkout_dir=None):
403
def valid_apply_bundle(self, base_rev_id, info, checkout_dir=None):
408
404
"""Get the base revision, apply the changes, and make
409
405
sure everything matches the builtin branch.
411
407
to_tree = self.get_checkout(base_rev_id, checkout_dir=checkout_dir)
412
408
repository = to_tree.branch.repository
413
409
self.assertIs(repository.has_revision(base_rev_id), True)
415
410
for rev in info.real_revisions:
416
411
self.assert_(not repository.has_revision(rev.revision_id),
417
412
'Revision {%s} present before applying bundle'
418
413
% rev.revision_id)
419
merge_bundle(reader, to_tree, True, Merge3Merger, False, False)
414
merge_bundle(info, to_tree, True, Merge3Merger, False, False)
421
416
for rev in info.real_revisions:
422
417
self.assert_(repository.has_revision(rev.revision_id),
454
449
# to_tree.get_file(fileid).read())
456
451
def test_bundle(self):
461
452
self.tree1 = BzrDir.create_standalone_workingtree('b1')
462
453
self.b1 = self.tree1.branch
464
open(pjoin('b1/one'), 'wb').write('one\n')
455
open('b1/one', 'wb').write('one\n')
465
456
self.tree1.add('one')
466
457
self.tree1.commit('add one', rev_id='a@cset-0-1')
468
459
bundle = self.get_valid_bundle(None, 'a@cset-0-1')
469
bundle = self.get_valid_bundle(None, 'a@cset-0-1',
470
message='With a specialized message')
460
# FIXME: The current write_bundle api no longer supports
461
# setting a custom summary message
462
# We should re-introduce the ability, and update
463
# the tests to make sure it works.
464
# bundle = self.get_valid_bundle(None, 'a@cset-0-1',
465
# message='With a specialized message')
472
467
# Make sure we can handle files with spaces, tabs, other
473
468
# bogus characters
664
659
bundle = self.get_valid_bundle('a@lmod-0-2a', 'a@lmod-0-4')
666
661
def test_hide_history(self):
670
662
self.tree1 = BzrDir.create_standalone_workingtree('b1')
671
663
self.b1 = self.tree1.branch
673
open(pjoin('b1/one'), 'wb').write('one\n')
665
open('b1/one', 'wb').write('one\n')
674
666
self.tree1.add('one')
675
667
self.tree1.commit('add file', rev_id='a@cset-0-1')
676
open(pjoin('b1/one'), 'wb').write('two\n')
668
open('b1/one', 'wb').write('two\n')
677
669
self.tree1.commit('modify', rev_id='a@cset-0-2')
678
open(pjoin('b1/one'), 'wb').write('three\n')
670
open('b1/one', 'wb').write('three\n')
679
671
self.tree1.commit('modify', rev_id='a@cset-0-3')
680
672
bundle_file = StringIO()
681
673
rev_ids = write_bundle(self.tree1.branch.repository, 'a@cset-0-3',
683
675
self.assertNotContainsRe(bundle_file.getvalue(), 'two')
684
676
self.assertContainsRe(bundle_file.getvalue(), 'one')
685
677
self.assertContainsRe(bundle_file.getvalue(), 'three')
680
class MungedBundleTester(TestCaseWithTransport):
682
def build_test_bundle(self):
683
wt = self.make_branch_and_tree('b1')
685
self.build_tree(['b1/one'])
687
wt.commit('add one', rev_id='a@cset-0-1')
688
self.build_tree(['b1/two'])
690
wt.commit('add two', rev_id='a@cset-0-2')
692
bundle_txt = StringIO()
693
rev_ids = write_bundle(wt.branch.repository, 'a@cset-0-2',
694
'a@cset-0-1', bundle_txt)
695
self.assertEqual(['a@cset-0-2'], rev_ids)
696
bundle_txt.seek(0, 0)
699
def check_valid(self, bundle):
    """Check that after whatever munging, the final object is valid.

    The parsed bundle is expected to expose exactly one real revision,
    identified as 'a@cset-0-2'.

    :param bundle: The object produced by parsing the bundle text; its
        real_revisions attribute is iterated and each entry's
        revision_id is collected.
    """
    found_ids = []
    for revision in bundle.real_revisions:
        found_ids.append(revision.revision_id)
    self.assertEqual(['a@cset-0-2'], found_ids)
704
def test_extra_whitespace(self):
705
bundle_txt = self.build_test_bundle()
707
# Seek to the end of the file
708
# Adding one extra newline used to give us
709
# TypeError: float() argument must be a string or a number
710
bundle_txt.seek(0, 2)
711
bundle_txt.write('\n')
714
bundle = read_bundle(bundle_txt)
715
self.check_valid(bundle)
717
def test_extra_whitespace_2(self):
718
bundle_txt = self.build_test_bundle()
720
# Seek to the end of the file
721
# Adding two extra newlines used to give us
722
# MalformedPatches: The first line of all patches should be ...
723
bundle_txt.seek(0, 2)
724
bundle_txt.write('\n\n')
727
bundle = read_bundle(bundle_txt)
728
self.check_valid(bundle)
730
def test_missing_trailing_whitespace(self):
    """A bundle whose final newline was stripped must still parse.

    Takes the bundle text from build_test_bundle(), asserts that it
    ends in a blank line, drops the last newline character and checks
    that the truncated text still reads back as a valid bundle.
    """
    bundle_txt = self.build_test_bundle()

    # Remove a trailing newline, it shouldn't kill the parser
    raw = bundle_txt.getvalue()
    # The contents of the bundle don't have to be this, but this
    # test is concerned with the exact case where the serializer
    # creates a blank line at the end and the final newline of that
    # blank line goes missing.
    self.assertEqual('\n\n', raw[-2:])
    # Rebind bundle_txt to the truncated text. Parsing the original,
    # untouched buffer instead would make this test pass vacuously.
    bundle_txt = StringIO(raw[:-1])

    bundle = read_bundle(bundle_txt)
    self.check_valid(bundle)