17
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
30
from bzrlib.errors import (
31
RevisionAlreadyPresent,
36
from bzrlib.index import *
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent
37
24
from bzrlib.knit import (
42
27
KnitAnnotateFactory,
48
29
from bzrlib.osutils import split_lines
49
from bzrlib.tests import TestCase, TestCaseWithTransport, Feature
30
from bzrlib.tests import TestCaseWithTransport
50
31
from bzrlib.transport import TransportLogger, get_transport
51
32
from bzrlib.transport.memory import MemoryTransport
52
33
from bzrlib.weave import Weave
55
class _CompiledKnitFeature(Feature):
59
import bzrlib._knit_load_data_c
64
def feature_name(self):
65
return 'bzrlib._knit_load_data_c'
67
CompiledKnitFeature = _CompiledKnitFeature()
70
class KnitContentTests(TestCase):
72
def test_constructor(self):
73
content = KnitContent([])
76
content = KnitContent([])
77
self.assertEqual(content.text(), [])
79
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
80
self.assertEqual(content.text(), ["text1", "text2"])
82
def test_annotate(self):
83
content = KnitContent([])
84
self.assertEqual(content.annotate(), [])
86
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
87
self.assertEqual(content.annotate(),
88
[("origin1", "text1"), ("origin2", "text2")])
90
def test_annotate_iter(self):
91
content = KnitContent([])
92
it = content.annotate_iter()
93
self.assertRaises(StopIteration, it.next)
95
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
96
it = content.annotate_iter()
97
self.assertEqual(it.next(), ("origin1", "text1"))
98
self.assertEqual(it.next(), ("origin2", "text2"))
99
self.assertRaises(StopIteration, it.next)
102
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
103
copy = content.copy()
104
self.assertIsInstance(copy, KnitContent)
105
self.assertEqual(copy.annotate(),
106
[("origin1", "text1"), ("origin2", "text2")])
108
def test_line_delta(self):
109
content1 = KnitContent([("", "a"), ("", "b")])
110
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
111
self.assertEqual(content1.line_delta(content2),
112
[(1, 2, 2, [("", "a"), ("", "c")])])
114
def test_line_delta_iter(self):
115
content1 = KnitContent([("", "a"), ("", "b")])
116
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
117
it = content1.line_delta_iter(content2)
118
self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
119
self.assertRaises(StopIteration, it.next)
122
class MockTransport(object):
124
def __init__(self, file_lines=None):
125
self.file_lines = file_lines
127
# We have no base directory for the MockTransport
130
def get(self, filename):
131
if self.file_lines is None:
132
raise NoSuchFile(filename)
134
return StringIO("\n".join(self.file_lines))
136
def readv(self, relpath, offsets):
137
fp = self.get(relpath)
138
for offset, size in offsets:
140
yield offset, fp.read(size)
142
def __getattr__(self, name):
143
def queue_call(*args, **kwargs):
144
self.calls.append((name, args, kwargs))
148
class LowLevelKnitDataTests(TestCase):
150
def create_gz_content(self, text):
152
gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
155
return sio.getvalue()
157
def test_valid_knit_data(self):
158
sha1sum = sha.new('foo\nbar\n').hexdigest()
159
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
164
transport = MockTransport([gz_txt])
165
data = _KnitData(transport, 'filename', mode='r')
166
records = [('rev-id-1', 0, len(gz_txt))]
168
contents = data.read_records(records)
169
self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
171
raw_contents = list(data.read_records_iter_raw(records))
172
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
174
def test_not_enough_lines(self):
175
sha1sum = sha.new('foo\n').hexdigest()
176
# record says 2 lines data says 1
177
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
181
transport = MockTransport([gz_txt])
182
data = _KnitData(transport, 'filename', mode='r')
183
records = [('rev-id-1', 0, len(gz_txt))]
184
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
186
# read_records_iter_raw won't detect that sort of mismatch/corruption
187
raw_contents = list(data.read_records_iter_raw(records))
188
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
190
def test_too_many_lines(self):
191
sha1sum = sha.new('foo\nbar\n').hexdigest()
192
# record says 1 lines data says 2
193
gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
198
transport = MockTransport([gz_txt])
199
data = _KnitData(transport, 'filename', mode='r')
200
records = [('rev-id-1', 0, len(gz_txt))]
201
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
203
# read_records_iter_raw won't detect that sort of mismatch/corruption
204
raw_contents = list(data.read_records_iter_raw(records))
205
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
207
def test_mismatched_version_id(self):
208
sha1sum = sha.new('foo\nbar\n').hexdigest()
209
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
214
transport = MockTransport([gz_txt])
215
data = _KnitData(transport, 'filename', mode='r')
216
# We are asking for rev-id-2, but the data is rev-id-1
217
records = [('rev-id-2', 0, len(gz_txt))]
218
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
220
# read_records_iter_raw will notice if we request the wrong version.
221
self.assertRaises(errors.KnitCorrupt, list,
222
data.read_records_iter_raw(records))
224
def test_uncompressed_data(self):
225
sha1sum = sha.new('foo\nbar\n').hexdigest()
226
txt = ('version rev-id-1 2 %s\n'
231
transport = MockTransport([txt])
232
data = _KnitData(transport, 'filename', mode='r')
233
records = [('rev-id-1', 0, len(txt))]
235
# We don't have valid gzip data ==> corrupt
236
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
238
# read_records_iter_raw will notice the bad data
239
self.assertRaises(errors.KnitCorrupt, list,
240
data.read_records_iter_raw(records))
242
def test_corrupted_data(self):
243
sha1sum = sha.new('foo\nbar\n').hexdigest()
244
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
249
# Change 2 bytes in the middle to \xff
250
gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
251
transport = MockTransport([gz_txt])
252
data = _KnitData(transport, 'filename', mode='r')
253
records = [('rev-id-1', 0, len(gz_txt))]
255
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
257
# read_records_iter_raw will notice if we request the wrong version.
258
self.assertRaises(errors.KnitCorrupt, list,
259
data.read_records_iter_raw(records))
262
class LowLevelKnitIndexTests(TestCase):
264
def get_knit_index(self, *args, **kwargs):
265
orig = knit._load_data
267
knit._load_data = orig
268
self.addCleanup(reset)
269
from bzrlib._knit_load_data_py import _load_data_py
270
knit._load_data = _load_data_py
271
return _KnitIndex(*args, **kwargs)
273
def test_no_such_file(self):
274
transport = MockTransport()
276
self.assertRaises(NoSuchFile, self.get_knit_index,
277
transport, "filename", "r")
278
self.assertRaises(NoSuchFile, self.get_knit_index,
279
transport, "filename", "w", create=False)
281
def test_create_file(self):
282
transport = MockTransport()
284
index = self.get_knit_index(transport, "filename", "w",
285
file_mode="wb", create=True)
287
("put_bytes_non_atomic",
288
("filename", index.HEADER), {"mode": "wb"}),
289
transport.calls.pop(0))
291
def test_delay_create_file(self):
292
transport = MockTransport()
294
index = self.get_knit_index(transport, "filename", "w",
295
create=True, file_mode="wb", create_parent_dir=True,
296
delay_create=True, dir_mode=0777)
297
self.assertEqual([], transport.calls)
299
index.add_versions([])
300
name, (filename, f), kwargs = transport.calls.pop(0)
301
self.assertEqual("put_file_non_atomic", name)
303
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
305
self.assertEqual("filename", filename)
306
self.assertEqual(index.HEADER, f.read())
308
index.add_versions([])
309
self.assertEqual(("append_bytes", ("filename", ""), {}),
310
transport.calls.pop(0))
312
def test_read_utf8_version_id(self):
313
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
314
utf8_revision_id = unicode_revision_id.encode('utf-8')
315
transport = MockTransport([
317
'%s option 0 1 :' % (utf8_revision_id,)
319
index = self.get_knit_index(transport, "filename", "r")
320
# _KnitIndex is a private class, and deals in utf8 revision_ids, not
321
# Unicode revision_ids.
322
self.assertTrue(index.has_version(utf8_revision_id))
323
self.assertFalse(index.has_version(unicode_revision_id))
325
def test_read_utf8_parents(self):
326
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
327
utf8_revision_id = unicode_revision_id.encode('utf-8')
328
transport = MockTransport([
330
"version option 0 1 .%s :" % (utf8_revision_id,)
332
index = self.get_knit_index(transport, "filename", "r")
333
self.assertEqual([utf8_revision_id],
334
index.get_parents_with_ghosts("version"))
336
def test_read_ignore_corrupted_lines(self):
337
transport = MockTransport([
340
"corrupted options 0 1 .b .c ",
341
"version options 0 1 :"
343
index = self.get_knit_index(transport, "filename", "r")
344
self.assertEqual(1, index.num_versions())
345
self.assertTrue(index.has_version("version"))
347
def test_read_corrupted_header(self):
348
transport = MockTransport(['not a bzr knit index header\n'])
349
self.assertRaises(KnitHeaderError,
350
self.get_knit_index, transport, "filename", "r")
352
def test_read_duplicate_entries(self):
353
transport = MockTransport([
355
"parent options 0 1 :",
356
"version options1 0 1 0 :",
357
"version options2 1 2 .other :",
358
"version options3 3 4 0 .other :"
360
index = self.get_knit_index(transport, "filename", "r")
361
self.assertEqual(2, index.num_versions())
362
# check that the index used is the first one written. (Specific
363
# to KnitIndex style indices.
364
self.assertEqual("1", index._version_list_to_index(["version"]))
365
self.assertEqual((3, 4), index.get_position("version"))
366
self.assertEqual(["options3"], index.get_options("version"))
367
self.assertEqual(["parent", "other"],
368
index.get_parents_with_ghosts("version"))
370
def test_read_compressed_parents(self):
371
transport = MockTransport([
375
"c option 0 1 1 0 :",
377
index = self.get_knit_index(transport, "filename", "r")
378
self.assertEqual(["a"], index.get_parents("b"))
379
self.assertEqual(["b", "a"], index.get_parents("c"))
381
def test_write_utf8_version_id(self):
382
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
383
utf8_revision_id = unicode_revision_id.encode('utf-8')
384
transport = MockTransport([
387
index = self.get_knit_index(transport, "filename", "r")
388
index.add_version(utf8_revision_id, ["option"], 0, 1, [])
389
self.assertEqual(("append_bytes", ("filename",
390
"\n%s option 0 1 :" % (utf8_revision_id,)),
392
transport.calls.pop(0))
394
def test_write_utf8_parents(self):
395
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
396
utf8_revision_id = unicode_revision_id.encode('utf-8')
397
transport = MockTransport([
400
index = self.get_knit_index(transport, "filename", "r")
401
index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
402
self.assertEqual(("append_bytes", ("filename",
403
"\nversion option 0 1 .%s :" % (utf8_revision_id,)),
405
transport.calls.pop(0))
407
def test_get_graph(self):
408
transport = MockTransport()
409
index = self.get_knit_index(transport, "filename", "w", create=True)
410
self.assertEqual([], index.get_graph())
412
index.add_version("a", ["option"], 0, 1, ["b"])
413
self.assertEqual([("a", ["b"])], index.get_graph())
415
index.add_version("c", ["option"], 0, 1, ["d"])
416
self.assertEqual([("a", ["b"]), ("c", ["d"])],
417
sorted(index.get_graph()))
419
def test_get_ancestry(self):
420
transport = MockTransport([
423
"b option 0 1 0 .e :",
424
"c option 0 1 1 0 :",
425
"d option 0 1 2 .f :"
427
index = self.get_knit_index(transport, "filename", "r")
429
self.assertEqual([], index.get_ancestry([]))
430
self.assertEqual(["a"], index.get_ancestry(["a"]))
431
self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
432
self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
433
self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
434
self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
435
self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
437
self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
439
def test_get_ancestry_with_ghosts(self):
440
transport = MockTransport([
443
"b option 0 1 0 .e :",
444
"c option 0 1 0 .f .g :",
445
"d option 0 1 2 .h .j .k :"
447
index = self.get_knit_index(transport, "filename", "r")
449
self.assertEqual([], index.get_ancestry_with_ghosts([]))
450
self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
451
self.assertEqual(["a", "e", "b"],
452
index.get_ancestry_with_ghosts(["b"]))
453
self.assertEqual(["a", "g", "f", "c"],
454
index.get_ancestry_with_ghosts(["c"]))
455
self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
456
index.get_ancestry_with_ghosts(["d"]))
457
self.assertEqual(["a", "e", "b"],
458
index.get_ancestry_with_ghosts(["a", "b"]))
459
self.assertEqual(["a", "g", "f", "c"],
460
index.get_ancestry_with_ghosts(["a", "c"]))
462
["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
463
index.get_ancestry_with_ghosts(["b", "d"]))
465
self.assertRaises(RevisionNotPresent,
466
index.get_ancestry_with_ghosts, ["e"])
468
def test_iter_parents(self):
469
transport = MockTransport()
470
index = self.get_knit_index(transport, "filename", "w", create=True)
472
index.add_version('r0', ['option'], 0, 1, [])
474
index.add_version('r1', ['option'], 0, 1, ['r0'])
476
index.add_version('r2', ['option'], 0, 1, ['r1', 'r0'])
478
# cases: each sample data individually:
479
self.assertEqual(set([('r0', ())]),
480
set(index.iter_parents(['r0'])))
481
self.assertEqual(set([('r1', ('r0', ))]),
482
set(index.iter_parents(['r1'])))
483
self.assertEqual(set([('r2', ('r1', 'r0'))]),
484
set(index.iter_parents(['r2'])))
485
# no nodes returned for a missing node
486
self.assertEqual(set(),
487
set(index.iter_parents(['missing'])))
488
# 1 node returned with missing nodes skipped
489
self.assertEqual(set([('r1', ('r0', ))]),
490
set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
492
self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
493
set(index.iter_parents(['r0', 'r1'])))
494
# 2 nodes returned, missing skipped
495
self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
496
set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
498
def test_num_versions(self):
499
transport = MockTransport([
502
index = self.get_knit_index(transport, "filename", "r")
504
self.assertEqual(0, index.num_versions())
505
self.assertEqual(0, len(index))
507
index.add_version("a", ["option"], 0, 1, [])
508
self.assertEqual(1, index.num_versions())
509
self.assertEqual(1, len(index))
511
index.add_version("a", ["option2"], 1, 2, [])
512
self.assertEqual(1, index.num_versions())
513
self.assertEqual(1, len(index))
515
index.add_version("b", ["option"], 0, 1, [])
516
self.assertEqual(2, index.num_versions())
517
self.assertEqual(2, len(index))
519
def test_get_versions(self):
520
transport = MockTransport([
523
index = self.get_knit_index(transport, "filename", "r")
525
self.assertEqual([], index.get_versions())
527
index.add_version("a", ["option"], 0, 1, [])
528
self.assertEqual(["a"], index.get_versions())
530
index.add_version("a", ["option"], 0, 1, [])
531
self.assertEqual(["a"], index.get_versions())
533
index.add_version("b", ["option"], 0, 1, [])
534
self.assertEqual(["a", "b"], index.get_versions())
536
def test_add_version(self):
537
transport = MockTransport([
540
index = self.get_knit_index(transport, "filename", "r")
542
index.add_version("a", ["option"], 0, 1, ["b"])
543
self.assertEqual(("append_bytes",
544
("filename", "\na option 0 1 .b :"),
545
{}), transport.calls.pop(0))
546
self.assertTrue(index.has_version("a"))
547
self.assertEqual(1, index.num_versions())
548
self.assertEqual((0, 1), index.get_position("a"))
549
self.assertEqual(["option"], index.get_options("a"))
550
self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
552
index.add_version("a", ["opt"], 1, 2, ["c"])
553
self.assertEqual(("append_bytes",
554
("filename", "\na opt 1 2 .c :"),
555
{}), transport.calls.pop(0))
556
self.assertTrue(index.has_version("a"))
557
self.assertEqual(1, index.num_versions())
558
self.assertEqual((1, 2), index.get_position("a"))
559
self.assertEqual(["opt"], index.get_options("a"))
560
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
562
index.add_version("b", ["option"], 2, 3, ["a"])
563
self.assertEqual(("append_bytes",
564
("filename", "\nb option 2 3 0 :"),
565
{}), transport.calls.pop(0))
566
self.assertTrue(index.has_version("b"))
567
self.assertEqual(2, index.num_versions())
568
self.assertEqual((2, 3), index.get_position("b"))
569
self.assertEqual(["option"], index.get_options("b"))
570
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
572
def test_add_versions(self):
573
transport = MockTransport([
576
index = self.get_knit_index(transport, "filename", "r")
579
("a", ["option"], 0, 1, ["b"]),
580
("a", ["opt"], 1, 2, ["c"]),
581
("b", ["option"], 2, 3, ["a"])
583
self.assertEqual(("append_bytes", ("filename",
584
"\na option 0 1 .b :"
587
), {}), transport.calls.pop(0))
588
self.assertTrue(index.has_version("a"))
589
self.assertTrue(index.has_version("b"))
590
self.assertEqual(2, index.num_versions())
591
self.assertEqual((1, 2), index.get_position("a"))
592
self.assertEqual((2, 3), index.get_position("b"))
593
self.assertEqual(["opt"], index.get_options("a"))
594
self.assertEqual(["option"], index.get_options("b"))
595
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
596
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
598
def test_delay_create_and_add_versions(self):
599
transport = MockTransport()
601
index = self.get_knit_index(transport, "filename", "w",
602
create=True, file_mode="wb", create_parent_dir=True,
603
delay_create=True, dir_mode=0777)
604
self.assertEqual([], transport.calls)
607
("a", ["option"], 0, 1, ["b"]),
608
("a", ["opt"], 1, 2, ["c"]),
609
("b", ["option"], 2, 3, ["a"])
611
name, (filename, f), kwargs = transport.calls.pop(0)
612
self.assertEqual("put_file_non_atomic", name)
614
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
616
self.assertEqual("filename", filename)
619
"\na option 0 1 .b :"
621
"\nb option 2 3 0 :",
624
def test_has_version(self):
625
transport = MockTransport([
629
index = self.get_knit_index(transport, "filename", "r")
631
self.assertTrue(index.has_version("a"))
632
self.assertFalse(index.has_version("b"))
634
def test_get_position(self):
635
transport = MockTransport([
640
index = self.get_knit_index(transport, "filename", "r")
642
self.assertEqual((0, 1), index.get_position("a"))
643
self.assertEqual((1, 2), index.get_position("b"))
645
def test_get_method(self):
646
transport = MockTransport([
648
"a fulltext,unknown 0 1 :",
649
"b unknown,line-delta 1 2 :",
652
index = self.get_knit_index(transport, "filename", "r")
654
self.assertEqual("fulltext", index.get_method("a"))
655
self.assertEqual("line-delta", index.get_method("b"))
656
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
658
def test_get_options(self):
659
transport = MockTransport([
664
index = self.get_knit_index(transport, "filename", "r")
666
self.assertEqual(["opt1"], index.get_options("a"))
667
self.assertEqual(["opt2", "opt3"], index.get_options("b"))
669
def test_get_parents(self):
670
transport = MockTransport([
673
"b option 1 2 0 .c :",
674
"c option 1 2 1 0 .e :"
676
index = self.get_knit_index(transport, "filename", "r")
678
self.assertEqual([], index.get_parents("a"))
679
self.assertEqual(["a", "c"], index.get_parents("b"))
680
self.assertEqual(["b", "a"], index.get_parents("c"))
682
def test_get_parents_with_ghosts(self):
683
transport = MockTransport([
686
"b option 1 2 0 .c :",
687
"c option 1 2 1 0 .e :"
689
index = self.get_knit_index(transport, "filename", "r")
691
self.assertEqual([], index.get_parents_with_ghosts("a"))
692
self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
693
self.assertEqual(["b", "a", "e"],
694
index.get_parents_with_ghosts("c"))
696
def test_check_versions_present(self):
697
transport = MockTransport([
702
index = self.get_knit_index(transport, "filename", "r")
704
check = index.check_versions_present
710
self.assertRaises(RevisionNotPresent, check, ["c"])
711
self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
713
def test_impossible_parent(self):
714
"""Test we get KnitCorrupt if the parent couldn't possibly exist."""
715
transport = MockTransport([
718
"b option 0 1 4 :" # We don't have a 4th record
721
self.assertRaises(errors.KnitCorrupt,
722
self.get_knit_index, transport, 'filename', 'r')
724
if (str(e) == ('exceptions must be strings, classes, or instances,'
725
' not exceptions.IndexError')
726
and sys.version_info[0:2] >= (2,5)):
727
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
728
' raising new style exceptions with python'
733
def test_corrupted_parent(self):
734
transport = MockTransport([
738
"c option 0 1 1v :", # Can't have a parent of '1v'
741
self.assertRaises(errors.KnitCorrupt,
742
self.get_knit_index, transport, 'filename', 'r')
744
if (str(e) == ('exceptions must be strings, classes, or instances,'
745
' not exceptions.ValueError')
746
and sys.version_info[0:2] >= (2,5)):
747
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
748
' raising new style exceptions with python'
753
def test_corrupted_parent_in_list(self):
754
transport = MockTransport([
758
"c option 0 1 1 v :", # Can't have a parent of 'v'
761
self.assertRaises(errors.KnitCorrupt,
762
self.get_knit_index, transport, 'filename', 'r')
764
if (str(e) == ('exceptions must be strings, classes, or instances,'
765
' not exceptions.ValueError')
766
and sys.version_info[0:2] >= (2,5)):
767
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
768
' raising new style exceptions with python'
773
def test_invalid_position(self):
774
transport = MockTransport([
779
self.assertRaises(errors.KnitCorrupt,
780
self.get_knit_index, transport, 'filename', 'r')
782
if (str(e) == ('exceptions must be strings, classes, or instances,'
783
' not exceptions.ValueError')
784
and sys.version_info[0:2] >= (2,5)):
785
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
786
' raising new style exceptions with python'
791
def test_invalid_size(self):
792
transport = MockTransport([
797
self.assertRaises(errors.KnitCorrupt,
798
self.get_knit_index, transport, 'filename', 'r')
800
if (str(e) == ('exceptions must be strings, classes, or instances,'
801
' not exceptions.ValueError')
802
and sys.version_info[0:2] >= (2,5)):
803
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
804
' raising new style exceptions with python'
809
def test_short_line(self):
810
transport = MockTransport([
813
"b option 10 10 0", # This line isn't terminated, ignored
815
index = self.get_knit_index(transport, "filename", "r")
816
self.assertEqual(['a'], index.get_versions())
818
def test_skip_incomplete_record(self):
819
# A line with bogus data should just be skipped
820
transport = MockTransport([
823
"b option 10 10 0", # This line isn't terminated, ignored
824
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
826
index = self.get_knit_index(transport, "filename", "r")
827
self.assertEqual(['a', 'c'], index.get_versions())
829
def test_trailing_characters(self):
830
# A line with bogus data should just be skipped
831
transport = MockTransport([
834
"b option 10 10 0 :a", # This line has extra trailing characters
835
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
837
index = self.get_knit_index(transport, "filename", "r")
838
self.assertEqual(['a', 'c'], index.get_versions())
841
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
843
_test_needs_features = [CompiledKnitFeature]
845
def get_knit_index(self, *args, **kwargs):
846
orig = knit._load_data
848
knit._load_data = orig
849
self.addCleanup(reset)
850
from bzrlib._knit_load_data_c import _load_data_c
851
knit._load_data = _load_data_c
852
return _KnitIndex(*args, **kwargs)
856
36
class KnitTests(TestCaseWithTransport):
857
37
"""Class containing knit test helper routines."""
859
def make_test_knit(self, annotate=False, delay_create=False, index=None):
39
def make_test_knit(self, annotate=False):
861
41
factory = KnitPlainFactory()
864
return KnitVersionedFile('test', get_transport('.'), access_mode='w',
865
factory=factory, create=True,
866
delay_create=delay_create, index=index)
44
return KnitVersionedFile('test', get_transport('.'), access_mode='w', factory=factory, create=True)
869
47
class BasicKnitTests(KnitTests):
1519
542
text = k.get_text('text-1')
1520
543
self.assertEqual(TEXT_1, text)
1521
544
self.assertEqual({}, k._data._cache)
1524
class TestKnitIndex(KnitTests):
1526
def test_add_versions_dictionary_compresses(self):
1527
"""Adding versions to the index should update the lookup dict"""
1528
knit = self.make_test_knit()
1530
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1531
self.check_file_contents('test.kndx',
1532
'# bzr knit index 8\n'
1534
'a-1 fulltext 0 0 :'
1536
idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
1537
('a-3', ['fulltext'], 0, 0, ['a-2']),
1539
self.check_file_contents('test.kndx',
1540
'# bzr knit index 8\n'
1542
'a-1 fulltext 0 0 :\n'
1543
'a-2 fulltext 0 0 0 :\n'
1544
'a-3 fulltext 0 0 1 :'
1546
self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1547
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1548
'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1549
'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1552
def test_add_versions_fails_clean(self):
1553
"""If add_versions fails in the middle, it restores a pristine state.
1555
Any modifications that are made to the index are reset if all versions
1558
# This cheats a little bit by passing in a generator which will
1559
# raise an exception before the processing finishes
1560
# Other possibilities would be to have an version with the wrong number
1561
# of entries, or to make the backing transport unable to write any
1564
knit = self.make_test_knit()
1566
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1568
class StopEarly(Exception):
1571
def generate_failure():
1572
"""Add some entries and then raise an exception"""
1573
yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
1574
yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
1577
# Assert the pre-condition
1578
self.assertEqual(['a-1'], idx._history)
1579
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1581
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1583
# And it shouldn't be modified
1584
self.assertEqual(['a-1'], idx._history)
1585
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1587
def test_knit_index_ignores_empty_files(self):
1588
# There was a race condition in older bzr, where a ^C at the right time
1589
# could leave an empty .kndx file, which bzr would later claim was a
1590
# corrupted file since the header was not present. In reality, the file
1591
# just wasn't created, so it should be ignored.
1592
t = get_transport('.')
1593
t.put_bytes('test.kndx', '')
1595
knit = self.make_test_knit()
1597
def test_knit_index_checks_header(self):
1598
t = get_transport('.')
1599
t.put_bytes('test.kndx', '# not really a knit header\n\n')
1601
self.assertRaises(KnitHeaderError, self.make_test_knit)
1604
class TestGraphIndexKnit(KnitTests):
1605
"""Tests for knits using a GraphIndex rather than a KnitIndex."""
1607
def make_g_index(self, name, ref_lists=0, nodes=[]):
1608
builder = GraphIndexBuilder(ref_lists)
1609
for node, references, value in nodes:
1610
builder.add_node(node, references, value)
1611
stream = builder.finish()
1612
trans = self.get_transport()
1613
trans.put_file(name, stream)
1614
return GraphIndex(trans, name)
1616
def two_graph_index(self, deltas=False, catch_adds=False):
1617
"""Build a two-graph index.
1619
:param deltas: If true, use underlying indices with two node-ref
1620
lists and 'parent' set to a delta-compressed against tail.
1622
# build a complex graph across several indices.
1624
# delta compression inn the index
1625
index1 = self.make_g_index('1', 2, [
1626
(('tip', ), 'N0 100', ([('parent', )], [], )),
1627
(('tail', ), '', ([], []))])
1628
index2 = self.make_g_index('2', 2, [
1629
(('parent', ), ' 100 78', ([('tail', ), ('ghost', )], [('tail', )])),
1630
(('separate', ), '', ([], []))])
1632
# just blob location and graph in the index.
1633
index1 = self.make_g_index('1', 1, [
1634
(('tip', ), 'N0 100', ([('parent', )], )),
1635
(('tail', ), '', ([], ))])
1636
index2 = self.make_g_index('2', 1, [
1637
(('parent', ), ' 100 78', ([('tail', ), ('ghost', )], )),
1638
(('separate', ), '', ([], ))])
1639
combined_index = CombinedGraphIndex([index1, index2])
1641
self.combined_index = combined_index
1642
self.caught_entries = []
1643
add_callback = self.catch_add
1646
return KnitGraphIndex(combined_index, deltas=deltas,
1647
add_callback=add_callback)
1649
def test_get_graph(self):
1650
index = self.two_graph_index()
1651
self.assertEqual(set([
1652
('tip', ('parent', )),
1654
('parent', ('tail', 'ghost')),
1656
]), set(index.get_graph()))
1658
def test_get_ancestry(self):
1659
# get_ancestry is defined as eliding ghosts, not erroring.
1660
index = self.two_graph_index()
1661
self.assertEqual([], index.get_ancestry([]))
1662
self.assertEqual(['separate'], index.get_ancestry(['separate']))
1663
self.assertEqual(['tail'], index.get_ancestry(['tail']))
1664
self.assertEqual(['tail', 'parent'], index.get_ancestry(['parent']))
1665
self.assertEqual(['tail', 'parent', 'tip'], index.get_ancestry(['tip']))
1666
self.assertTrue(index.get_ancestry(['tip', 'separate']) in
1667
(['tail', 'parent', 'tip', 'separate'],
1668
['separate', 'tail', 'parent', 'tip'],
1670
# and without topo_sort
1671
self.assertEqual(set(['separate']),
1672
set(index.get_ancestry(['separate'], topo_sorted=False)))
1673
self.assertEqual(set(['tail']),
1674
set(index.get_ancestry(['tail'], topo_sorted=False)))
1675
self.assertEqual(set(['tail', 'parent']),
1676
set(index.get_ancestry(['parent'], topo_sorted=False)))
1677
self.assertEqual(set(['tail', 'parent', 'tip']),
1678
set(index.get_ancestry(['tip'], topo_sorted=False)))
1679
self.assertEqual(set(['separate', 'tail', 'parent', 'tip']),
1680
set(index.get_ancestry(['tip', 'separate'])))
1681
# asking for a ghost makes it go boom.
1682
self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
1684
def test_get_ancestry_with_ghosts(self):
1685
index = self.two_graph_index()
1686
self.assertEqual([], index.get_ancestry_with_ghosts([]))
1687
self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
1688
self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
1689
self.assertTrue(index.get_ancestry_with_ghosts(['parent']) in
1690
(['tail', 'ghost', 'parent'],
1691
['ghost', 'tail', 'parent'],
1693
self.assertTrue(index.get_ancestry_with_ghosts(['tip']) in
1694
(['tail', 'ghost', 'parent', 'tip'],
1695
['ghost', 'tail', 'parent', 'tip'],
1697
self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
1698
(['tail', 'ghost', 'parent', 'tip', 'separate'],
1699
['ghost', 'tail', 'parent', 'tip', 'separate'],
1700
['separate', 'tail', 'ghost', 'parent', 'tip'],
1701
['separate', 'ghost', 'tail', 'parent', 'tip'],
1703
# asking for a ghost makes it go boom.
1704
self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])
1706
def test_num_versions(self):
1707
index = self.two_graph_index()
1708
self.assertEqual(4, index.num_versions())
1710
def test_get_versions(self):
1711
index = self.two_graph_index()
1712
self.assertEqual(set(['tail', 'tip', 'parent', 'separate']),
1713
set(index.get_versions()))
1715
def test_has_version(self):
1716
index = self.two_graph_index()
1717
self.assertTrue(index.has_version('tail'))
1718
self.assertFalse(index.has_version('ghost'))
1720
def test_get_position(self):
1721
index = self.two_graph_index()
1722
self.assertEqual((0, 100), index.get_position('tip'))
1723
self.assertEqual((100, 78), index.get_position('parent'))
1725
def test_get_method_deltas(self):
1726
index = self.two_graph_index(deltas=True)
1727
self.assertEqual('fulltext', index.get_method('tip'))
1728
self.assertEqual('line-delta', index.get_method('parent'))
1730
def test_get_method_no_deltas(self):
1731
# check that the parent-history lookup is ignored with deltas=False.
1732
index = self.two_graph_index(deltas=False)
1733
self.assertEqual('fulltext', index.get_method('tip'))
1734
self.assertEqual('fulltext', index.get_method('parent'))
1736
def test_get_options_deltas(self):
1737
index = self.two_graph_index(deltas=True)
1738
self.assertEqual('fulltext,no-eol', index.get_options('tip'))
1739
self.assertEqual('line-delta', index.get_options('parent'))
1741
def test_get_options_no_deltas(self):
1742
# check that the parent-history lookup is ignored with deltas=False.
1743
index = self.two_graph_index(deltas=False)
1744
self.assertEqual('fulltext,no-eol', index.get_options('tip'))
1745
self.assertEqual('fulltext', index.get_options('parent'))
1747
def test_get_parents(self):
1748
# get_parents ignores ghosts
1749
index = self.two_graph_index()
1750
self.assertEqual(('tail', ), index.get_parents('parent'))
1751
# and errors on ghosts.
1752
self.assertRaises(errors.RevisionNotPresent,
1753
index.get_parents, 'ghost')
1755
def test_get_parents_with_ghosts(self):
1756
index = self.two_graph_index()
1757
self.assertEqual(('tail', 'ghost'), index.get_parents_with_ghosts('parent'))
1758
# and errors on ghosts.
1759
self.assertRaises(errors.RevisionNotPresent,
1760
index.get_parents_with_ghosts, 'ghost')
1762
def test_check_versions_present(self):
1763
# ghosts should not be considered present
1764
index = self.two_graph_index()
1765
self.assertRaises(RevisionNotPresent, index.check_versions_present,
1767
self.assertRaises(RevisionNotPresent, index.check_versions_present,
1769
index.check_versions_present(['tail', 'separate'])
1771
def catch_add(self, entries):
1772
self.caught_entries.append(entries)
1774
def test_add_no_callback_errors(self):
1775
index = self.two_graph_index()
1776
self.assertRaises(errors.ReadOnlyError, index.add_version,
1777
'new', 'fulltext,no-eol', 50, 60, ['separate'])
1779
def test_add_version_smoke(self):
1780
index = self.two_graph_index(catch_adds=True)
1781
index.add_version('new', 'fulltext,no-eol', 50, 60, ['separate'])
1782
self.assertEqual([[(('new', ), 'N50 60', ((('separate',),),))]],
1783
self.caught_entries)
1785
def test_add_version_delta_not_delta_index(self):
1786
index = self.two_graph_index(catch_adds=True)
1787
self.assertRaises(errors.KnitCorrupt, index.add_version,
1788
'new', 'no-eol,line-delta', 0, 100, ['parent'])
1789
self.assertEqual([], self.caught_entries)
1791
def test_add_version_same_dup(self):
1792
index = self.two_graph_index(catch_adds=True)
1793
# options can be spelt two different ways
1794
index.add_version('tip', 'fulltext,no-eol', 0, 100, ['parent'])
1795
index.add_version('tip', 'no-eol,fulltext', 0, 100, ['parent'])
1796
# but neither should have added data.
1797
self.assertEqual([[], []], self.caught_entries)
1799
def test_add_version_different_dup(self):
1800
index = self.two_graph_index(deltas=True, catch_adds=True)
1802
self.assertRaises(errors.KnitCorrupt, index.add_version,
1803
'tip', 'no-eol,line-delta', 0, 100, ['parent'])
1804
self.assertRaises(errors.KnitCorrupt, index.add_version,
1805
'tip', 'line-delta,no-eol', 0, 100, ['parent'])
1806
self.assertRaises(errors.KnitCorrupt, index.add_version,
1807
'tip', 'fulltext', 0, 100, ['parent'])
1809
self.assertRaises(errors.KnitCorrupt, index.add_version,
1810
'tip', 'fulltext,no-eol', 50, 100, ['parent'])
1811
self.assertRaises(errors.KnitCorrupt, index.add_version,
1812
'tip', 'fulltext,no-eol', 0, 1000, ['parent'])
1814
self.assertRaises(errors.KnitCorrupt, index.add_version,
1815
'tip', 'fulltext,no-eol', 0, 100, [])
1816
self.assertEqual([], self.caught_entries)
1818
def test_add_versions_nodeltas(self):
1819
index = self.two_graph_index(catch_adds=True)
1820
index.add_versions([
1821
('new', 'fulltext,no-eol', 50, 60, ['separate']),
1822
('new2', 'fulltext', 0, 6, ['new']),
1824
self.assertEqual([(('new', ), 'N50 60', ((('separate',),),)),
1825
(('new2', ), ' 0 6', ((('new',),),))],
1826
sorted(self.caught_entries[0]))
1827
self.assertEqual(1, len(self.caught_entries))
1829
def test_add_versions_deltas(self):
1830
index = self.two_graph_index(deltas=True, catch_adds=True)
1831
index.add_versions([
1832
('new', 'fulltext,no-eol', 50, 60, ['separate']),
1833
('new2', 'line-delta', 0, 6, ['new']),
1835
self.assertEqual([(('new', ), 'N50 60', ((('separate',),), ())),
1836
(('new2', ), ' 0 6', ((('new',),), (('new',),), ))],
1837
sorted(self.caught_entries[0]))
1838
self.assertEqual(1, len(self.caught_entries))
1840
def test_add_versions_delta_not_delta_index(self):
1841
index = self.two_graph_index(catch_adds=True)
1842
self.assertRaises(errors.KnitCorrupt, index.add_versions,
1843
[('new', 'no-eol,line-delta', 0, 100, ['parent'])])
1844
self.assertEqual([], self.caught_entries)
1846
def test_add_versions_same_dup(self):
1847
index = self.two_graph_index(catch_adds=True)
1848
# options can be spelt two different ways
1849
index.add_versions([('tip', 'fulltext,no-eol', 0, 100, ['parent'])])
1850
index.add_versions([('tip', 'no-eol,fulltext', 0, 100, ['parent'])])
1851
# but neither should have added data.
1852
self.assertEqual([[], []], self.caught_entries)
1854
def test_add_versions_different_dup(self):
1855
index = self.two_graph_index(deltas=True, catch_adds=True)
1857
self.assertRaises(errors.KnitCorrupt, index.add_versions,
1858
[('tip', 'no-eol,line-delta', 0, 100, ['parent'])])
1859
self.assertRaises(errors.KnitCorrupt, index.add_versions,
1860
[('tip', 'line-delta,no-eol', 0, 100, ['parent'])])
1861
self.assertRaises(errors.KnitCorrupt, index.add_versions,
1862
[('tip', 'fulltext', 0, 100, ['parent'])])
1864
self.assertRaises(errors.KnitCorrupt, index.add_versions,
1865
[('tip', 'fulltext,no-eol', 50, 100, ['parent'])])
1866
self.assertRaises(errors.KnitCorrupt, index.add_versions,
1867
[('tip', 'fulltext,no-eol', 0, 1000, ['parent'])])
1869
self.assertRaises(errors.KnitCorrupt, index.add_versions,
1870
[('tip', 'fulltext,no-eol', 0, 100, [])])
1871
# change options in the second record
1872
self.assertRaises(errors.KnitCorrupt, index.add_versions,
1873
[('tip', 'fulltext,no-eol', 0, 100, ['parent']),
1874
('tip', 'no-eol,line-delta', 0, 100, ['parent'])])
1875
self.assertEqual([], self.caught_entries)
1877
def test_iter_parents(self):
1878
index1 = self.make_g_index('1', 1, [
1880
(('r0', ), 'N0 100', ([], )),
1882
(('r1', ), '', ([('r0', )], ))])
1883
index2 = self.make_g_index('2', 1, [
1885
(('r2', ), 'N0 100', ([('r1', ), ('r0', )], )),
1887
combined_index = CombinedGraphIndex([index1, index2])
1888
index = KnitGraphIndex(combined_index)
1890
# cases: each sample data individually:
1891
self.assertEqual(set([('r0', ())]),
1892
set(index.iter_parents(['r0'])))
1893
self.assertEqual(set([('r1', ('r0', ))]),
1894
set(index.iter_parents(['r1'])))
1895
self.assertEqual(set([('r2', ('r1', 'r0'))]),
1896
set(index.iter_parents(['r2'])))
1897
# no nodes returned for a missing node
1898
self.assertEqual(set(),
1899
set(index.iter_parents(['missing'])))
1900
# 1 node returned with missing nodes skipped
1901
self.assertEqual(set([('r1', ('r0', ))]),
1902
set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
1904
self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
1905
set(index.iter_parents(['r0', 'r1'])))
1906
# 2 nodes returned, missing skipped
1907
self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
1908
set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
1911
class TestNoParentsGraphIndexKnit(KnitTests):
1912
"""Tests for knits using KnitGraphIndex with no parents."""
1914
def make_g_index(self, name, ref_lists=0, nodes=[]):
1915
builder = GraphIndexBuilder(ref_lists)
1916
for node, references in nodes:
1917
builder.add_node(node, references)
1918
stream = builder.finish()
1919
trans = self.get_transport()
1920
trans.put_file(name, stream)
1921
return GraphIndex(trans, name)
1923
def test_parents_deltas_incompatible(self):
1924
index = CombinedGraphIndex([])
1925
self.assertRaises(errors.KnitError, KnitGraphIndex, index,
1926
deltas=True, parents=False)
1928
def two_graph_index(self, catch_adds=False):
1929
"""Build a two-graph index.
1931
:param deltas: If true, use underlying indices with two node-ref
1932
lists and 'parent' set to a delta-compressed against tail.
1934
# put several versions in the index.
1935
index1 = self.make_g_index('1', 0, [
1936
(('tip', ), 'N0 100'),
1938
index2 = self.make_g_index('2', 0, [
1939
(('parent', ), ' 100 78'),
1940
(('separate', ), '')])
1941
combined_index = CombinedGraphIndex([index1, index2])
1943
self.combined_index = combined_index
1944
self.caught_entries = []
1945
add_callback = self.catch_add
1948
return KnitGraphIndex(combined_index, parents=False,
1949
add_callback=add_callback)
1951
def test_get_graph(self):
1952
index = self.two_graph_index()
1953
self.assertEqual(set([
1958
]), set(index.get_graph()))
1960
def test_get_ancestry(self):
1961
# with no parents, ancestry is always just the key.
1962
index = self.two_graph_index()
1963
self.assertEqual([], index.get_ancestry([]))
1964
self.assertEqual(['separate'], index.get_ancestry(['separate']))
1965
self.assertEqual(['tail'], index.get_ancestry(['tail']))
1966
self.assertEqual(['parent'], index.get_ancestry(['parent']))
1967
self.assertEqual(['tip'], index.get_ancestry(['tip']))
1968
self.assertTrue(index.get_ancestry(['tip', 'separate']) in
1969
(['tip', 'separate'],
1970
['separate', 'tip'],
1972
# asking for a ghost makes it go boom.
1973
self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
1975
def test_get_ancestry_with_ghosts(self):
1976
index = self.two_graph_index()
1977
self.assertEqual([], index.get_ancestry_with_ghosts([]))
1978
self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
1979
self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
1980
self.assertEqual(['parent'], index.get_ancestry_with_ghosts(['parent']))
1981
self.assertEqual(['tip'], index.get_ancestry_with_ghosts(['tip']))
1982
self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
1983
(['tip', 'separate'],
1984
['separate', 'tip'],
1986
# asking for a ghost makes it go boom.
1987
self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])
1989
def test_num_versions(self):
1990
index = self.two_graph_index()
1991
self.assertEqual(4, index.num_versions())
1993
def test_get_versions(self):
1994
index = self.two_graph_index()
1995
self.assertEqual(set(['tail', 'tip', 'parent', 'separate']),
1996
set(index.get_versions()))
1998
def test_has_version(self):
1999
index = self.two_graph_index()
2000
self.assertTrue(index.has_version('tail'))
2001
self.assertFalse(index.has_version('ghost'))
2003
def test_get_position(self):
2004
index = self.two_graph_index()
2005
self.assertEqual((0, 100), index.get_position('tip'))
2006
self.assertEqual((100, 78), index.get_position('parent'))
2008
def test_get_method(self):
2009
index = self.two_graph_index()
2010
self.assertEqual('fulltext', index.get_method('tip'))
2011
self.assertEqual('fulltext', index.get_options('parent'))
2013
def test_get_options(self):
2014
index = self.two_graph_index()
2015
self.assertEqual('fulltext,no-eol', index.get_options('tip'))
2016
self.assertEqual('fulltext', index.get_options('parent'))
2018
def test_get_parents(self):
2019
index = self.two_graph_index()
2020
self.assertEqual((), index.get_parents('parent'))
2021
# and errors on ghosts.
2022
self.assertRaises(errors.RevisionNotPresent,
2023
index.get_parents, 'ghost')
2025
def test_get_parents_with_ghosts(self):
2026
index = self.two_graph_index()
2027
self.assertEqual((), index.get_parents_with_ghosts('parent'))
2028
# and errors on ghosts.
2029
self.assertRaises(errors.RevisionNotPresent,
2030
index.get_parents_with_ghosts, 'ghost')
2032
def test_check_versions_present(self):
2033
index = self.two_graph_index()
2034
self.assertRaises(RevisionNotPresent, index.check_versions_present,
2036
self.assertRaises(RevisionNotPresent, index.check_versions_present,
2037
['tail', 'missing'])
2038
index.check_versions_present(['tail', 'separate'])
2040
def catch_add(self, entries):
2041
self.caught_entries.append(entries)
2043
def test_add_no_callback_errors(self):
2044
index = self.two_graph_index()
2045
self.assertRaises(errors.ReadOnlyError, index.add_version,
2046
'new', 'fulltext,no-eol', 50, 60, ['separate'])
2048
def test_add_version_smoke(self):
2049
index = self.two_graph_index(catch_adds=True)
2050
index.add_version('new', 'fulltext,no-eol', 50, 60, [])
2051
self.assertEqual([[(('new', ), 'N50 60')]],
2052
self.caught_entries)
2054
def test_add_version_delta_not_delta_index(self):
2055
index = self.two_graph_index(catch_adds=True)
2056
self.assertRaises(errors.KnitCorrupt, index.add_version,
2057
'new', 'no-eol,line-delta', 0, 100, [])
2058
self.assertEqual([], self.caught_entries)
2060
def test_add_version_same_dup(self):
2061
index = self.two_graph_index(catch_adds=True)
2062
# options can be spelt two different ways
2063
index.add_version('tip', 'fulltext,no-eol', 0, 100, [])
2064
index.add_version('tip', 'no-eol,fulltext', 0, 100, [])
2065
# but neither should have added data.
2066
self.assertEqual([[], []], self.caught_entries)
2068
def test_add_version_different_dup(self):
2069
index = self.two_graph_index(catch_adds=True)
2071
self.assertRaises(errors.KnitCorrupt, index.add_version,
2072
'tip', 'no-eol,line-delta', 0, 100, [])
2073
self.assertRaises(errors.KnitCorrupt, index.add_version,
2074
'tip', 'line-delta,no-eol', 0, 100, [])
2075
self.assertRaises(errors.KnitCorrupt, index.add_version,
2076
'tip', 'fulltext', 0, 100, [])
2078
self.assertRaises(errors.KnitCorrupt, index.add_version,
2079
'tip', 'fulltext,no-eol', 50, 100, [])
2080
self.assertRaises(errors.KnitCorrupt, index.add_version,
2081
'tip', 'fulltext,no-eol', 0, 1000, [])
2083
self.assertRaises(errors.KnitCorrupt, index.add_version,
2084
'tip', 'fulltext,no-eol', 0, 100, ['parent'])
2085
self.assertEqual([], self.caught_entries)
2087
def test_add_versions(self):
2088
index = self.two_graph_index(catch_adds=True)
2089
index.add_versions([
2090
('new', 'fulltext,no-eol', 50, 60, []),
2091
('new2', 'fulltext', 0, 6, []),
2093
self.assertEqual([(('new', ), 'N50 60'), (('new2', ), ' 0 6')],
2094
sorted(self.caught_entries[0]))
2095
self.assertEqual(1, len(self.caught_entries))
2097
def test_add_versions_delta_not_delta_index(self):
2098
index = self.two_graph_index(catch_adds=True)
2099
self.assertRaises(errors.KnitCorrupt, index.add_versions,
2100
[('new', 'no-eol,line-delta', 0, 100, ['parent'])])
2101
self.assertEqual([], self.caught_entries)
2103
def test_add_versions_parents_not_parents_index(self):
2104
index = self.two_graph_index(catch_adds=True)
2105
self.assertRaises(errors.KnitCorrupt, index.add_versions,
2106
[('new', 'no-eol,fulltext', 0, 100, ['parent'])])
2107
self.assertEqual([], self.caught_entries)
2109
def test_add_versions_same_dup(self):
2110
index = self.two_graph_index(catch_adds=True)
2111
# options can be spelt two different ways
2112
index.add_versions([('tip', 'fulltext,no-eol', 0, 100, [])])
2113
index.add_versions([('tip', 'no-eol,fulltext', 0, 100, [])])
2114
# but neither should have added data.
2115
self.assertEqual([[], []], self.caught_entries)
2117
def test_add_versions_different_dup(self):
2118
index = self.two_graph_index(catch_adds=True)
2120
self.assertRaises(errors.KnitCorrupt, index.add_versions,
2121
[('tip', 'no-eol,line-delta', 0, 100, [])])
2122
self.assertRaises(errors.KnitCorrupt, index.add_versions,
2123
[('tip', 'line-delta,no-eol', 0, 100, [])])
2124
self.assertRaises(errors.KnitCorrupt, index.add_versions,
2125
[('tip', 'fulltext', 0, 100, [])])
2127
self.assertRaises(errors.KnitCorrupt, index.add_versions,
2128
[('tip', 'fulltext,no-eol', 50, 100, [])])
2129
self.assertRaises(errors.KnitCorrupt, index.add_versions,
2130
[('tip', 'fulltext,no-eol', 0, 1000, [])])
2132
self.assertRaises(errors.KnitCorrupt, index.add_versions,
2133
[('tip', 'fulltext,no-eol', 0, 100, ['parent'])])
2134
# change options in the second record
2135
self.assertRaises(errors.KnitCorrupt, index.add_versions,
2136
[('tip', 'fulltext,no-eol', 0, 100, []),
2137
('tip', 'no-eol,line-delta', 0, 100, [])])
2138
self.assertEqual([], self.caught_entries)
2140
def test_iter_parents(self):
2141
index = self.two_graph_index()
2142
self.assertEqual(set([
2143
('tip', ()), ('tail', ()), ('parent', ()), ('separate', ())
2145
set(index.iter_parents(['tip', 'tail', 'ghost', 'parent', 'separate'])))
2146
self.assertEqual(set([('tip', ())]),
2147
set(index.iter_parents(['tip'])))
2148
self.assertEqual(set(),
2149
set(index.iter_parents([])))