17
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
27
from bzrlib.errors import (
28
RevisionAlreadyPresent,
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
33
24
from bzrlib.knit import (
37
27
KnitAnnotateFactory,
42
29
from bzrlib.osutils import split_lines
43
from bzrlib.tests import TestCase, TestCaseWithTransport
30
from bzrlib.tests import TestCaseWithTransport
44
31
from bzrlib.transport import TransportLogger, get_transport
45
32
from bzrlib.transport.memory import MemoryTransport
46
33
from bzrlib.weave import Weave
49
class KnitContentTests(TestCase):
51
def test_constructor(self):
52
content = KnitContent([])
55
content = KnitContent([])
56
self.assertEqual(content.text(), [])
58
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
59
self.assertEqual(content.text(), ["text1", "text2"])
61
def test_annotate(self):
62
content = KnitContent([])
63
self.assertEqual(content.annotate(), [])
65
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
66
self.assertEqual(content.annotate(),
67
[("origin1", "text1"), ("origin2", "text2")])
69
def test_annotate_iter(self):
70
content = KnitContent([])
71
it = content.annotate_iter()
72
self.assertRaises(StopIteration, it.next)
74
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
75
it = content.annotate_iter()
76
self.assertEqual(it.next(), ("origin1", "text1"))
77
self.assertEqual(it.next(), ("origin2", "text2"))
78
self.assertRaises(StopIteration, it.next)
81
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
83
self.assertIsInstance(copy, KnitContent)
84
self.assertEqual(copy.annotate(),
85
[("origin1", "text1"), ("origin2", "text2")])
87
def test_line_delta(self):
88
content1 = KnitContent([("", "a"), ("", "b")])
89
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
90
self.assertEqual(content1.line_delta(content2),
91
[(1, 2, 2, [("", "a"), ("", "c")])])
93
def test_line_delta_iter(self):
94
content1 = KnitContent([("", "a"), ("", "b")])
95
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
96
it = content1.line_delta_iter(content2)
97
self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
98
self.assertRaises(StopIteration, it.next)
101
class MockTransport(object):
103
def __init__(self, file_lines=None):
104
self.file_lines = file_lines
106
# We have no base directory for the MockTransport
109
def get(self, filename):
110
if self.file_lines is None:
111
raise NoSuchFile(filename)
113
return StringIO("\n".join(self.file_lines))
115
def readv(self, relpath, offsets):
116
fp = self.get(relpath)
117
for offset, size in offsets:
119
yield offset, fp.read(size)
121
def __getattr__(self, name):
122
def queue_call(*args, **kwargs):
123
self.calls.append((name, args, kwargs))
127
class LowLevelKnitDataTests(TestCase):
129
def create_gz_content(self, text):
131
gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
134
return sio.getvalue()
136
def test_valid_knit_data(self):
137
sha1sum = sha.new('foo\nbar\n').hexdigest()
138
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
143
transport = MockTransport([gz_txt])
144
data = _KnitData(transport, 'filename', mode='r')
145
records = [('rev-id-1', 0, len(gz_txt))]
147
contents = data.read_records(records)
148
self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
150
raw_contents = list(data.read_records_iter_raw(records))
151
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
153
def test_not_enough_lines(self):
154
sha1sum = sha.new('foo\n').hexdigest()
155
# record says 2 lines data says 1
156
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
160
transport = MockTransport([gz_txt])
161
data = _KnitData(transport, 'filename', mode='r')
162
records = [('rev-id-1', 0, len(gz_txt))]
163
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
165
# read_records_iter_raw won't detect that sort of mismatch/corruption
166
raw_contents = list(data.read_records_iter_raw(records))
167
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
169
def test_too_many_lines(self):
170
sha1sum = sha.new('foo\nbar\n').hexdigest()
171
# record says 1 lines data says 2
172
gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
177
transport = MockTransport([gz_txt])
178
data = _KnitData(transport, 'filename', mode='r')
179
records = [('rev-id-1', 0, len(gz_txt))]
180
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
182
# read_records_iter_raw won't detect that sort of mismatch/corruption
183
raw_contents = list(data.read_records_iter_raw(records))
184
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
186
def test_mismatched_version_id(self):
187
sha1sum = sha.new('foo\nbar\n').hexdigest()
188
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
193
transport = MockTransport([gz_txt])
194
data = _KnitData(transport, 'filename', mode='r')
195
# We are asking for rev-id-2, but the data is rev-id-1
196
records = [('rev-id-2', 0, len(gz_txt))]
197
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
199
# read_records_iter_raw will notice if we request the wrong version.
200
self.assertRaises(errors.KnitCorrupt, list,
201
data.read_records_iter_raw(records))
203
def test_uncompressed_data(self):
204
sha1sum = sha.new('foo\nbar\n').hexdigest()
205
txt = ('version rev-id-1 2 %s\n'
210
transport = MockTransport([txt])
211
data = _KnitData(transport, 'filename', mode='r')
212
records = [('rev-id-1', 0, len(txt))]
214
# We don't have valid gzip data ==> corrupt
215
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
217
# read_records_iter_raw will notice the bad data
218
self.assertRaises(errors.KnitCorrupt, list,
219
data.read_records_iter_raw(records))
221
def test_corrupted_data(self):
222
sha1sum = sha.new('foo\nbar\n').hexdigest()
223
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
228
# Change 2 bytes in the middle to \xff
229
gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
230
transport = MockTransport([gz_txt])
231
data = _KnitData(transport, 'filename', mode='r')
232
records = [('rev-id-1', 0, len(gz_txt))]
234
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
236
# read_records_iter_raw will notice if we request the wrong version.
237
self.assertRaises(errors.KnitCorrupt, list,
238
data.read_records_iter_raw(records))
241
class LowLevelKnitIndexTests(TestCase):
243
def test_no_such_file(self):
244
transport = MockTransport()
246
self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
247
self.assertRaises(NoSuchFile, _KnitIndex, transport,
248
"filename", "w", create=False)
250
def test_create_file(self):
251
transport = MockTransport()
253
index = _KnitIndex(transport, "filename", "w",
254
file_mode="wb", create=True)
256
("put_bytes_non_atomic",
257
("filename", index.HEADER), {"mode": "wb"}),
258
transport.calls.pop(0))
260
def test_delay_create_file(self):
261
transport = MockTransport()
263
index = _KnitIndex(transport, "filename", "w",
264
create=True, file_mode="wb", create_parent_dir=True,
265
delay_create=True, dir_mode=0777)
266
self.assertEqual([], transport.calls)
268
index.add_versions([])
269
name, (filename, f), kwargs = transport.calls.pop(0)
270
self.assertEqual("put_file_non_atomic", name)
272
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
274
self.assertEqual("filename", filename)
275
self.assertEqual(index.HEADER, f.read())
277
index.add_versions([])
278
self.assertEqual(("append_bytes", ("filename", ""), {}),
279
transport.calls.pop(0))
281
def test_read_utf8_version_id(self):
282
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
283
utf8_revision_id = unicode_revision_id.encode('utf-8')
284
transport = MockTransport([
286
'%s option 0 1 :' % (utf8_revision_id,)
288
index = _KnitIndex(transport, "filename", "r")
289
# _KnitIndex is a private class, and deals in utf8 revision_ids, not
290
# Unicode revision_ids.
291
self.assertTrue(index.has_version(utf8_revision_id))
292
self.assertFalse(index.has_version(unicode_revision_id))
294
def test_read_utf8_parents(self):
295
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
296
utf8_revision_id = unicode_revision_id.encode('utf-8')
297
transport = MockTransport([
299
"version option 0 1 .%s :" % (utf8_revision_id,)
301
index = _KnitIndex(transport, "filename", "r")
302
self.assertEqual([utf8_revision_id],
303
index.get_parents_with_ghosts("version"))
305
def test_read_ignore_corrupted_lines(self):
306
transport = MockTransport([
309
"corrupted options 0 1 .b .c ",
310
"version options 0 1 :"
312
index = _KnitIndex(transport, "filename", "r")
313
self.assertEqual(1, index.num_versions())
314
self.assertTrue(index.has_version("version"))
316
def test_read_corrupted_header(self):
317
transport = MockTransport(['not a bzr knit index header\n'])
318
self.assertRaises(KnitHeaderError,
319
_KnitIndex, transport, "filename", "r")
321
def test_read_duplicate_entries(self):
322
transport = MockTransport([
324
"parent options 0 1 :",
325
"version options1 0 1 0 :",
326
"version options2 1 2 .other :",
327
"version options3 3 4 0 .other :"
329
index = _KnitIndex(transport, "filename", "r")
330
self.assertEqual(2, index.num_versions())
331
self.assertEqual(1, index.lookup("version"))
332
self.assertEqual((3, 4), index.get_position("version"))
333
self.assertEqual(["options3"], index.get_options("version"))
334
self.assertEqual(["parent", "other"],
335
index.get_parents_with_ghosts("version"))
337
def test_read_compressed_parents(self):
338
transport = MockTransport([
342
"c option 0 1 1 0 :",
344
index = _KnitIndex(transport, "filename", "r")
345
self.assertEqual(["a"], index.get_parents("b"))
346
self.assertEqual(["b", "a"], index.get_parents("c"))
348
def test_write_utf8_version_id(self):
349
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
350
utf8_revision_id = unicode_revision_id.encode('utf-8')
351
transport = MockTransport([
354
index = _KnitIndex(transport, "filename", "r")
355
index.add_version(utf8_revision_id, ["option"], 0, 1, [])
356
self.assertEqual(("append_bytes", ("filename",
357
"\n%s option 0 1 :" % (utf8_revision_id,)),
359
transport.calls.pop(0))
361
def test_write_utf8_parents(self):
362
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
363
utf8_revision_id = unicode_revision_id.encode('utf-8')
364
transport = MockTransport([
367
index = _KnitIndex(transport, "filename", "r")
368
index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
369
self.assertEqual(("append_bytes", ("filename",
370
"\nversion option 0 1 .%s :" % (utf8_revision_id,)),
372
transport.calls.pop(0))
374
def test_get_graph(self):
375
transport = MockTransport()
376
index = _KnitIndex(transport, "filename", "w", create=True)
377
self.assertEqual([], index.get_graph())
379
index.add_version("a", ["option"], 0, 1, ["b"])
380
self.assertEqual([("a", ["b"])], index.get_graph())
382
index.add_version("c", ["option"], 0, 1, ["d"])
383
self.assertEqual([("a", ["b"]), ("c", ["d"])],
384
sorted(index.get_graph()))
386
def test_get_ancestry(self):
387
transport = MockTransport([
390
"b option 0 1 0 .e :",
391
"c option 0 1 1 0 :",
392
"d option 0 1 2 .f :"
394
index = _KnitIndex(transport, "filename", "r")
396
self.assertEqual([], index.get_ancestry([]))
397
self.assertEqual(["a"], index.get_ancestry(["a"]))
398
self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
399
self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
400
self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
401
self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
402
self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
404
self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
406
def test_get_ancestry_with_ghosts(self):
407
transport = MockTransport([
410
"b option 0 1 0 .e :",
411
"c option 0 1 0 .f .g :",
412
"d option 0 1 2 .h .j .k :"
414
index = _KnitIndex(transport, "filename", "r")
416
self.assertEqual([], index.get_ancestry_with_ghosts([]))
417
self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
418
self.assertEqual(["a", "e", "b"],
419
index.get_ancestry_with_ghosts(["b"]))
420
self.assertEqual(["a", "g", "f", "c"],
421
index.get_ancestry_with_ghosts(["c"]))
422
self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
423
index.get_ancestry_with_ghosts(["d"]))
424
self.assertEqual(["a", "e", "b"],
425
index.get_ancestry_with_ghosts(["a", "b"]))
426
self.assertEqual(["a", "g", "f", "c"],
427
index.get_ancestry_with_ghosts(["a", "c"]))
429
["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
430
index.get_ancestry_with_ghosts(["b", "d"]))
432
self.assertRaises(RevisionNotPresent,
433
index.get_ancestry_with_ghosts, ["e"])
435
def test_num_versions(self):
436
transport = MockTransport([
439
index = _KnitIndex(transport, "filename", "r")
441
self.assertEqual(0, index.num_versions())
442
self.assertEqual(0, len(index))
444
index.add_version("a", ["option"], 0, 1, [])
445
self.assertEqual(1, index.num_versions())
446
self.assertEqual(1, len(index))
448
index.add_version("a", ["option2"], 1, 2, [])
449
self.assertEqual(1, index.num_versions())
450
self.assertEqual(1, len(index))
452
index.add_version("b", ["option"], 0, 1, [])
453
self.assertEqual(2, index.num_versions())
454
self.assertEqual(2, len(index))
456
def test_get_versions(self):
457
transport = MockTransport([
460
index = _KnitIndex(transport, "filename", "r")
462
self.assertEqual([], index.get_versions())
464
index.add_version("a", ["option"], 0, 1, [])
465
self.assertEqual(["a"], index.get_versions())
467
index.add_version("a", ["option"], 0, 1, [])
468
self.assertEqual(["a"], index.get_versions())
470
index.add_version("b", ["option"], 0, 1, [])
471
self.assertEqual(["a", "b"], index.get_versions())
473
def test_idx_to_name(self):
474
transport = MockTransport([
479
index = _KnitIndex(transport, "filename", "r")
481
self.assertEqual("a", index.idx_to_name(0))
482
self.assertEqual("b", index.idx_to_name(1))
483
self.assertEqual("b", index.idx_to_name(-1))
484
self.assertEqual("a", index.idx_to_name(-2))
486
def test_lookup(self):
487
transport = MockTransport([
492
index = _KnitIndex(transport, "filename", "r")
494
self.assertEqual(0, index.lookup("a"))
495
self.assertEqual(1, index.lookup("b"))
497
def test_add_version(self):
498
transport = MockTransport([
501
index = _KnitIndex(transport, "filename", "r")
503
index.add_version("a", ["option"], 0, 1, ["b"])
504
self.assertEqual(("append_bytes",
505
("filename", "\na option 0 1 .b :"),
506
{}), transport.calls.pop(0))
507
self.assertTrue(index.has_version("a"))
508
self.assertEqual(1, index.num_versions())
509
self.assertEqual((0, 1), index.get_position("a"))
510
self.assertEqual(["option"], index.get_options("a"))
511
self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
513
index.add_version("a", ["opt"], 1, 2, ["c"])
514
self.assertEqual(("append_bytes",
515
("filename", "\na opt 1 2 .c :"),
516
{}), transport.calls.pop(0))
517
self.assertTrue(index.has_version("a"))
518
self.assertEqual(1, index.num_versions())
519
self.assertEqual((1, 2), index.get_position("a"))
520
self.assertEqual(["opt"], index.get_options("a"))
521
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
523
index.add_version("b", ["option"], 2, 3, ["a"])
524
self.assertEqual(("append_bytes",
525
("filename", "\nb option 2 3 0 :"),
526
{}), transport.calls.pop(0))
527
self.assertTrue(index.has_version("b"))
528
self.assertEqual(2, index.num_versions())
529
self.assertEqual((2, 3), index.get_position("b"))
530
self.assertEqual(["option"], index.get_options("b"))
531
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
533
def test_add_versions(self):
534
transport = MockTransport([
537
index = _KnitIndex(transport, "filename", "r")
540
("a", ["option"], 0, 1, ["b"]),
541
("a", ["opt"], 1, 2, ["c"]),
542
("b", ["option"], 2, 3, ["a"])
544
self.assertEqual(("append_bytes", ("filename",
545
"\na option 0 1 .b :"
548
), {}), transport.calls.pop(0))
549
self.assertTrue(index.has_version("a"))
550
self.assertTrue(index.has_version("b"))
551
self.assertEqual(2, index.num_versions())
552
self.assertEqual((1, 2), index.get_position("a"))
553
self.assertEqual((2, 3), index.get_position("b"))
554
self.assertEqual(["opt"], index.get_options("a"))
555
self.assertEqual(["option"], index.get_options("b"))
556
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
557
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
559
def test_delay_create_and_add_versions(self):
560
transport = MockTransport()
562
index = _KnitIndex(transport, "filename", "w",
563
create=True, file_mode="wb", create_parent_dir=True,
564
delay_create=True, dir_mode=0777)
565
self.assertEqual([], transport.calls)
568
("a", ["option"], 0, 1, ["b"]),
569
("a", ["opt"], 1, 2, ["c"]),
570
("b", ["option"], 2, 3, ["a"])
572
name, (filename, f), kwargs = transport.calls.pop(0)
573
self.assertEqual("put_file_non_atomic", name)
575
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
577
self.assertEqual("filename", filename)
580
"\na option 0 1 .b :"
582
"\nb option 2 3 0 :",
585
def test_has_version(self):
586
transport = MockTransport([
590
index = _KnitIndex(transport, "filename", "r")
592
self.assertTrue(index.has_version("a"))
593
self.assertFalse(index.has_version("b"))
595
def test_get_position(self):
596
transport = MockTransport([
601
index = _KnitIndex(transport, "filename", "r")
603
self.assertEqual((0, 1), index.get_position("a"))
604
self.assertEqual((1, 2), index.get_position("b"))
606
def test_get_method(self):
607
transport = MockTransport([
609
"a fulltext,unknown 0 1 :",
610
"b unknown,line-delta 1 2 :",
613
index = _KnitIndex(transport, "filename", "r")
615
self.assertEqual("fulltext", index.get_method("a"))
616
self.assertEqual("line-delta", index.get_method("b"))
617
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
619
def test_get_options(self):
620
transport = MockTransport([
625
index = _KnitIndex(transport, "filename", "r")
627
self.assertEqual(["opt1"], index.get_options("a"))
628
self.assertEqual(["opt2", "opt3"], index.get_options("b"))
630
def test_get_parents(self):
631
transport = MockTransport([
634
"b option 1 2 0 .c :",
635
"c option 1 2 1 0 .e :"
637
index = _KnitIndex(transport, "filename", "r")
639
self.assertEqual([], index.get_parents("a"))
640
self.assertEqual(["a", "c"], index.get_parents("b"))
641
self.assertEqual(["b", "a"], index.get_parents("c"))
643
def test_get_parents_with_ghosts(self):
644
transport = MockTransport([
647
"b option 1 2 0 .c :",
648
"c option 1 2 1 0 .e :"
650
index = _KnitIndex(transport, "filename", "r")
652
self.assertEqual([], index.get_parents_with_ghosts("a"))
653
self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
654
self.assertEqual(["b", "a", "e"],
655
index.get_parents_with_ghosts("c"))
657
def test_check_versions_present(self):
658
transport = MockTransport([
663
index = _KnitIndex(transport, "filename", "r")
665
check = index.check_versions_present
671
self.assertRaises(RevisionNotPresent, check, ["c"])
672
self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
675
36
class KnitTests(TestCaseWithTransport):
676
37
"""Class containing knit test helper routines."""