1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
30
from bzrlib.errors import (
31
RevisionAlreadyPresent,
36
from bzrlib.knit import (
45
from bzrlib.osutils import split_lines
46
from bzrlib.tests import TestCase, TestCaseWithTransport, Feature
47
from bzrlib.transport import TransportLogger, get_transport
48
from bzrlib.transport.memory import MemoryTransport
49
from bzrlib.weave import Weave
52
class _CompiledKnitFeature(Feature):
56
import bzrlib._knit_load_data_c
61
def feature_name(self):
62
return 'bzrlib._knit_load_data_c'
64
CompiledKnitFeature = _CompiledKnitFeature()
67
class KnitContentTests(TestCase):
69
def test_constructor(self):
70
content = KnitContent([])
73
content = KnitContent([])
74
self.assertEqual(content.text(), [])
76
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
77
self.assertEqual(content.text(), ["text1", "text2"])
79
def test_annotate(self):
80
content = KnitContent([])
81
self.assertEqual(content.annotate(), [])
83
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
84
self.assertEqual(content.annotate(),
85
[("origin1", "text1"), ("origin2", "text2")])
87
def test_annotate_iter(self):
88
content = KnitContent([])
89
it = content.annotate_iter()
90
self.assertRaises(StopIteration, it.next)
92
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
93
it = content.annotate_iter()
94
self.assertEqual(it.next(), ("origin1", "text1"))
95
self.assertEqual(it.next(), ("origin2", "text2"))
96
self.assertRaises(StopIteration, it.next)
99
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
100
copy = content.copy()
101
self.assertIsInstance(copy, KnitContent)
102
self.assertEqual(copy.annotate(),
103
[("origin1", "text1"), ("origin2", "text2")])
105
def test_line_delta(self):
106
content1 = KnitContent([("", "a"), ("", "b")])
107
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
108
self.assertEqual(content1.line_delta(content2),
109
[(1, 2, 2, [("", "a"), ("", "c")])])
111
def test_line_delta_iter(self):
112
content1 = KnitContent([("", "a"), ("", "b")])
113
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
114
it = content1.line_delta_iter(content2)
115
self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
116
self.assertRaises(StopIteration, it.next)
119
class MockTransport(object):
121
def __init__(self, file_lines=None):
122
self.file_lines = file_lines
124
# We have no base directory for the MockTransport
127
def get(self, filename):
128
if self.file_lines is None:
129
raise NoSuchFile(filename)
131
return StringIO("\n".join(self.file_lines))
133
def readv(self, relpath, offsets):
134
fp = self.get(relpath)
135
for offset, size in offsets:
137
yield offset, fp.read(size)
139
def __getattr__(self, name):
140
def queue_call(*args, **kwargs):
141
self.calls.append((name, args, kwargs))
145
class LowLevelKnitDataTests(TestCase):
147
def create_gz_content(self, text):
149
gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
152
return sio.getvalue()
154
def test_valid_knit_data(self):
155
sha1sum = sha.new('foo\nbar\n').hexdigest()
156
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
161
transport = MockTransport([gz_txt])
162
data = _KnitData(transport, 'filename', mode='r')
163
records = [('rev-id-1', 0, len(gz_txt))]
165
contents = data.read_records(records)
166
self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
168
raw_contents = list(data.read_records_iter_raw(records))
169
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
171
def test_not_enough_lines(self):
172
sha1sum = sha.new('foo\n').hexdigest()
173
# record says 2 lines data says 1
174
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
178
transport = MockTransport([gz_txt])
179
data = _KnitData(transport, 'filename', mode='r')
180
records = [('rev-id-1', 0, len(gz_txt))]
181
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
183
# read_records_iter_raw won't detect that sort of mismatch/corruption
184
raw_contents = list(data.read_records_iter_raw(records))
185
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
187
def test_too_many_lines(self):
188
sha1sum = sha.new('foo\nbar\n').hexdigest()
189
# record says 1 lines data says 2
190
gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
195
transport = MockTransport([gz_txt])
196
data = _KnitData(transport, 'filename', mode='r')
197
records = [('rev-id-1', 0, len(gz_txt))]
198
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
200
# read_records_iter_raw won't detect that sort of mismatch/corruption
201
raw_contents = list(data.read_records_iter_raw(records))
202
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
204
def test_mismatched_version_id(self):
205
sha1sum = sha.new('foo\nbar\n').hexdigest()
206
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
211
transport = MockTransport([gz_txt])
212
data = _KnitData(transport, 'filename', mode='r')
213
# We are asking for rev-id-2, but the data is rev-id-1
214
records = [('rev-id-2', 0, len(gz_txt))]
215
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
217
# read_records_iter_raw will notice if we request the wrong version.
218
self.assertRaises(errors.KnitCorrupt, list,
219
data.read_records_iter_raw(records))
221
def test_uncompressed_data(self):
222
sha1sum = sha.new('foo\nbar\n').hexdigest()
223
txt = ('version rev-id-1 2 %s\n'
228
transport = MockTransport([txt])
229
data = _KnitData(transport, 'filename', mode='r')
230
records = [('rev-id-1', 0, len(txt))]
232
# We don't have valid gzip data ==> corrupt
233
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
235
# read_records_iter_raw will notice the bad data
236
self.assertRaises(errors.KnitCorrupt, list,
237
data.read_records_iter_raw(records))
239
def test_corrupted_data(self):
240
sha1sum = sha.new('foo\nbar\n').hexdigest()
241
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
246
# Change 2 bytes in the middle to \xff
247
gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
248
transport = MockTransport([gz_txt])
249
data = _KnitData(transport, 'filename', mode='r')
250
records = [('rev-id-1', 0, len(gz_txt))]
252
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
254
# read_records_iter_raw will notice if we request the wrong version.
255
self.assertRaises(errors.KnitCorrupt, list,
256
data.read_records_iter_raw(records))
259
class LowLevelKnitIndexTests(TestCase):
261
def get_knit_index(self, *args, **kwargs):
262
orig = knit._load_data
264
knit._load_data = orig
265
self.addCleanup(reset)
266
from bzrlib._knit_load_data_py import _load_data_py
267
knit._load_data = _load_data_py
268
return _KnitIndex(*args, **kwargs)
270
def test_no_such_file(self):
271
transport = MockTransport()
273
self.assertRaises(NoSuchFile, self.get_knit_index,
274
transport, "filename", "r")
275
self.assertRaises(NoSuchFile, self.get_knit_index,
276
transport, "filename", "w", create=False)
278
def test_create_file(self):
279
transport = MockTransport()
281
index = self.get_knit_index(transport, "filename", "w",
282
file_mode="wb", create=True)
284
("put_bytes_non_atomic",
285
("filename", index.HEADER), {"mode": "wb"}),
286
transport.calls.pop(0))
288
def test_delay_create_file(self):
289
transport = MockTransport()
291
index = self.get_knit_index(transport, "filename", "w",
292
create=True, file_mode="wb", create_parent_dir=True,
293
delay_create=True, dir_mode=0777)
294
self.assertEqual([], transport.calls)
296
index.add_versions([])
297
name, (filename, f), kwargs = transport.calls.pop(0)
298
self.assertEqual("put_file_non_atomic", name)
300
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
302
self.assertEqual("filename", filename)
303
self.assertEqual(index.HEADER, f.read())
305
index.add_versions([])
306
self.assertEqual(("append_bytes", ("filename", ""), {}),
307
transport.calls.pop(0))
309
def test_read_utf8_version_id(self):
310
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
311
utf8_revision_id = unicode_revision_id.encode('utf-8')
312
transport = MockTransport([
314
'%s option 0 1 :' % (utf8_revision_id,)
316
index = self.get_knit_index(transport, "filename", "r")
317
# _KnitIndex is a private class, and deals in utf8 revision_ids, not
318
# Unicode revision_ids.
319
self.assertTrue(index.has_version(utf8_revision_id))
320
self.assertFalse(index.has_version(unicode_revision_id))
322
def test_read_utf8_parents(self):
323
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
324
utf8_revision_id = unicode_revision_id.encode('utf-8')
325
transport = MockTransport([
327
"version option 0 1 .%s :" % (utf8_revision_id,)
329
index = self.get_knit_index(transport, "filename", "r")
330
self.assertEqual([utf8_revision_id],
331
index.get_parents_with_ghosts("version"))
333
def test_read_ignore_corrupted_lines(self):
334
transport = MockTransport([
337
"corrupted options 0 1 .b .c ",
338
"version options 0 1 :"
340
index = self.get_knit_index(transport, "filename", "r")
341
self.assertEqual(1, index.num_versions())
342
self.assertTrue(index.has_version("version"))
344
def test_read_corrupted_header(self):
345
transport = MockTransport(['not a bzr knit index header\n'])
346
self.assertRaises(KnitHeaderError,
347
self.get_knit_index, transport, "filename", "r")
349
def test_read_duplicate_entries(self):
350
transport = MockTransport([
352
"parent options 0 1 :",
353
"version options1 0 1 0 :",
354
"version options2 1 2 .other :",
355
"version options3 3 4 0 .other :"
357
index = self.get_knit_index(transport, "filename", "r")
358
self.assertEqual(2, index.num_versions())
359
self.assertEqual(1, index.lookup("version"))
360
self.assertEqual((3, 4), index.get_position("version"))
361
self.assertEqual(["options3"], index.get_options("version"))
362
self.assertEqual(["parent", "other"],
363
index.get_parents_with_ghosts("version"))
365
def test_read_compressed_parents(self):
366
transport = MockTransport([
370
"c option 0 1 1 0 :",
372
index = self.get_knit_index(transport, "filename", "r")
373
self.assertEqual(["a"], index.get_parents("b"))
374
self.assertEqual(["b", "a"], index.get_parents("c"))
376
def test_write_utf8_version_id(self):
377
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
378
utf8_revision_id = unicode_revision_id.encode('utf-8')
379
transport = MockTransport([
382
index = self.get_knit_index(transport, "filename", "r")
383
index.add_version(utf8_revision_id, ["option"], 0, 1, [])
384
self.assertEqual(("append_bytes", ("filename",
385
"\n%s option 0 1 :" % (utf8_revision_id,)),
387
transport.calls.pop(0))
389
def test_write_utf8_parents(self):
390
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
391
utf8_revision_id = unicode_revision_id.encode('utf-8')
392
transport = MockTransport([
395
index = self.get_knit_index(transport, "filename", "r")
396
index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
397
self.assertEqual(("append_bytes", ("filename",
398
"\nversion option 0 1 .%s :" % (utf8_revision_id,)),
400
transport.calls.pop(0))
402
def test_get_graph(self):
403
transport = MockTransport()
404
index = self.get_knit_index(transport, "filename", "w", create=True)
405
self.assertEqual([], index.get_graph())
407
index.add_version("a", ["option"], 0, 1, ["b"])
408
self.assertEqual([("a", ["b"])], index.get_graph())
410
index.add_version("c", ["option"], 0, 1, ["d"])
411
self.assertEqual([("a", ["b"]), ("c", ["d"])],
412
sorted(index.get_graph()))
414
def test_get_ancestry(self):
415
transport = MockTransport([
418
"b option 0 1 0 .e :",
419
"c option 0 1 1 0 :",
420
"d option 0 1 2 .f :"
422
index = self.get_knit_index(transport, "filename", "r")
424
self.assertEqual([], index.get_ancestry([]))
425
self.assertEqual(["a"], index.get_ancestry(["a"]))
426
self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
427
self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
428
self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
429
self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
430
self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
432
self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
434
def test_get_ancestry_with_ghosts(self):
435
transport = MockTransport([
438
"b option 0 1 0 .e :",
439
"c option 0 1 0 .f .g :",
440
"d option 0 1 2 .h .j .k :"
442
index = self.get_knit_index(transport, "filename", "r")
444
self.assertEqual([], index.get_ancestry_with_ghosts([]))
445
self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
446
self.assertEqual(["a", "e", "b"],
447
index.get_ancestry_with_ghosts(["b"]))
448
self.assertEqual(["a", "g", "f", "c"],
449
index.get_ancestry_with_ghosts(["c"]))
450
self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
451
index.get_ancestry_with_ghosts(["d"]))
452
self.assertEqual(["a", "e", "b"],
453
index.get_ancestry_with_ghosts(["a", "b"]))
454
self.assertEqual(["a", "g", "f", "c"],
455
index.get_ancestry_with_ghosts(["a", "c"]))
457
["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
458
index.get_ancestry_with_ghosts(["b", "d"]))
460
self.assertRaises(RevisionNotPresent,
461
index.get_ancestry_with_ghosts, ["e"])
463
def test_num_versions(self):
464
transport = MockTransport([
467
index = self.get_knit_index(transport, "filename", "r")
469
self.assertEqual(0, index.num_versions())
470
self.assertEqual(0, len(index))
472
index.add_version("a", ["option"], 0, 1, [])
473
self.assertEqual(1, index.num_versions())
474
self.assertEqual(1, len(index))
476
index.add_version("a", ["option2"], 1, 2, [])
477
self.assertEqual(1, index.num_versions())
478
self.assertEqual(1, len(index))
480
index.add_version("b", ["option"], 0, 1, [])
481
self.assertEqual(2, index.num_versions())
482
self.assertEqual(2, len(index))
484
def test_get_versions(self):
485
transport = MockTransport([
488
index = self.get_knit_index(transport, "filename", "r")
490
self.assertEqual([], index.get_versions())
492
index.add_version("a", ["option"], 0, 1, [])
493
self.assertEqual(["a"], index.get_versions())
495
index.add_version("a", ["option"], 0, 1, [])
496
self.assertEqual(["a"], index.get_versions())
498
index.add_version("b", ["option"], 0, 1, [])
499
self.assertEqual(["a", "b"], index.get_versions())
501
def test_idx_to_name(self):
502
transport = MockTransport([
507
index = self.get_knit_index(transport, "filename", "r")
509
self.assertEqual("a", index.idx_to_name(0))
510
self.assertEqual("b", index.idx_to_name(1))
511
self.assertEqual("b", index.idx_to_name(-1))
512
self.assertEqual("a", index.idx_to_name(-2))
514
def test_lookup(self):
515
transport = MockTransport([
520
index = self.get_knit_index(transport, "filename", "r")
522
self.assertEqual(0, index.lookup("a"))
523
self.assertEqual(1, index.lookup("b"))
525
def test_add_version(self):
526
transport = MockTransport([
529
index = self.get_knit_index(transport, "filename", "r")
531
index.add_version("a", ["option"], 0, 1, ["b"])
532
self.assertEqual(("append_bytes",
533
("filename", "\na option 0 1 .b :"),
534
{}), transport.calls.pop(0))
535
self.assertTrue(index.has_version("a"))
536
self.assertEqual(1, index.num_versions())
537
self.assertEqual((0, 1), index.get_position("a"))
538
self.assertEqual(["option"], index.get_options("a"))
539
self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
541
index.add_version("a", ["opt"], 1, 2, ["c"])
542
self.assertEqual(("append_bytes",
543
("filename", "\na opt 1 2 .c :"),
544
{}), transport.calls.pop(0))
545
self.assertTrue(index.has_version("a"))
546
self.assertEqual(1, index.num_versions())
547
self.assertEqual((1, 2), index.get_position("a"))
548
self.assertEqual(["opt"], index.get_options("a"))
549
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
551
index.add_version("b", ["option"], 2, 3, ["a"])
552
self.assertEqual(("append_bytes",
553
("filename", "\nb option 2 3 0 :"),
554
{}), transport.calls.pop(0))
555
self.assertTrue(index.has_version("b"))
556
self.assertEqual(2, index.num_versions())
557
self.assertEqual((2, 3), index.get_position("b"))
558
self.assertEqual(["option"], index.get_options("b"))
559
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
561
def test_add_versions(self):
562
transport = MockTransport([
565
index = self.get_knit_index(transport, "filename", "r")
568
("a", ["option"], 0, 1, ["b"]),
569
("a", ["opt"], 1, 2, ["c"]),
570
("b", ["option"], 2, 3, ["a"])
572
self.assertEqual(("append_bytes", ("filename",
573
"\na option 0 1 .b :"
576
), {}), transport.calls.pop(0))
577
self.assertTrue(index.has_version("a"))
578
self.assertTrue(index.has_version("b"))
579
self.assertEqual(2, index.num_versions())
580
self.assertEqual((1, 2), index.get_position("a"))
581
self.assertEqual((2, 3), index.get_position("b"))
582
self.assertEqual(["opt"], index.get_options("a"))
583
self.assertEqual(["option"], index.get_options("b"))
584
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
585
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
587
def test_delay_create_and_add_versions(self):
588
transport = MockTransport()
590
index = self.get_knit_index(transport, "filename", "w",
591
create=True, file_mode="wb", create_parent_dir=True,
592
delay_create=True, dir_mode=0777)
593
self.assertEqual([], transport.calls)
596
("a", ["option"], 0, 1, ["b"]),
597
("a", ["opt"], 1, 2, ["c"]),
598
("b", ["option"], 2, 3, ["a"])
600
name, (filename, f), kwargs = transport.calls.pop(0)
601
self.assertEqual("put_file_non_atomic", name)
603
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
605
self.assertEqual("filename", filename)
608
"\na option 0 1 .b :"
610
"\nb option 2 3 0 :",
613
def test_has_version(self):
614
transport = MockTransport([
618
index = self.get_knit_index(transport, "filename", "r")
620
self.assertTrue(index.has_version("a"))
621
self.assertFalse(index.has_version("b"))
623
def test_get_position(self):
624
transport = MockTransport([
629
index = self.get_knit_index(transport, "filename", "r")
631
self.assertEqual((0, 1), index.get_position("a"))
632
self.assertEqual((1, 2), index.get_position("b"))
634
def test_get_method(self):
635
transport = MockTransport([
637
"a fulltext,unknown 0 1 :",
638
"b unknown,line-delta 1 2 :",
641
index = self.get_knit_index(transport, "filename", "r")
643
self.assertEqual("fulltext", index.get_method("a"))
644
self.assertEqual("line-delta", index.get_method("b"))
645
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
647
def test_get_options(self):
648
transport = MockTransport([
653
index = self.get_knit_index(transport, "filename", "r")
655
self.assertEqual(["opt1"], index.get_options("a"))
656
self.assertEqual(["opt2", "opt3"], index.get_options("b"))
658
def test_get_parents(self):
659
transport = MockTransport([
662
"b option 1 2 0 .c :",
663
"c option 1 2 1 0 .e :"
665
index = self.get_knit_index(transport, "filename", "r")
667
self.assertEqual([], index.get_parents("a"))
668
self.assertEqual(["a", "c"], index.get_parents("b"))
669
self.assertEqual(["b", "a"], index.get_parents("c"))
671
def test_get_parents_with_ghosts(self):
672
transport = MockTransport([
675
"b option 1 2 0 .c :",
676
"c option 1 2 1 0 .e :"
678
index = self.get_knit_index(transport, "filename", "r")
680
self.assertEqual([], index.get_parents_with_ghosts("a"))
681
self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
682
self.assertEqual(["b", "a", "e"],
683
index.get_parents_with_ghosts("c"))
685
def test_check_versions_present(self):
686
transport = MockTransport([
691
index = self.get_knit_index(transport, "filename", "r")
693
check = index.check_versions_present
699
self.assertRaises(RevisionNotPresent, check, ["c"])
700
self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
702
def test_impossible_parent(self):
703
"""Test we get KnitCorrupt if the parent couldn't possibly exist."""
704
transport = MockTransport([
707
"b option 0 1 4 :" # We don't have a 4th record
710
self.assertRaises(errors.KnitCorrupt,
711
self.get_knit_index, transport, 'filename', 'r')
713
if (str(e) == ('exceptions must be strings, classes, or instances,'
714
' not exceptions.IndexError')
715
and sys.version_info[0:2] >= (2,5)):
716
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
717
' raising new style exceptions with python'
722
def test_corrupted_parent(self):
723
transport = MockTransport([
727
"c option 0 1 1v :", # Can't have a parent of '1v'
730
self.assertRaises(errors.KnitCorrupt,
731
self.get_knit_index, transport, 'filename', 'r')
733
if (str(e) == ('exceptions must be strings, classes, or instances,'
734
' not exceptions.ValueError')
735
and sys.version_info[0:2] >= (2,5)):
736
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
737
' raising new style exceptions with python'
742
def test_corrupted_parent_in_list(self):
743
transport = MockTransport([
747
"c option 0 1 1 v :", # Can't have a parent of 'v'
750
self.assertRaises(errors.KnitCorrupt,
751
self.get_knit_index, transport, 'filename', 'r')
753
if (str(e) == ('exceptions must be strings, classes, or instances,'
754
' not exceptions.ValueError')
755
and sys.version_info[0:2] >= (2,5)):
756
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
757
' raising new style exceptions with python'
762
def test_invalid_position(self):
763
transport = MockTransport([
768
self.assertRaises(errors.KnitCorrupt,
769
self.get_knit_index, transport, 'filename', 'r')
771
if (str(e) == ('exceptions must be strings, classes, or instances,'
772
' not exceptions.ValueError')
773
and sys.version_info[0:2] >= (2,5)):
774
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
775
' raising new style exceptions with python'
780
def test_invalid_size(self):
781
transport = MockTransport([
786
self.assertRaises(errors.KnitCorrupt,
787
self.get_knit_index, transport, 'filename', 'r')
789
if (str(e) == ('exceptions must be strings, classes, or instances,'
790
' not exceptions.ValueError')
791
and sys.version_info[0:2] >= (2,5)):
792
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
793
' raising new style exceptions with python'
798
def test_short_line(self):
799
transport = MockTransport([
802
"b option 10 10 0", # This line isn't terminated, ignored
804
index = self.get_knit_index(transport, "filename", "r")
805
self.assertEqual(['a'], index.get_versions())
807
def test_skip_incomplete_record(self):
808
# A line with bogus data should just be skipped
809
transport = MockTransport([
812
"b option 10 10 0", # This line isn't terminated, ignored
813
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
815
index = self.get_knit_index(transport, "filename", "r")
816
self.assertEqual(['a', 'c'], index.get_versions())
818
def test_trailing_characters(self):
819
# A line with bogus data should just be skipped
820
transport = MockTransport([
823
"b option 10 10 0 :a", # This line has extra trailing characters
824
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
826
index = self.get_knit_index(transport, "filename", "r")
827
self.assertEqual(['a', 'c'], index.get_versions())
830
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
832
_test_needs_features = [CompiledKnitFeature]
834
def get_knit_index(self, *args, **kwargs):
835
orig = knit._load_data
837
knit._load_data = orig
838
self.addCleanup(reset)
839
from bzrlib._knit_load_data_c import _load_data_c
840
knit._load_data = _load_data_c
841
return _KnitIndex(*args, **kwargs)
845
class KnitTests(TestCaseWithTransport):
846
"""Class containing knit test helper routines."""
848
def make_test_knit(self, annotate=False, delay_create=False):
850
factory = KnitPlainFactory()
853
return KnitVersionedFile('test', get_transport('.'), access_mode='w',
854
factory=factory, create=True,
855
delay_create=delay_create)
858
class BasicKnitTests(KnitTests):
860
def add_stock_one_and_one_a(self, k):
861
k.add_lines('text-1', [], split_lines(TEXT_1))
862
k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
864
def test_knit_constructor(self):
865
"""Construct empty k"""
866
self.make_test_knit()
868
def test_knit_add(self):
869
"""Store one text in knit and retrieve"""
870
k = self.make_test_knit()
871
k.add_lines('text-1', [], split_lines(TEXT_1))
872
self.assertTrue(k.has_version('text-1'))
873
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
875
def test_knit_reload(self):
876
# test that the content in a reloaded knit is correct
877
k = self.make_test_knit()
878
k.add_lines('text-1', [], split_lines(TEXT_1))
880
k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
881
self.assertTrue(k2.has_version('text-1'))
882
self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
884
def test_knit_several(self):
885
"""Store several texts in a knit"""
886
k = self.make_test_knit()
887
k.add_lines('text-1', [], split_lines(TEXT_1))
888
k.add_lines('text-2', [], split_lines(TEXT_2))
889
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
890
self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
892
def test_repeated_add(self):
893
"""Knit traps attempt to replace existing version"""
894
k = self.make_test_knit()
895
k.add_lines('text-1', [], split_lines(TEXT_1))
896
self.assertRaises(RevisionAlreadyPresent,
898
'text-1', [], split_lines(TEXT_1))
900
def test_empty(self):
901
k = self.make_test_knit(True)
902
k.add_lines('text-1', [], [])
903
self.assertEquals(k.get_lines('text-1'), [])
905
def test_incomplete(self):
906
"""Test if texts without a ending line-end can be inserted and
908
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
909
k.add_lines('text-1', [], ['a\n', 'b' ])
910
k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
911
# reopening ensures maximum room for confusion
912
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
913
self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
914
self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
916
def test_delta(self):
917
"""Expression of knit delta as lines"""
918
k = self.make_test_knit()
919
td = list(line_delta(TEXT_1.splitlines(True),
920
TEXT_1A.splitlines(True)))
921
self.assertEqualDiff(''.join(td), delta_1_1a)
922
out = apply_line_delta(TEXT_1.splitlines(True), td)
923
self.assertEqualDiff(''.join(out), TEXT_1A)
925
def test_add_with_parents(self):
926
"""Store in knit with parents"""
927
k = self.make_test_knit()
928
self.add_stock_one_and_one_a(k)
929
self.assertEquals(k.get_parents('text-1'), [])
930
self.assertEquals(k.get_parents('text-1a'), ['text-1'])
932
def test_ancestry(self):
933
"""Store in knit with parents"""
934
k = self.make_test_knit()
935
self.add_stock_one_and_one_a(k)
936
self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
938
def test_add_delta(self):
939
"""Store in knit with parents"""
940
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
941
delta=True, create=True)
942
self.add_stock_one_and_one_a(k)
944
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
946
def test_annotate(self):
948
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
949
delta=True, create=True)
950
self.insert_and_test_small_annotate(k)
952
def insert_and_test_small_annotate(self, k):
953
"""test annotation with k works correctly."""
954
k.add_lines('text-1', [], ['a\n', 'b\n'])
955
k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
957
origins = k.annotate('text-2')
958
self.assertEquals(origins[0], ('text-1', 'a\n'))
959
self.assertEquals(origins[1], ('text-2', 'c\n'))
961
def test_annotate_fulltext(self):
963
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
964
delta=False, create=True)
965
self.insert_and_test_small_annotate(k)
967
def test_annotate_merge_1(self):
968
k = self.make_test_knit(True)
969
k.add_lines('text-a1', [], ['a\n', 'b\n'])
970
k.add_lines('text-a2', [], ['d\n', 'c\n'])
971
k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
972
origins = k.annotate('text-am')
973
self.assertEquals(origins[0], ('text-a2', 'd\n'))
974
self.assertEquals(origins[1], ('text-a1', 'b\n'))
976
def test_annotate_merge_2(self):
977
k = self.make_test_knit(True)
978
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
979
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
980
k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
981
origins = k.annotate('text-am')
982
self.assertEquals(origins[0], ('text-a1', 'a\n'))
983
self.assertEquals(origins[1], ('text-a2', 'y\n'))
984
self.assertEquals(origins[2], ('text-a1', 'c\n'))
986
def test_annotate_merge_9(self):
987
k = self.make_test_knit(True)
988
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
989
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
990
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
991
origins = k.annotate('text-am')
992
self.assertEquals(origins[0], ('text-am', 'k\n'))
993
self.assertEquals(origins[1], ('text-a2', 'y\n'))
994
self.assertEquals(origins[2], ('text-a1', 'c\n'))
996
def test_annotate_merge_3(self):
997
k = self.make_test_knit(True)
998
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
999
k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
1000
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
1001
origins = k.annotate('text-am')
1002
self.assertEquals(origins[0], ('text-am', 'k\n'))
1003
self.assertEquals(origins[1], ('text-a2', 'y\n'))
1004
self.assertEquals(origins[2], ('text-a2', 'z\n'))
1006
def test_annotate_merge_4(self):
1007
k = self.make_test_knit(True)
1008
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1009
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
1010
k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
1011
k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
1012
origins = k.annotate('text-am')
1013
self.assertEquals(origins[0], ('text-a1', 'a\n'))
1014
self.assertEquals(origins[1], ('text-a1', 'b\n'))
1015
self.assertEquals(origins[2], ('text-a2', 'z\n'))
1017
def test_annotate_merge_5(self):
1018
k = self.make_test_knit(True)
1019
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1020
k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
1021
k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
1022
k.add_lines('text-am',
1023
['text-a1', 'text-a2', 'text-a3'],
1024
['a\n', 'e\n', 'z\n'])
1025
origins = k.annotate('text-am')
1026
self.assertEquals(origins[0], ('text-a1', 'a\n'))
1027
self.assertEquals(origins[1], ('text-a2', 'e\n'))
1028
self.assertEquals(origins[2], ('text-a3', 'z\n'))
1030
def test_annotate_file_cherry_pick(self):
1031
k = self.make_test_knit(True)
1032
k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
1033
k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
1034
k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
1035
origins = k.annotate('text-3')
1036
self.assertEquals(origins[0], ('text-1', 'a\n'))
1037
self.assertEquals(origins[1], ('text-1', 'b\n'))
1038
self.assertEquals(origins[2], ('text-1', 'c\n'))
1040
def test_knit_join(self):
1041
"""Store in knit with parents"""
1042
k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
1043
k1.add_lines('text-a', [], split_lines(TEXT_1))
1044
k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
1046
k1.add_lines('text-c', [], split_lines(TEXT_1))
1047
k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
1049
k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
1051
k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
1052
count = k2.join(k1, version_ids=['text-m'])
1053
self.assertEquals(count, 5)
1054
self.assertTrue(k2.has_version('text-a'))
1055
self.assertTrue(k2.has_version('text-c'))
1057
def test_reannotate(self):
1058
k1 = KnitVersionedFile('knit1', get_transport('.'),
1059
factory=KnitAnnotateFactory(), create=True)
1061
k1.add_lines('text-a', [], ['a\n', 'b\n'])
1063
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
1065
k2 = KnitVersionedFile('test2', get_transport('.'),
1066
factory=KnitAnnotateFactory(), create=True)
1067
k2.join(k1, version_ids=['text-b'])
1070
k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
1072
k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
1074
k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
1076
# test-c will have index 3
1077
k1.join(k2, version_ids=['text-c'])
1079
lines = k1.get_lines('text-c')
1080
self.assertEquals(lines, ['z\n', 'c\n'])
1082
origins = k1.annotate('text-c')
1083
self.assertEquals(origins[0], ('text-c', 'z\n'))
1084
self.assertEquals(origins[1], ('text-b', 'c\n'))
1086
def test_get_line_delta_texts(self):
1087
"""Make sure we can call get_texts on text with reused line deltas"""
1088
k1 = KnitVersionedFile('test1', get_transport('.'),
1089
factory=KnitPlainFactory(), create=True)
1094
parents = ['%d' % (t-1)]
1095
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
1096
k1.get_texts(('%d' % t) for t in range(3))
1098
def test_iter_lines_reads_in_order(self):
1099
t = MemoryTransport()
1100
instrumented_t = TransportLogger(t)
1101
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
1102
self.assertEqual([('id.kndx',)], instrumented_t._calls)
1103
# add texts with no required ordering
1104
k1.add_lines('base', [], ['text\n'])
1105
k1.add_lines('base2', [], ['text2\n'])
1107
instrumented_t._calls = []
1108
# request a last-first iteration
1109
results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
1110
self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
1111
self.assertEqual(['text\n', 'text2\n'], results)
1113
def test_create_empty_annotated(self):
1114
k1 = self.make_test_knit(True)
1116
k1.add_lines('text-a', [], ['a\n', 'b\n'])
1117
k2 = k1.create_empty('t', MemoryTransport())
1118
self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
1119
self.assertEqual(k1.delta, k2.delta)
1120
# the generic test checks for empty content and file class
1122
def test_knit_format(self):
1123
# this tests that a new knit index file has the expected content
1124
# and that is writes the data we expect as records are added.
1125
knit = self.make_test_knit(True)
1126
# Now knit files are not created until we first add data to them
1127
self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
1128
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
1129
self.assertFileEqual(
1130
"# bzr knit index 8\n"
1132
"revid fulltext 0 84 .a_ghost :",
1134
knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
1135
self.assertFileEqual(
1136
"# bzr knit index 8\n"
1137
"\nrevid fulltext 0 84 .a_ghost :"
1138
"\nrevid2 line-delta 84 82 0 :",
1140
# we should be able to load this file again
1141
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1142
self.assertEqual(['revid', 'revid2'], knit.versions())
1143
# write a short write to the file and ensure that its ignored
1144
indexfile = file('test.kndx', 'ab')
1145
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
1147
# we should be able to load this file again
1148
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
1149
self.assertEqual(['revid', 'revid2'], knit.versions())
1150
# and add a revision with the same id the failed write had
1151
knit.add_lines('revid3', ['revid2'], ['a\n'])
1152
# and when reading it revid3 should now appear.
1153
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1154
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
1155
self.assertEqual(['revid2'], knit.get_parents('revid3'))
1157
def test_delay_create(self):
1158
"""Test that passing delay_create=True creates files late"""
1159
knit = self.make_test_knit(annotate=True, delay_create=True)
1160
self.failIfExists('test.knit')
1161
self.failIfExists('test.kndx')
1162
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
1163
self.failUnlessExists('test.knit')
1164
self.assertFileEqual(
1165
"# bzr knit index 8\n"
1167
"revid fulltext 0 84 .a_ghost :",
1170
def test_create_parent_dir(self):
1171
"""create_parent_dir can create knits in nonexistant dirs"""
1172
# Has no effect if we don't set 'delay_create'
1173
trans = get_transport('.')
1174
self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
1175
trans, access_mode='w', factory=None,
1176
create=True, create_parent_dir=True)
1177
# Nothing should have changed yet
1178
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1179
factory=None, create=True,
1180
create_parent_dir=True,
1182
self.failIfExists('dir/test.knit')
1183
self.failIfExists('dir/test.kndx')
1184
self.failIfExists('dir')
1185
knit.add_lines('revid', [], ['a\n'])
1186
self.failUnlessExists('dir')
1187
self.failUnlessExists('dir/test.knit')
1188
self.assertFileEqual(
1189
"# bzr knit index 8\n"
1191
"revid fulltext 0 84 :",
1194
def test_create_mode_700(self):
1195
trans = get_transport('.')
1196
if not trans._can_roundtrip_unix_modebits():
1197
# Can't roundtrip, so no need to run this test
1199
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1200
factory=None, create=True,
1201
create_parent_dir=True,
1205
knit.add_lines('revid', [], ['a\n'])
1206
self.assertTransportMode(trans, 'dir', 0700)
1207
self.assertTransportMode(trans, 'dir/test.knit', 0600)
1208
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
1210
def test_create_mode_770(self):
1211
trans = get_transport('.')
1212
if not trans._can_roundtrip_unix_modebits():
1213
# Can't roundtrip, so no need to run this test
1215
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1216
factory=None, create=True,
1217
create_parent_dir=True,
1221
knit.add_lines('revid', [], ['a\n'])
1222
self.assertTransportMode(trans, 'dir', 0770)
1223
self.assertTransportMode(trans, 'dir/test.knit', 0660)
1224
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
1226
def test_create_mode_777(self):
1227
trans = get_transport('.')
1228
if not trans._can_roundtrip_unix_modebits():
1229
# Can't roundtrip, so no need to run this test
1231
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1232
factory=None, create=True,
1233
create_parent_dir=True,
1237
knit.add_lines('revid', [], ['a\n'])
1238
self.assertTransportMode(trans, 'dir', 0777)
1239
self.assertTransportMode(trans, 'dir/test.knit', 0666)
1240
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
1242
def test_plan_merge(self):
1243
my_knit = self.make_test_knit(annotate=True)
1244
my_knit.add_lines('text1', [], split_lines(TEXT_1))
1245
my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
1246
my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
1247
plan = list(my_knit.plan_merge('text1a', 'text1b'))
1248
for plan_line, expected_line in zip(plan, AB_MERGE):
1249
self.assertEqual(plan_line, expected_line)
1261
Banana cup cake recipe
1267
- self-raising flour
1271
Banana cup cake recipe
1273
- bananas (do not use plantains!!!)
1280
Banana cup cake recipe
1283
- self-raising flour
1296
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
1301
new-b|- bananas (do not use plantains!!!)
1302
unchanged|- broken tea cups
1303
new-a|- self-raising flour
1306
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
1309
def line_delta(from_lines, to_lines):
1310
"""Generate line-based delta from one text to another"""
1311
s = difflib.SequenceMatcher(None, from_lines, to_lines)
1312
for op in s.get_opcodes():
1313
if op[0] == 'equal':
1315
yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
1316
for i in range(op[3], op[4]):
1320
def apply_line_delta(basis_lines, delta_lines):
1321
"""Apply a line-based perfect diff
1323
basis_lines -- text to apply the patch to
1324
delta_lines -- diff instructions and content
1326
out = basis_lines[:]
1329
while i < len(delta_lines):
1331
a, b, c = map(long, l.split(','))
1333
out[offset+a:offset+b] = delta_lines[i:i+c]
1335
offset = offset + (b - a) + c
1339
class TestWeaveToKnit(KnitTests):
1341
def test_weave_to_knit_matches(self):
1342
# check that the WeaveToKnit is_compatible function
1343
# registers True for a Weave to a Knit.
1345
k = self.make_test_knit()
1346
self.failUnless(WeaveToKnit.is_compatible(w, k))
1347
self.failIf(WeaveToKnit.is_compatible(k, w))
1348
self.failIf(WeaveToKnit.is_compatible(w, w))
1349
self.failIf(WeaveToKnit.is_compatible(k, k))
1352
class TestKnitCaching(KnitTests):
1354
def create_knit(self, cache_add=False):
1355
k = self.make_test_knit(True)
1359
k.add_lines('text-1', [], split_lines(TEXT_1))
1360
k.add_lines('text-2', [], split_lines(TEXT_2))
1363
def test_no_caching(self):
1364
k = self.create_knit()
1365
# Nothing should be cached without setting 'enable_cache'
1366
self.assertEqual({}, k._data._cache)
1368
def test_cache_add_and_clear(self):
1369
k = self.create_knit(True)
1371
self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1374
self.assertEqual({}, k._data._cache)
1376
def test_cache_data_read_raw(self):
1377
k = self.create_knit()
1379
# Now cache and read
1382
def read_one_raw(version):
1383
pos_map = k._get_components_positions([version])
1384
method, pos, size, next = pos_map[version]
1385
lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
1386
self.assertEqual(1, len(lst))
1389
val = read_one_raw('text-1')
1390
self.assertEqual({'text-1':val[1]}, k._data._cache)
1393
# After clear, new reads are not cached
1394
self.assertEqual({}, k._data._cache)
1396
val2 = read_one_raw('text-1')
1397
self.assertEqual(val, val2)
1398
self.assertEqual({}, k._data._cache)
1400
def test_cache_data_read(self):
1401
k = self.create_knit()
1403
def read_one(version):
1404
pos_map = k._get_components_positions([version])
1405
method, pos, size, next = pos_map[version]
1406
lst = list(k._data.read_records_iter([(version, pos, size)]))
1407
self.assertEqual(1, len(lst))
1410
# Now cache and read
1413
val = read_one('text-2')
1414
self.assertEqual(['text-2'], k._data._cache.keys())
1415
self.assertEqual('text-2', val[0])
1416
content, digest = k._data._parse_record('text-2',
1417
k._data._cache['text-2'])
1418
self.assertEqual(content, val[1])
1419
self.assertEqual(digest, val[2])
1422
self.assertEqual({}, k._data._cache)
1424
val2 = read_one('text-2')
1425
self.assertEqual(val, val2)
1426
self.assertEqual({}, k._data._cache)
1428
def test_cache_read(self):
1429
k = self.create_knit()
1432
text = k.get_text('text-1')
1433
self.assertEqual(TEXT_1, text)
1434
self.assertEqual(['text-1'], k._data._cache.keys())
1437
self.assertEqual({}, k._data._cache)
1439
text = k.get_text('text-1')
1440
self.assertEqual(TEXT_1, text)
1441
self.assertEqual({}, k._data._cache)
1444
class TestKnitIndex(KnitTests):
1446
def test_add_versions_dictionary_compresses(self):
1447
"""Adding versions to the index should update the lookup dict"""
1448
knit = self.make_test_knit()
1450
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1451
self.check_file_contents('test.kndx',
1452
'# bzr knit index 8\n'
1454
'a-1 fulltext 0 0 :'
1456
idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
1457
('a-3', ['fulltext'], 0, 0, ['a-2']),
1459
self.check_file_contents('test.kndx',
1460
'# bzr knit index 8\n'
1462
'a-1 fulltext 0 0 :\n'
1463
'a-2 fulltext 0 0 0 :\n'
1464
'a-3 fulltext 0 0 1 :'
1466
self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1467
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1468
'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1469
'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1472
def test_add_versions_fails_clean(self):
1473
"""If add_versions fails in the middle, it restores a pristine state.
1475
Any modifications that are made to the index are reset if all versions
1478
# This cheats a little bit by passing in a generator which will
1479
# raise an exception before the processing finishes
1480
# Other possibilities would be to have an version with the wrong number
1481
# of entries, or to make the backing transport unable to write any
1484
knit = self.make_test_knit()
1486
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1488
class StopEarly(Exception):
1491
def generate_failure():
1492
"""Add some entries and then raise an exception"""
1493
yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
1494
yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
1497
# Assert the pre-condition
1498
self.assertEqual(['a-1'], idx._history)
1499
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1501
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1503
# And it shouldn't be modified
1504
self.assertEqual(['a-1'], idx._history)
1505
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1507
def test_knit_index_ignores_empty_files(self):
1508
# There was a race condition in older bzr, where a ^C at the right time
1509
# could leave an empty .kndx file, which bzr would later claim was a
1510
# corrupted file since the header was not present. In reality, the file
1511
# just wasn't created, so it should be ignored.
1512
t = get_transport('.')
1513
t.put_bytes('test.kndx', '')
1515
knit = self.make_test_knit()
1517
def test_knit_index_checks_header(self):
1518
t = get_transport('.')
1519
t.put_bytes('test.kndx', '# not really a knit header\n\n')
1521
self.assertRaises(KnitHeaderError, self.make_test_knit)