46
47
from bzrlib.transport import get_transport
47
48
from bzrlib.transport.memory import MemoryTransport
48
49
from bzrlib.tsort import topo_sort
50
from bzrlib.tuned_gzip import GzipFile
49
51
import bzrlib.versionedfile as versionedfile
50
52
from bzrlib.weave import WeaveFile
51
53
from bzrlib.weavefile import read_weave, write_weave
56
def get_diamond_vf(f, trailing_eol=True):
57
"""Get a diamond graph to exercise deltas and merges.
59
:param trailing_eol: If True end the last line with \n.
63
'base': (('origin',),),
65
'right': (('base',),),
66
'merged': (('left',), ('right',)),
68
# insert a diamond graph to exercise deltas and merges.
73
f.add_lines('origin', [], ['origin' + last_char])
74
f.add_lines('base', ['origin'], ['base' + last_char])
75
f.add_lines('left', ['base'], ['base\n', 'left' + last_char])
76
f.add_lines('right', ['base'],
77
['base\n', 'right' + last_char])
78
f.add_lines('merged', ['left', 'right'],
79
['base\n', 'left\n', 'right\n', 'merged' + last_char])
54
83
class VersionedFileTestMixIn(object):
55
84
"""A mixin test class for testing VersionedFiles.
93
122
entries = f.get_record_stream([], 'unordered', False)
94
123
self.assertEqual([], list(entries))
96
def get_diamond_vf(self):
97
"""Get a diamond graph to exercise deltas and merges."""
101
'base': (('origin',),),
102
'left': (('base',),),
103
'right': (('base',),),
104
'merged': (('left',), ('right',)),
106
# insert a diamond graph to exercise deltas and merges.
107
f.add_lines('origin', [], [])
108
f.add_lines('base', ['origin'], ['base\n'])
109
f.add_lines('left', ['base'], ['base\n', 'left\n'])
110
f.add_lines('right', ['base'],
111
['base\n', 'right\n'])
112
f.add_lines('merged', ['left', 'right'],
113
['base\n', 'left\n', 'right\n', 'merged\n'])
116
125
def assertValidStorageKind(self, storage_kind):
117
126
"""Assert that storage_kind is a valid storage_kind."""
118
127
self.assertSubset([storage_kind],
1352
1361
overlappedInsertExpected = ['aaa', '<<<<<<< ', 'xxx', 'yyy', '=======',
1353
1362
'xxx', '>>>>>>> ', 'bbb']
1365
class TestContentFactoryAdaption(TestCaseWithMemoryTransport):
1367
def test_select_adaptor(self):
1368
"""Test that selecting an adaptor works."""
1369
# self.assertEqual(versionedfile.
1372
return make_file_knit('knit', self.get_transport('.'), delta=True,
1375
def helpGetBytes(self, f, ft_adapter, delta_adapter):
1376
"""grab the interested adapted texts for tests."""
1377
# origin is a fulltext
1378
entries = f.get_record_stream(['origin'], 'unordered', False)
1379
base = entries.next()
1380
ft_data = ft_adapter.get_bytes(base, base.get_bytes_as(base.storage_kind))
1381
# merged is both a delta and multiple parents.
1382
entries = f.get_record_stream(['merged'], 'unordered', False)
1383
merged = entries.next()
1384
delta_data = delta_adapter.get_bytes(merged,
1385
merged.get_bytes_as(merged.storage_kind))
1386
return ft_data, delta_data
1388
def test_deannotation_noeol(self):
1389
"""Test converting annotated knits to unannotated knits."""
1390
# we need a full text, and a delta
1391
f, parents = get_diamond_vf(self.get_knit(), trailing_eol=False)
1392
ft_data, delta_data = self.helpGetBytes(f,
1393
_mod_knit.FTAnnotatedToUnannotated(),
1394
_mod_knit.DeltaAnnotatedToUnannotated())
1396
'version origin 1 b284f94827db1fa2970d9e2014f080413b547a7e\n'
1399
GzipFile(mode='rb', fileobj=StringIO(ft_data)).read())
1401
'version merged 4 32c2e79763b3f90e8ccde37f9710b6629c25a796\n'
1402
'1,2,3\nleft\nright\nmerged\nend merged\n',
1403
GzipFile(mode='rb', fileobj=StringIO(delta_data)).read())
1405
def test_deannotation(self):
1406
"""Test converting annotated knits to unannotated knits."""
1407
# we need a full text, and a delta
1408
f, parents = get_diamond_vf(self.get_knit())
1409
ft_data, delta_data = self.helpGetBytes(f,
1410
_mod_knit.FTAnnotatedToUnannotated(),
1411
_mod_knit.DeltaAnnotatedToUnannotated())
1413
'version origin 1 00e364d235126be43292ab09cb4686cf703ddc17\n'
1416
GzipFile(mode='rb', fileobj=StringIO(ft_data)).read())
1418
'version merged 3 ed8bce375198ea62444dc71952b22cfc2b09226d\n'
1419
'2,2,2\nright\nmerged\nend merged\n',
1420
GzipFile(mode='rb', fileobj=StringIO(delta_data)).read())
1422
def test_annotated_to_fulltext_no_eol(self):
1423
"""Test adapting annotated knits to full texts (for -> weaves)."""
1424
# we need a full text, and a delta
1425
f, parents = get_diamond_vf(self.get_knit(), trailing_eol=False)
1426
# Reconstructing a full text requires a backing versioned file, and it
1427
# must have the base lines requested from it.
1428
logged_vf = versionedfile.RecordingVersionedFileDecorator(f)
1429
ft_data, delta_data = self.helpGetBytes(f,
1430
_mod_knit.FTAnnotatedToFullText(),
1431
_mod_knit.DeltaAnnotatedToFullText(logged_vf))
1432
self.assertEqual('origin', ft_data)
1433
self.assertEqual('base\nleft\nright\nmerged', delta_data)
1434
self.assertEqual([('get_lines', 'left')], logged_vf.calls)
1436
def test_annotated_to_fulltext(self):
1437
"""Test adapting annotated knits to full texts (for -> weaves)."""
1438
# we need a full text, and a delta
1439
f, parents = get_diamond_vf(self.get_knit())
1440
# Reconstructing a full text requires a backing versioned file, and it
1441
# must have the base lines requested from it.
1442
logged_vf = versionedfile.RecordingVersionedFileDecorator(f)
1443
ft_data, delta_data = self.helpGetBytes(f,
1444
_mod_knit.FTAnnotatedToFullText(),
1445
_mod_knit.DeltaAnnotatedToFullText(logged_vf))
1446
self.assertEqual('origin\n', ft_data)
1447
self.assertEqual('base\nleft\nright\nmerged\n', delta_data)
1448
self.assertEqual([('get_lines', 'left')], logged_vf.calls)