1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
30
from bzrlib.errors import (
31
RevisionAlreadyPresent,
36
from bzrlib.index import *
37
from bzrlib.knit import (
47
from bzrlib.osutils import split_lines
48
from bzrlib.tests import TestCase, TestCaseWithTransport, Feature
49
from bzrlib.transport import TransportLogger, get_transport
50
from bzrlib.transport.memory import MemoryTransport
51
from bzrlib.weave import Weave
54
class _CompiledKnitFeature(Feature):
    """Test feature describing the optional compiled knit index loader."""

    def _probe(self):
        """Return True if bzrlib._knit_load_data_c can be imported.

        NOTE(review): ``_probe`` is assumed to be the availability hook that
        bzrlib.tests.Feature calls -- confirm against bzrlib.tests.
        """
        # The extension is optional; importing it unguarded in the class body
        # would make this whole test module fail to import when it is absent.
        try:
            import bzrlib._knit_load_data_c
        except ImportError:
            return False
        return True

    def feature_name(self):
        """Return the dotted module name that identifies this feature."""
        return 'bzrlib._knit_load_data_c'


CompiledKnitFeature = _CompiledKnitFeature()
69
class KnitContentTests(TestCase):
    """Tests for KnitContent, built from a list of (origin, text) pairs."""

    def test_constructor(self):
        """An empty KnitContent can be constructed."""
        KnitContent([])

    def test_text(self):
        """text() returns only the text half of each (origin, text) pair."""
        content = KnitContent([])
        self.assertEqual(content.text(), [])

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        self.assertEqual(content.text(), ["text1", "text2"])

    def test_annotate(self):
        """annotate() returns the (origin, text) pairs as a list."""
        content = KnitContent([])
        self.assertEqual(content.annotate(), [])

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        self.assertEqual(content.annotate(),
            [("origin1", "text1"), ("origin2", "text2")])

    def test_annotate_iter(self):
        """annotate_iter() yields the pairs in order, then stops."""
        content = KnitContent([])
        it = content.annotate_iter()
        self.assertRaises(StopIteration, it.next)

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        it = content.annotate_iter()
        self.assertEqual(it.next(), ("origin1", "text1"))
        self.assertEqual(it.next(), ("origin2", "text2"))
        self.assertRaises(StopIteration, it.next)

    def test_copy(self):
        """copy() returns a KnitContent carrying the same annotations."""
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        copy = content.copy()
        self.assertIsInstance(copy, KnitContent)
        self.assertEqual(copy.annotate(),
            [("origin1", "text1"), ("origin2", "text2")])

    def test_line_delta(self):
        """line_delta() describes the changed hunk between two contents."""
        content1 = KnitContent([("", "a"), ("", "b")])
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
        self.assertEqual(content1.line_delta(content2),
            [(1, 2, 2, [("", "a"), ("", "c")])])

    def test_line_delta_iter(self):
        """line_delta_iter() yields the same hunk as line_delta(), lazily."""
        content1 = KnitContent([("", "a"), ("", "b")])
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
        it = content1.line_delta_iter(content2)
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
        self.assertRaises(StopIteration, it.next)
121
class MockTransport(object):
    """In-memory stand-in for a transport that records the calls made on it.

    ``get`` and ``readv`` serve the content given as ``file_lines``. Any
    other attribute access returns a callable that appends
    ``(name, args, kwargs)`` to ``self.calls`` and returns None.
    """

    def __init__(self, file_lines=None):
        """Create the mock.

        :param file_lines: list of lines served for every filename, or None
            to make every ``get`` raise NoSuchFile.
        """
        self.file_lines = file_lines
        # Must be a real attribute: otherwise ``self.calls`` would itself be
        # routed through __getattr__ and never resolve to a list.
        self.calls = []
        # We have no base directory for the MockTransport
        self.base = ''

    def get(self, filename):
        """Return a file-like object over the lines joined with newlines.

        :raises NoSuchFile: if the mock was created without file_lines.
        """
        if self.file_lines is None:
            raise NoSuchFile(filename)
        return StringIO("\n".join(self.file_lines))

    def readv(self, relpath, offsets):
        """Yield ``(offset, data)`` for each ``(offset, size)`` requested."""
        fp = self.get(relpath)
        for offset, size in offsets:
            # Position explicitly; without this every read would just carry
            # on from wherever the previous one stopped.
            fp.seek(offset)
            yield offset, fp.read(size)

    def __getattr__(self, name):
        """Return a recorder for any method not defined on the mock."""
        def queue_call(*args, **kwargs):
            self.calls.append((name, args, kwargs))
        return queue_call
147
class LowLevelKnitDataTests(TestCase):
149
def create_gz_content(self, text):
    """Return ``text`` compressed as a complete gzip stream.

    :param text: the raw record bytes to compress.
    :return: a byte string holding the gzip-compressed form of ``text``.
    """
    sio = StringIO()
    gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
    try:
        gz_file.write(text)
    finally:
        # The gzip trailer is only written on close, so the stream must be
        # closed before the buffer is read back.
        gz_file.close()
    return sio.getvalue()
156
def test_valid_knit_data(self):
157
sha1sum = sha.new('foo\nbar\n').hexdigest()
158
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
163
transport = MockTransport([gz_txt])
164
data = _KnitData(transport, 'filename', mode='r')
165
records = [('rev-id-1', 0, len(gz_txt))]
167
contents = data.read_records(records)
168
self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
170
raw_contents = list(data.read_records_iter_raw(records))
171
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
173
def test_not_enough_lines(self):
174
sha1sum = sha.new('foo\n').hexdigest()
175
# record says 2 lines data says 1
176
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
180
transport = MockTransport([gz_txt])
181
data = _KnitData(transport, 'filename', mode='r')
182
records = [('rev-id-1', 0, len(gz_txt))]
183
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
185
# read_records_iter_raw won't detect that sort of mismatch/corruption
186
raw_contents = list(data.read_records_iter_raw(records))
187
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
189
def test_too_many_lines(self):
190
sha1sum = sha.new('foo\nbar\n').hexdigest()
191
# record says 1 lines data says 2
192
gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
197
transport = MockTransport([gz_txt])
198
data = _KnitData(transport, 'filename', mode='r')
199
records = [('rev-id-1', 0, len(gz_txt))]
200
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
202
# read_records_iter_raw won't detect that sort of mismatch/corruption
203
raw_contents = list(data.read_records_iter_raw(records))
204
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
206
def test_mismatched_version_id(self):
207
sha1sum = sha.new('foo\nbar\n').hexdigest()
208
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
213
transport = MockTransport([gz_txt])
214
data = _KnitData(transport, 'filename', mode='r')
215
# We are asking for rev-id-2, but the data is rev-id-1
216
records = [('rev-id-2', 0, len(gz_txt))]
217
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
219
# read_records_iter_raw will notice if we request the wrong version.
220
self.assertRaises(errors.KnitCorrupt, list,
221
data.read_records_iter_raw(records))
223
def test_uncompressed_data(self):
224
sha1sum = sha.new('foo\nbar\n').hexdigest()
225
txt = ('version rev-id-1 2 %s\n'
230
transport = MockTransport([txt])
231
data = _KnitData(transport, 'filename', mode='r')
232
records = [('rev-id-1', 0, len(txt))]
234
# We don't have valid gzip data ==> corrupt
235
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
237
# read_records_iter_raw will notice the bad data
238
self.assertRaises(errors.KnitCorrupt, list,
239
data.read_records_iter_raw(records))
241
def test_corrupted_data(self):
242
sha1sum = sha.new('foo\nbar\n').hexdigest()
243
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
248
# Change 2 bytes in the middle to \xff
249
gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
250
transport = MockTransport([gz_txt])
251
data = _KnitData(transport, 'filename', mode='r')
252
records = [('rev-id-1', 0, len(gz_txt))]
254
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
256
# read_records_iter_raw will notice if we request the wrong version.
257
self.assertRaises(errors.KnitCorrupt, list,
258
data.read_records_iter_raw(records))
261
class LowLevelKnitIndexTests(TestCase):
263
def get_knit_index(self, *args, **kwargs):
    """Return a _KnitIndex built while the pure-Python loader is installed.

    ``knit._load_data`` is swapped for ``_load_data_py`` and the previous
    loader is put back by a cleanup registered on the test.

    :param args: positional arguments passed through to _KnitIndex.
    :param kwargs: keyword arguments passed through to _KnitIndex.
    """
    orig = knit._load_data
    def reset():
        # Runs at cleanup time, not now: restores whichever loader was
        # installed before this test replaced it.
        knit._load_data = orig
    self.addCleanup(reset)
    from bzrlib._knit_load_data_py import _load_data_py
    knit._load_data = _load_data_py
    return _KnitIndex(*args, **kwargs)
272
def test_no_such_file(self):
    """Opening an index over a transport with no file raises NoSuchFile."""
    missing = MockTransport()
    open_index = self.get_knit_index

    # Read mode, then write mode with creation explicitly disabled.
    for mode, options in (("r", {}), ("w", {"create": False})):
        self.assertRaises(NoSuchFile, open_index,
            missing, "filename", mode, **options)
280
def test_create_file(self):
    """Creating an index writes the header with the requested file mode."""
    transport = MockTransport()

    index = self.get_knit_index(transport, "filename", "w",
        file_mode="wb", create=True)
    # The first recorded transport call must be the header write.
    self.assertEqual(
        ("put_bytes_non_atomic",
            ("filename", index.HEADER), {"mode": "wb"}),
        transport.calls.pop(0))
290
def test_delay_create_file(self):
291
transport = MockTransport()
293
index = self.get_knit_index(transport, "filename", "w",
294
create=True, file_mode="wb", create_parent_dir=True,
295
delay_create=True, dir_mode=0777)
296
self.assertEqual([], transport.calls)
298
index.add_versions([])
299
name, (filename, f), kwargs = transport.calls.pop(0)
300
self.assertEqual("put_file_non_atomic", name)
302
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
304
self.assertEqual("filename", filename)
305
self.assertEqual(index.HEADER, f.read())
307
index.add_versions([])
308
self.assertEqual(("append_bytes", ("filename", ""), {}),
309
transport.calls.pop(0))
311
def test_read_utf8_version_id(self):
312
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
313
utf8_revision_id = unicode_revision_id.encode('utf-8')
314
transport = MockTransport([
316
'%s option 0 1 :' % (utf8_revision_id,)
318
index = self.get_knit_index(transport, "filename", "r")
319
# _KnitIndex is a private class, and deals in utf8 revision_ids, not
320
# Unicode revision_ids.
321
self.assertTrue(index.has_version(utf8_revision_id))
322
self.assertFalse(index.has_version(unicode_revision_id))
324
def test_read_utf8_parents(self):
325
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
326
utf8_revision_id = unicode_revision_id.encode('utf-8')
327
transport = MockTransport([
329
"version option 0 1 .%s :" % (utf8_revision_id,)
331
index = self.get_knit_index(transport, "filename", "r")
332
self.assertEqual([utf8_revision_id],
333
index.get_parents_with_ghosts("version"))
335
def test_read_ignore_corrupted_lines(self):
336
transport = MockTransport([
339
"corrupted options 0 1 .b .c ",
340
"version options 0 1 :"
342
index = self.get_knit_index(transport, "filename", "r")
343
self.assertEqual(1, index.num_versions())
344
self.assertTrue(index.has_version("version"))
346
def test_read_corrupted_header(self):
    """A file with an unrecognised first line raises KnitHeaderError."""
    bogus = MockTransport(['not a bzr knit index header\n'])
    load_index = self.get_knit_index
    self.assertRaises(KnitHeaderError, load_index, bogus, "filename", "r")
351
def test_read_duplicate_entries(self):
352
transport = MockTransport([
354
"parent options 0 1 :",
355
"version options1 0 1 0 :",
356
"version options2 1 2 .other :",
357
"version options3 3 4 0 .other :"
359
index = self.get_knit_index(transport, "filename", "r")
360
self.assertEqual(2, index.num_versions())
361
# check that the index used is the first one written. (Specific
362
# to KnitIndex style indices.
363
self.assertEqual("1", index._version_list_to_index(["version"]))
364
self.assertEqual((3, 4), index.get_position("version"))
365
self.assertEqual(["options3"], index.get_options("version"))
366
self.assertEqual(["parent", "other"],
367
index.get_parents_with_ghosts("version"))
369
def test_read_compressed_parents(self):
370
transport = MockTransport([
374
"c option 0 1 1 0 :",
376
index = self.get_knit_index(transport, "filename", "r")
377
self.assertEqual(["a"], index.get_parents("b"))
378
self.assertEqual(["b", "a"], index.get_parents("c"))
380
def test_write_utf8_version_id(self):
381
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
382
utf8_revision_id = unicode_revision_id.encode('utf-8')
383
transport = MockTransport([
386
index = self.get_knit_index(transport, "filename", "r")
387
index.add_version(utf8_revision_id, ["option"], 0, 1, [])
388
self.assertEqual(("append_bytes", ("filename",
389
"\n%s option 0 1 :" % (utf8_revision_id,)),
391
transport.calls.pop(0))
393
def test_write_utf8_parents(self):
394
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
395
utf8_revision_id = unicode_revision_id.encode('utf-8')
396
transport = MockTransport([
399
index = self.get_knit_index(transport, "filename", "r")
400
index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
401
self.assertEqual(("append_bytes", ("filename",
402
"\nversion option 0 1 .%s :" % (utf8_revision_id,)),
404
transport.calls.pop(0))
406
def test_get_graph(self):
    """get_graph() lists (version, parents) for every added version."""
    fake_transport = MockTransport()
    knit_index = self.get_knit_index(
        fake_transport, "filename", "w", create=True)

    # A freshly created index has an empty graph.
    graph = knit_index.get_graph()
    self.assertEqual([], graph)

    knit_index.add_version("a", ["option"], 0, 1, ["b"])
    graph = knit_index.get_graph()
    self.assertEqual([("a", ["b"])], graph)

    knit_index.add_version("c", ["option"], 0, 1, ["d"])
    # The result is sorted before comparing.
    graph = sorted(knit_index.get_graph())
    self.assertEqual([("a", ["b"]), ("c", ["d"])], graph)
418
def test_get_ancestry(self):
419
transport = MockTransport([
422
"b option 0 1 0 .e :",
423
"c option 0 1 1 0 :",
424
"d option 0 1 2 .f :"
426
index = self.get_knit_index(transport, "filename", "r")
428
self.assertEqual([], index.get_ancestry([]))
429
self.assertEqual(["a"], index.get_ancestry(["a"]))
430
self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
431
self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
432
self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
433
self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
434
self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
436
self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
438
def test_get_ancestry_with_ghosts(self):
439
transport = MockTransport([
442
"b option 0 1 0 .e :",
443
"c option 0 1 0 .f .g :",
444
"d option 0 1 2 .h .j .k :"
446
index = self.get_knit_index(transport, "filename", "r")
448
self.assertEqual([], index.get_ancestry_with_ghosts([]))
449
self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
450
self.assertEqual(["a", "e", "b"],
451
index.get_ancestry_with_ghosts(["b"]))
452
self.assertEqual(["a", "g", "f", "c"],
453
index.get_ancestry_with_ghosts(["c"]))
454
self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
455
index.get_ancestry_with_ghosts(["d"]))
456
self.assertEqual(["a", "e", "b"],
457
index.get_ancestry_with_ghosts(["a", "b"]))
458
self.assertEqual(["a", "g", "f", "c"],
459
index.get_ancestry_with_ghosts(["a", "c"]))
461
["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
462
index.get_ancestry_with_ghosts(["b", "d"]))
464
self.assertRaises(RevisionNotPresent,
465
index.get_ancestry_with_ghosts, ["e"])
467
def test_iter_parents(self):
    """iter_parents() yields (version, parents) only for known versions."""
    transport = MockTransport()
    index = self.get_knit_index(transport, "filename", "w", create=True)
    # r0 has no parents, r1 has one, r2 has two.
    for version, parents in (('r0', []),
                             ('r1', ['r0']),
                             ('r2', ['r1', 'r0'])):
        index.add_version(version, ['option'], 0, 1, parents)

    def check(expected, version_ids):
        self.assertEqual(set(expected), set(index.iter_parents(version_ids)))

    # cases: each sample data individually:
    check([('r0', ())], ['r0'])
    check([('r1', ('r0', ))], ['r1'])
    check([('r2', ('r1', 'r0'))], ['r2'])
    # no nodes returned for a missing node
    check([], ['missing'])
    # 1 node returned with missing nodes skipped
    check([('r1', ('r0', ))], ['ghost1', 'r1', 'ghost'])
    # 2 nodes returned
    check([('r0', ()), ('r1', ('r0', ))], ['r0', 'r1'])
    # 2 nodes returned, missing skipped
    check([('r0', ()), ('r1', ('r0', ))], ['a', 'r0', 'b', 'r1', 'c'])
497
def test_num_versions(self):
498
transport = MockTransport([
501
index = self.get_knit_index(transport, "filename", "r")
503
self.assertEqual(0, index.num_versions())
504
self.assertEqual(0, len(index))
506
index.add_version("a", ["option"], 0, 1, [])
507
self.assertEqual(1, index.num_versions())
508
self.assertEqual(1, len(index))
510
index.add_version("a", ["option2"], 1, 2, [])
511
self.assertEqual(1, index.num_versions())
512
self.assertEqual(1, len(index))
514
index.add_version("b", ["option"], 0, 1, [])
515
self.assertEqual(2, index.num_versions())
516
self.assertEqual(2, len(index))
518
def test_get_versions(self):
519
transport = MockTransport([
522
index = self.get_knit_index(transport, "filename", "r")
524
self.assertEqual([], index.get_versions())
526
index.add_version("a", ["option"], 0, 1, [])
527
self.assertEqual(["a"], index.get_versions())
529
index.add_version("a", ["option"], 0, 1, [])
530
self.assertEqual(["a"], index.get_versions())
532
index.add_version("b", ["option"], 0, 1, [])
533
self.assertEqual(["a", "b"], index.get_versions())
535
def test_add_version(self):
536
transport = MockTransport([
539
index = self.get_knit_index(transport, "filename", "r")
541
index.add_version("a", ["option"], 0, 1, ["b"])
542
self.assertEqual(("append_bytes",
543
("filename", "\na option 0 1 .b :"),
544
{}), transport.calls.pop(0))
545
self.assertTrue(index.has_version("a"))
546
self.assertEqual(1, index.num_versions())
547
self.assertEqual((0, 1), index.get_position("a"))
548
self.assertEqual(["option"], index.get_options("a"))
549
self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
551
index.add_version("a", ["opt"], 1, 2, ["c"])
552
self.assertEqual(("append_bytes",
553
("filename", "\na opt 1 2 .c :"),
554
{}), transport.calls.pop(0))
555
self.assertTrue(index.has_version("a"))
556
self.assertEqual(1, index.num_versions())
557
self.assertEqual((1, 2), index.get_position("a"))
558
self.assertEqual(["opt"], index.get_options("a"))
559
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
561
index.add_version("b", ["option"], 2, 3, ["a"])
562
self.assertEqual(("append_bytes",
563
("filename", "\nb option 2 3 0 :"),
564
{}), transport.calls.pop(0))
565
self.assertTrue(index.has_version("b"))
566
self.assertEqual(2, index.num_versions())
567
self.assertEqual((2, 3), index.get_position("b"))
568
self.assertEqual(["option"], index.get_options("b"))
569
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
571
def test_add_versions(self):
572
transport = MockTransport([
575
index = self.get_knit_index(transport, "filename", "r")
578
("a", ["option"], 0, 1, ["b"]),
579
("a", ["opt"], 1, 2, ["c"]),
580
("b", ["option"], 2, 3, ["a"])
582
self.assertEqual(("append_bytes", ("filename",
583
"\na option 0 1 .b :"
586
), {}), transport.calls.pop(0))
587
self.assertTrue(index.has_version("a"))
588
self.assertTrue(index.has_version("b"))
589
self.assertEqual(2, index.num_versions())
590
self.assertEqual((1, 2), index.get_position("a"))
591
self.assertEqual((2, 3), index.get_position("b"))
592
self.assertEqual(["opt"], index.get_options("a"))
593
self.assertEqual(["option"], index.get_options("b"))
594
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
595
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
597
def test_delay_create_and_add_versions(self):
598
transport = MockTransport()
600
index = self.get_knit_index(transport, "filename", "w",
601
create=True, file_mode="wb", create_parent_dir=True,
602
delay_create=True, dir_mode=0777)
603
self.assertEqual([], transport.calls)
606
("a", ["option"], 0, 1, ["b"]),
607
("a", ["opt"], 1, 2, ["c"]),
608
("b", ["option"], 2, 3, ["a"])
610
name, (filename, f), kwargs = transport.calls.pop(0)
611
self.assertEqual("put_file_non_atomic", name)
613
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
615
self.assertEqual("filename", filename)
618
"\na option 0 1 .b :"
620
"\nb option 2 3 0 :",
623
def test_has_version(self):
624
transport = MockTransport([
628
index = self.get_knit_index(transport, "filename", "r")
630
self.assertTrue(index.has_version("a"))
631
self.assertFalse(index.has_version("b"))
633
def test_get_position(self):
634
transport = MockTransport([
639
index = self.get_knit_index(transport, "filename", "r")
641
self.assertEqual((0, 1), index.get_position("a"))
642
self.assertEqual((1, 2), index.get_position("b"))
644
def test_get_method(self):
645
transport = MockTransport([
647
"a fulltext,unknown 0 1 :",
648
"b unknown,line-delta 1 2 :",
651
index = self.get_knit_index(transport, "filename", "r")
653
self.assertEqual("fulltext", index.get_method("a"))
654
self.assertEqual("line-delta", index.get_method("b"))
655
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
657
def test_get_options(self):
658
transport = MockTransport([
663
index = self.get_knit_index(transport, "filename", "r")
665
self.assertEqual(["opt1"], index.get_options("a"))
666
self.assertEqual(["opt2", "opt3"], index.get_options("b"))
668
def test_get_parents(self):
669
transport = MockTransport([
672
"b option 1 2 0 .c :",
673
"c option 1 2 1 0 .e :"
675
index = self.get_knit_index(transport, "filename", "r")
677
self.assertEqual([], index.get_parents("a"))
678
self.assertEqual(["a", "c"], index.get_parents("b"))
679
self.assertEqual(["b", "a"], index.get_parents("c"))
681
def test_get_parents_with_ghosts(self):
682
transport = MockTransport([
685
"b option 1 2 0 .c :",
686
"c option 1 2 1 0 .e :"
688
index = self.get_knit_index(transport, "filename", "r")
690
self.assertEqual([], index.get_parents_with_ghosts("a"))
691
self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
692
self.assertEqual(["b", "a", "e"],
693
index.get_parents_with_ghosts("c"))
695
def test_check_versions_present(self):
696
transport = MockTransport([
701
index = self.get_knit_index(transport, "filename", "r")
703
check = index.check_versions_present
709
self.assertRaises(RevisionNotPresent, check, ["c"])
710
self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
712
def test_impossible_parent(self):
713
"""Test we get KnitCorrupt if the parent couldn't possibly exist."""
714
transport = MockTransport([
717
"b option 0 1 4 :" # We don't have a 4th record
720
self.assertRaises(errors.KnitCorrupt,
721
self.get_knit_index, transport, 'filename', 'r')
723
if (str(e) == ('exceptions must be strings, classes, or instances,'
724
' not exceptions.IndexError')
725
and sys.version_info[0:2] >= (2,5)):
726
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
727
' raising new style exceptions with python'
732
def test_corrupted_parent(self):
733
transport = MockTransport([
737
"c option 0 1 1v :", # Can't have a parent of '1v'
740
self.assertRaises(errors.KnitCorrupt,
741
self.get_knit_index, transport, 'filename', 'r')
743
if (str(e) == ('exceptions must be strings, classes, or instances,'
744
' not exceptions.ValueError')
745
and sys.version_info[0:2] >= (2,5)):
746
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
747
' raising new style exceptions with python'
752
def test_corrupted_parent_in_list(self):
753
transport = MockTransport([
757
"c option 0 1 1 v :", # Can't have a parent of 'v'
760
self.assertRaises(errors.KnitCorrupt,
761
self.get_knit_index, transport, 'filename', 'r')
763
if (str(e) == ('exceptions must be strings, classes, or instances,'
764
' not exceptions.ValueError')
765
and sys.version_info[0:2] >= (2,5)):
766
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
767
' raising new style exceptions with python'
772
def test_invalid_position(self):
773
transport = MockTransport([
778
self.assertRaises(errors.KnitCorrupt,
779
self.get_knit_index, transport, 'filename', 'r')
781
if (str(e) == ('exceptions must be strings, classes, or instances,'
782
' not exceptions.ValueError')
783
and sys.version_info[0:2] >= (2,5)):
784
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
785
' raising new style exceptions with python'
790
def test_invalid_size(self):
791
transport = MockTransport([
796
self.assertRaises(errors.KnitCorrupt,
797
self.get_knit_index, transport, 'filename', 'r')
799
if (str(e) == ('exceptions must be strings, classes, or instances,'
800
' not exceptions.ValueError')
801
and sys.version_info[0:2] >= (2,5)):
802
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
803
' raising new style exceptions with python'
808
def test_short_line(self):
809
transport = MockTransport([
812
"b option 10 10 0", # This line isn't terminated, ignored
814
index = self.get_knit_index(transport, "filename", "r")
815
self.assertEqual(['a'], index.get_versions())
817
def test_skip_incomplete_record(self):
818
# A line with bogus data should just be skipped
819
transport = MockTransport([
822
"b option 10 10 0", # This line isn't terminated, ignored
823
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
825
index = self.get_knit_index(transport, "filename", "r")
826
self.assertEqual(['a', 'c'], index.get_versions())
828
def test_trailing_characters(self):
829
# A line with bogus data should just be skipped
830
transport = MockTransport([
833
"b option 10 10 0 :a", # This line has extra trailing characters
834
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
836
index = self.get_knit_index(transport, "filename", "r")
837
self.assertEqual(['a', 'c'], index.get_versions())
840
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
    """Re-run the inherited index tests against the compiled loader."""

    # Declares that these tests depend on the compiled extension.
    _test_needs_features = [CompiledKnitFeature]

    def get_knit_index(self, *args, **kwargs):
        """Return a _KnitIndex built while the compiled loader is installed.

        ``knit._load_data`` is swapped for ``_load_data_c`` and the previous
        loader is put back by a cleanup registered on the test.
        """
        orig = knit._load_data
        def reset():
            # Runs at cleanup time, not now: restores whichever loader was
            # installed before this test replaced it.
            knit._load_data = orig
        self.addCleanup(reset)
        from bzrlib._knit_load_data_c import _load_data_c
        knit._load_data = _load_data_c
        return _KnitIndex(*args, **kwargs)
855
class KnitTests(TestCaseWithTransport):
    """Class containing knit test helper routines."""

    def make_test_knit(self, annotate=False, delay_create=False, index=None):
        """Create a writable knit named 'test' in the current directory.

        :param annotate: if True use KnitAnnotateFactory, otherwise
            KnitPlainFactory.
        :param delay_create: passed through to KnitVersionedFile.
        :param index: optional index object passed through to
            KnitVersionedFile.
        :return: the new KnitVersionedFile.
        """
        # Honour ``annotate``: callers pass True and then check per-line
        # origins, so the factory must not be hard-wired to the plain one.
        if annotate:
            factory = KnitAnnotateFactory()
        else:
            factory = KnitPlainFactory()
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
                                 factory=factory, create=True,
                                 delay_create=delay_create, index=index)
868
class BasicKnitTests(KnitTests):
870
def add_stock_one_and_one_a(self, k):
    """Add 'text-1' and its child 'text-1a' to the knit ``k``."""
    stock = (
        ('text-1', [], TEXT_1),
        ('text-1a', ['text-1'], TEXT_1A),
    )
    for version_id, parents, text in stock:
        k.add_lines(version_id, parents, split_lines(text))
874
def test_knit_constructor(self):
    """An empty knit can be constructed."""
    # Only construction is exercised; the knit itself is not inspected.
    self.make_test_knit()
878
def test_make_explicit_index(self):
    """We can supply an index to use."""
    # A sentinel string is enough: the test only checks that the object
    # passed as ``index`` is stored untouched on the knit.
    k = KnitVersionedFile('test', get_transport('.'),
        index='strangelove')
    self.assertEqual(k._index, 'strangelove')
884
def test_knit_add(self):
    """A single stored text can be found and read back unchanged."""
    vf = self.make_test_knit()
    vf.add_lines('text-1', [], split_lines(TEXT_1))

    self.assertTrue(vf.has_version('text-1'))
    stored_text = ''.join(vf.get_lines('text-1'))
    self.assertEqualDiff(stored_text, TEXT_1)
891
def test_knit_reload(self):
    """Content written through one knit is readable after reopening."""
    writer = self.make_test_knit()
    writer.add_lines('text-1', [], split_lines(TEXT_1))

    # Open the same knit again, this time read-only.
    transport = get_transport('.')
    plain_factory = KnitPlainFactory()
    reader = KnitVersionedFile('test', transport, access_mode='r',
                               factory=plain_factory, create=True)
    self.assertTrue(reader.has_version('text-1'))
    reloaded_text = ''.join(reader.get_lines('text-1'))
    self.assertEqualDiff(reloaded_text, TEXT_1)
900
def test_knit_several(self):
    """Two unrelated texts can be stored and each read back unchanged."""
    vf = self.make_test_knit()
    vf.add_lines('text-1', [], split_lines(TEXT_1))
    vf.add_lines('text-2', [], split_lines(TEXT_2))

    first_text = ''.join(vf.get_lines('text-1'))
    self.assertEqualDiff(first_text, TEXT_1)
    second_text = ''.join(vf.get_lines('text-2'))
    self.assertEqualDiff(second_text, TEXT_2)
908
def test_repeated_add(self):
    """Knit traps attempt to replace existing version"""
    k = self.make_test_knit()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    # Adding the same version id a second time must be refused.
    self.assertRaises(RevisionAlreadyPresent,
                      k.add_lines,
                      'text-1', [], split_lines(TEXT_1))
916
def test_empty(self):
    """A version with no lines reads back as an empty list."""
    vf = self.make_test_knit(True)
    vf.add_lines('text-1', [], [])
    stored_lines = vf.get_lines('text-1')
    self.assertEquals(stored_lines, [])
921
def test_incomplete(self):
    """Test if texts without an ending line-end can be inserted and
    extracted."""
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    # 'text-1' ends without a newline; 'text-2' has a bare '\r' mid-line.
    k.add_lines('text-1', [], ['a\n',    'b'  ])
    k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
    # reopening ensures maximum room for confusion
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
    self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
932
def test_delta(self):
    """line_delta output matches delta_1_1a and re-applies to give TEXT_1A."""
    # The knit is created as in the other tests but not otherwise used.
    k = self.make_test_knit()

    old_lines = TEXT_1.splitlines(True)
    new_lines = TEXT_1A.splitlines(True)
    delta_lines = list(line_delta(old_lines, new_lines))
    self.assertEqualDiff(''.join(delta_lines), delta_1_1a)

    # Apply the delta to a fresh copy of the base lines.
    rebuilt = apply_line_delta(TEXT_1.splitlines(True), delta_lines)
    self.assertEqualDiff(''.join(rebuilt), TEXT_1A)
941
def test_add_with_parents(self):
    """get_parents() reports the parents each version was added with."""
    vf = self.make_test_knit()
    self.add_stock_one_and_one_a(vf)

    root_parents = vf.get_parents('text-1')
    self.assertEquals(root_parents, [])
    child_parents = vf.get_parents('text-1a')
    self.assertEquals(child_parents, ['text-1'])
948
def test_ancestry(self):
    """The ancestry of 'text-1a' is itself plus its parent 'text-1'."""
    vf = self.make_test_knit()
    self.add_stock_one_and_one_a(vf)

    # Compared as sets, so ordering is not checked here.
    ancestry = set(vf.get_ancestry(['text-1a']))
    self.assertEquals(ancestry, set(['text-1a', 'text-1']))
954
def test_add_delta(self):
    """A knit created with delta=True returns the full child text."""
    transport = get_transport('.')
    plain_factory = KnitPlainFactory()
    vf = KnitVersionedFile('test', transport, factory=plain_factory,
                           delta=True, create=True)
    self.add_stock_one_and_one_a(vf)

    child_text = ''.join(vf.get_lines('text-1a'))
    self.assertEqualDiff(child_text, TEXT_1A)
962
def test_add_delta_knit_graph_index(self):
963
"""Does adding work with a KnitGraphIndex."""
964
index = InMemoryGraphIndex(2)
965
knit_index = KnitGraphIndex(index, add_callback=index.add_nodes,
967
k = KnitVersionedFile('test', get_transport('.'),
968
delta=True, create=True, index=knit_index)
969
self.add_stock_one_and_one_a(k)
971
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
972
# check the index had the right data added.
973
self.assertEqual(set([
974
('text-1', ' 0 127', ((), ())),
975
('text-1a', ' 127 140', (('text-1',), ('text-1',))),
976
]), set(index.iter_all_entries()))
977
# we should not have a .kndx file
978
self.assertFalse(get_transport('.').has('test.kndx'))
980
def test_annotate(self):
    """Run the small annotate scenario on a knit created with delta=True."""
    transport = get_transport('.')
    annotate_factory = KnitAnnotateFactory()
    vf = KnitVersionedFile('knit', transport, factory=annotate_factory,
                           delta=True, create=True)
    self.insert_and_test_small_annotate(vf)
986
def insert_and_test_small_annotate(self, k):
    """Add a parent and child text to ``k`` and check the child's origins."""
    k.add_lines('text-1', [], ['a\n', 'b\n'])
    k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])

    origins = k.annotate('text-2')
    # 'a' is inherited from text-1; 'c' is new in text-2.
    expected = [('text-1', 'a\n'), ('text-2', 'c\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
995
def test_annotate_fulltext(self):
    """Run the small annotate scenario on a knit created with delta=False."""
    transport = get_transport('.')
    annotate_factory = KnitAnnotateFactory()
    vf = KnitVersionedFile('knit', transport, factory=annotate_factory,
                           delta=False, create=True)
    self.insert_and_test_small_annotate(vf)
1001
def test_annotate_merge_1(self):
    """Each merged line is attributed to the parent that contains it."""
    vf = self.make_test_knit(True)
    vf.add_lines('text-a1', [], ['a\n', 'b\n'])
    vf.add_lines('text-a2', [], ['d\n', 'c\n'])
    vf.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])

    origins = vf.annotate('text-am')
    expected = [('text-a2', 'd\n'), ('text-a1', 'b\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
1010
def test_annotate_merge_2(self):
    """Lines alternating between two parents keep their own origins."""
    vf = self.make_test_knit(True)
    vf.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    vf.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    vf.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])

    origins = vf.annotate('text-am')
    expected = [('text-a1', 'a\n'), ('text-a2', 'y\n'), ('text-a1', 'c\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
1020
def test_annotate_merge_9(self):
    """A line new in the merge is attributed to the merge version itself."""
    vf = self.make_test_knit(True)
    vf.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    vf.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    vf.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])

    origins = vf.annotate('text-am')
    # 'k' appears in neither parent; 'y' and 'c' come from a2 and a1.
    expected = [('text-am', 'k\n'), ('text-a2', 'y\n'), ('text-a1', 'c\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
1030
def test_annotate_merge_3(self):
    """A new first line followed by two lines taken from the second parent."""
    vf = self.make_test_knit(True)
    vf.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    vf.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    vf.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])

    origins = vf.annotate('text-am')
    expected = [('text-am', 'k\n'), ('text-a2', 'y\n'), ('text-a2', 'z\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
1040
def test_annotate_merge_4(self):
    """Lines reaching the merge through 'text-a3' keep their 'text-a1' origin."""
    vf = self.make_test_knit(True)
    vf.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    vf.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    vf.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
    vf.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
    origins = vf.annotate('text-am')
    wanted = [('text-a1', 'a\n'), ('text-a1', 'b\n'), ('text-a2', 'z\n')]
    for position, annotation in enumerate(wanted):
        self.assertEquals(origins[position], annotation)
1051
def test_annotate_merge_5(self):
    """A three-parent merge taking one line from each parent."""
    vf = self.make_test_knit(True)
    vf.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    vf.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
    vf.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
    merge_parents = ['text-a1', 'text-a2', 'text-a3']
    vf.add_lines('text-am', merge_parents, ['a\n', 'e\n', 'z\n'])
    origins = vf.annotate('text-am')
    wanted = [('text-a1', 'a\n'), ('text-a2', 'e\n'), ('text-a3', 'z\n')]
    for position, annotation in enumerate(wanted):
        self.assertEquals(origins[position], annotation)
1064
def test_annotate_file_cherry_pick(self):
    """Text restored from 'text-1' in 'text-3' is still credited to 'text-1'."""
    vf = self.make_test_knit(True)
    vf.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
    vf.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
    vf.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
    origins = vf.annotate('text-3')
    for position, text in enumerate(['a\n', 'b\n', 'c\n']):
        self.assertEquals(origins[position], ('text-1', text))
1074
def test_knit_join(self):
    """Joining on 'text-m' alone reports five versions and brings both roots."""
    source = KnitVersionedFile('test1', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    # Two separate two-revision chains, merged by 'text-m'.
    history = [
        ('text-a', []),
        ('text-b', ['text-a']),
        ('text-c', []),
        ('text-d', ['text-c']),
        ('text-m', ['text-b', 'text-d']),
        ]
    for version_id, parents in history:
        source.add_lines(version_id, parents, split_lines(TEXT_1))

    target = KnitVersionedFile('test2', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    count = target.join(source, version_ids=['text-m'])
    self.assertEquals(count, 5)
    for root_id in ('text-a', 'text-c'):
        self.assertTrue(target.has_version(root_id))
1091
def test_reannotate(self):
    """Annotations stay correct for a version joined in from another knit."""
    first = KnitVersionedFile('knit1', get_transport('.'),
                              factory=KnitAnnotateFactory(), create=True)
    first.add_lines('text-a', [], ['a\n', 'b\n'])
    first.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])

    second = KnitVersionedFile('test2', get_transport('.'),
                               factory=KnitAnnotateFactory(), create=True)
    second.join(first, version_ids=['text-b'])

    # Let the two knits diverge after the shared 'text-b'.
    first.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
    second.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
    second.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])

    # text-c will have index 3
    first.join(second, version_ids=['text-c'])

    joined_lines = first.get_lines('text-c')
    self.assertEquals(joined_lines, ['z\n', 'c\n'])

    origins = first.annotate('text-c')
    wanted = [('text-c', 'z\n'), ('text-b', 'c\n')]
    for position, annotation in enumerate(wanted):
        self.assertEquals(origins[position], annotation)
1120
def test_get_line_delta_texts(self):
1121
"""Make sure we can call get_texts on text with reused line deltas"""
1122
k1 = KnitVersionedFile('test1', get_transport('.'),
1123
factory=KnitPlainFactory(), create=True)
1128
parents = ['%d' % (t-1)]
1129
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
1130
k1.get_texts(('%d' % t) for t in range(3))
1132
def test_iter_lines_reads_in_order(self):
    """Line iteration reads records by ascending offset, not request order."""
    logged = TransportLogger(MemoryTransport())
    vf = KnitVersionedFile('id', logged, create=True, delta=True)
    # Creating the knit is logged as a single call naming the index file.
    self.assertEqual([('id.kndx',)], logged._calls)
    # Two texts with no parents, so neither has to come before the other.
    vf.add_lines('base', [], ['text\n'])
    vf.add_lines('base2', [], ['text2\n'])
    # Drop the calls logged so far; only the iteration should be recorded.
    logged._calls = []
    # Ask for the later record first.
    requested = ['base2', 'base']
    results = list(vf.iter_lines_added_or_present_in_versions(requested))
    # One logged call on the data file, with ranges in ascending offset order.
    self.assertEqual([('id.knit', [(0, 87), (87, 89)])], logged._calls)
    self.assertEqual(['text\n', 'text2\n'], results)
1147
def test_create_empty_annotated(self):
    """create_empty keeps the annotating factory and the delta setting."""
    original = self.make_test_knit(True)
    original.add_lines('text-a', [], ['a\n', 'b\n'])
    empty_copy = original.create_empty('t', MemoryTransport())
    is_annotating = isinstance(empty_copy.factory, KnitAnnotateFactory)
    self.assertTrue(is_annotating)
    self.assertEqual(original.delta, empty_copy.delta)
    # Emptiness and the file class are covered by the generic test.
1156
def test_knit_format(self):
1157
# this tests that a new knit index file has the expected content
1158
# and that it writes the data we expect as records are added.
1159
knit = self.make_test_knit(True)
1160
# Now knit files are not created until we first add data to them
1161
self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
1162
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
1163
self.assertFileEqual(
1164
"# bzr knit index 8\n"
1166
"revid fulltext 0 84 .a_ghost :",
1168
knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
1169
self.assertFileEqual(
1170
"# bzr knit index 8\n"
1171
"\nrevid fulltext 0 84 .a_ghost :"
1172
"\nrevid2 line-delta 84 82 0 :",
1174
# we should be able to load this file again
1175
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1176
self.assertEqual(['revid', 'revid2'], knit.versions())
1177
# write a short write to the file and ensure that its ignored
1178
indexfile = file('test.kndx', 'ab')
1179
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
1181
# we should be able to load this file again
1182
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
1183
self.assertEqual(['revid', 'revid2'], knit.versions())
1184
# and add a revision with the same id the failed write had
1185
knit.add_lines('revid3', ['revid2'], ['a\n'])
1186
# and when reading it revid3 should now appear.
1187
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1188
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
1189
self.assertEqual(['revid2'], knit.get_parents('revid3'))
1191
def test_delay_create(self):
1192
"""Test that passing delay_create=True creates files late"""
1193
knit = self.make_test_knit(annotate=True, delay_create=True)
1194
self.failIfExists('test.knit')
1195
self.failIfExists('test.kndx')
1196
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
1197
self.failUnlessExists('test.knit')
1198
self.assertFileEqual(
1199
"# bzr knit index 8\n"
1201
"revid fulltext 0 84 .a_ghost :",
1204
def test_create_parent_dir(self):
1205
"""create_parent_dir can create knits in nonexistant dirs"""
1206
# Has no effect if we don't set 'delay_create'
1207
trans = get_transport('.')
1208
self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
1209
trans, access_mode='w', factory=None,
1210
create=True, create_parent_dir=True)
1211
# Nothing should have changed yet
1212
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1213
factory=None, create=True,
1214
create_parent_dir=True,
1216
self.failIfExists('dir/test.knit')
1217
self.failIfExists('dir/test.kndx')
1218
self.failIfExists('dir')
1219
knit.add_lines('revid', [], ['a\n'])
1220
self.failUnlessExists('dir')
1221
self.failUnlessExists('dir/test.knit')
1222
self.assertFileEqual(
1223
"# bzr knit index 8\n"
1225
"revid fulltext 0 84 :",
1228
def test_create_mode_700(self):
1229
trans = get_transport('.')
1230
if not trans._can_roundtrip_unix_modebits():
1231
# Can't roundtrip, so no need to run this test
1233
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1234
factory=None, create=True,
1235
create_parent_dir=True,
1239
knit.add_lines('revid', [], ['a\n'])
1240
self.assertTransportMode(trans, 'dir', 0700)
1241
self.assertTransportMode(trans, 'dir/test.knit', 0600)
1242
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
1244
def test_create_mode_770(self):
1245
trans = get_transport('.')
1246
if not trans._can_roundtrip_unix_modebits():
1247
# Can't roundtrip, so no need to run this test
1249
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1250
factory=None, create=True,
1251
create_parent_dir=True,
1255
knit.add_lines('revid', [], ['a\n'])
1256
self.assertTransportMode(trans, 'dir', 0770)
1257
self.assertTransportMode(trans, 'dir/test.knit', 0660)
1258
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
1260
def test_create_mode_777(self):
1261
trans = get_transport('.')
1262
if not trans._can_roundtrip_unix_modebits():
1263
# Can't roundtrip, so no need to run this test
1265
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1266
factory=None, create=True,
1267
create_parent_dir=True,
1271
knit.add_lines('revid', [], ['a\n'])
1272
self.assertTransportMode(trans, 'dir', 0777)
1273
self.assertTransportMode(trans, 'dir/test.knit', 0666)
1274
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
1276
def test_plan_merge(self):
    """plan_merge of two children of 'text1' matches AB_MERGE line by line."""
    vf = self.make_test_knit(annotate=True)
    texts = [
        ('text1', [], TEXT_1),
        ('text1a', ['text1'], TEXT_1A),
        ('text1b', ['text1'], TEXT_1B),
        ]
    for version_id, parents, text in texts:
        vf.add_lines(version_id, parents, split_lines(text))
    plan = list(vf.plan_merge('text1a', 'text1b'))
    # zip() stops at the shorter sequence, so only the common prefix is compared.
    for actual, expected in zip(plan, AB_MERGE):
        self.assertEqual(actual, expected)
1295
Banana cup cake recipe
1301
- self-raising flour
1305
Banana cup cake recipe
1307
- bananas (do not use plantains!!!)
1314
Banana cup cake recipe
1317
- self-raising flour
1330
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
1335
new-b|- bananas (do not use plantains!!!)
1336
unchanged|- broken tea cups
1337
new-a|- self-raising flour
1340
# Expected merge plan: one tuple per line of AB_MERGE_TEXT, split on '|'.
# Line endings are kept (splitlines(True)).  A generator expression is used so
# the loop variable is not left in the module namespace under Python 2, and it
# no longer uses the easily-misread name 'l'.
AB_MERGE = list(tuple(plan_line.split('|'))
                for plan_line in AB_MERGE_TEXT.splitlines(True))
1343
def line_delta(from_lines, to_lines):
1344
"""Generate line-based delta from one text to another"""
1345
s = difflib.SequenceMatcher(None, from_lines, to_lines)
1346
for op in s.get_opcodes():
1347
if op[0] == 'equal':
1349
yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
1350
for i in range(op[3], op[4]):
1354
def apply_line_delta(basis_lines, delta_lines):
1355
"""Apply a line-based perfect diff
1357
basis_lines -- text to apply the patch to
1358
delta_lines -- diff instructions and content
1360
out = basis_lines[:]
1363
while i < len(delta_lines):
1365
a, b, c = map(long, l.split(','))
1367
out[offset+a:offset+b] = delta_lines[i:i+c]
1369
offset = offset + (b - a) + c
1373
class TestWeaveToKnit(KnitTests):
1375
def test_weave_to_knit_matches(self):
1376
# check that the WeaveToKnit is_compatible function
1377
# registers True for a Weave to a Knit.
1379
k = self.make_test_knit()
1380
self.failUnless(WeaveToKnit.is_compatible(w, k))
1381
self.failIf(WeaveToKnit.is_compatible(k, w))
1382
self.failIf(WeaveToKnit.is_compatible(w, w))
1383
self.failIf(WeaveToKnit.is_compatible(k, k))
1386
class TestKnitCaching(KnitTests):
1388
def create_knit(self, cache_add=False):
1389
k = self.make_test_knit(True)
1393
k.add_lines('text-1', [], split_lines(TEXT_1))
1394
k.add_lines('text-2', [], split_lines(TEXT_2))
1397
def test_no_caching(self):
    """A knit from create_knit() starts with an empty data cache."""
    record_cache = self.create_knit()._data._cache
    # Nothing should be cached without setting 'enable_cache'
    self.assertEqual({}, record_cache)
1402
def test_cache_add_and_clear(self):
1403
k = self.create_knit(True)
1405
self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1408
self.assertEqual({}, k._data._cache)
1410
def test_cache_data_read_raw(self):
1411
k = self.create_knit()
1413
# Now cache and read
1416
def read_one_raw(version):
1417
pos_map = k._get_components_positions([version])
1418
method, pos, size, next = pos_map[version]
1419
lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
1420
self.assertEqual(1, len(lst))
1423
val = read_one_raw('text-1')
1424
self.assertEqual({'text-1':val[1]}, k._data._cache)
1427
# After clear, new reads are not cached
1428
self.assertEqual({}, k._data._cache)
1430
val2 = read_one_raw('text-1')
1431
self.assertEqual(val, val2)
1432
self.assertEqual({}, k._data._cache)
1434
def test_cache_data_read(self):
1435
k = self.create_knit()
1437
def read_one(version):
1438
pos_map = k._get_components_positions([version])
1439
method, pos, size, next = pos_map[version]
1440
lst = list(k._data.read_records_iter([(version, pos, size)]))
1441
self.assertEqual(1, len(lst))
1444
# Now cache and read
1447
val = read_one('text-2')
1448
self.assertEqual(['text-2'], k._data._cache.keys())
1449
self.assertEqual('text-2', val[0])
1450
content, digest = k._data._parse_record('text-2',
1451
k._data._cache['text-2'])
1452
self.assertEqual(content, val[1])
1453
self.assertEqual(digest, val[2])
1456
self.assertEqual({}, k._data._cache)
1458
val2 = read_one('text-2')
1459
self.assertEqual(val, val2)
1460
self.assertEqual({}, k._data._cache)
1462
def test_cache_read(self):
1463
k = self.create_knit()
1466
text = k.get_text('text-1')
1467
self.assertEqual(TEXT_1, text)
1468
self.assertEqual(['text-1'], k._data._cache.keys())
1471
self.assertEqual({}, k._data._cache)
1473
text = k.get_text('text-1')
1474
self.assertEqual(TEXT_1, text)
1475
self.assertEqual({}, k._data._cache)
1478
class TestKnitIndex(KnitTests):
1480
def test_add_versions_dictionary_compresses(self):
1481
"""Adding versions to the index should update the lookup dict"""
1482
knit = self.make_test_knit()
1484
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1485
self.check_file_contents('test.kndx',
1486
'# bzr knit index 8\n'
1488
'a-1 fulltext 0 0 :'
1490
idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
1491
('a-3', ['fulltext'], 0, 0, ['a-2']),
1493
self.check_file_contents('test.kndx',
1494
'# bzr knit index 8\n'
1496
'a-1 fulltext 0 0 :\n'
1497
'a-2 fulltext 0 0 0 :\n'
1498
'a-3 fulltext 0 0 1 :'
1500
self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1501
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1502
'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1503
'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1506
def test_add_versions_fails_clean(self):
1507
"""If add_versions fails in the middle, it restores a pristine state.
1509
Any modifications that are made to the index are reset if all versions
1512
# This cheats a little bit by passing in a generator which will
1513
# raise an exception before the processing finishes
1514
# Other possibilities would be to have a version with the wrong number
1515
# of entries, or to make the backing transport unable to write any
1518
knit = self.make_test_knit()
1520
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1522
class StopEarly(Exception):
1525
def generate_failure():
1526
"""Add some entries and then raise an exception"""
1527
yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
1528
yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
1531
# Assert the pre-condition
1532
self.assertEqual(['a-1'], idx._history)
1533
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1535
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1537
# And it shouldn't be modified
1538
self.assertEqual(['a-1'], idx._history)
1539
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1541
def test_knit_index_ignores_empty_files(self):
    """Creating the test knit over a zero-length .kndx file must not raise.

    Older bzr had a race where a ^C at the wrong moment could leave an empty
    .kndx behind.  It was later reported as corrupt because the header was
    missing, although really the file had never been written, so it should
    be ignored.
    """
    get_transport('.').put_bytes('test.kndx', '')
    self.make_test_knit()
1551
def test_knit_index_checks_header(self):
    """A .kndx whose first line is not the knit header is rejected."""
    bogus_index = '# not really a knit header\n\n'
    get_transport('.').put_bytes('test.kndx', bogus_index)
    self.assertRaises(KnitHeaderError, self.make_test_knit)
1558
class TestGraphIndexKnit(KnitTests):
1559
"""Tests for knits using a GraphIndex rather than a KnitIndex."""
1561
def make_g_index(self, name, ref_lists=0, nodes=()):
    """Write a graph index to this test's transport and open it.

    :param name: File name the serialised index is stored under.
    :param ref_lists: Passed unchanged to GraphIndexBuilder.
    :param nodes: Iterable of 3-tuples, unpacked and passed positionally to
        builder.add_node.  The default is an immutable empty tuple rather
        than a shared mutable list.
    :return: A GraphIndex opened on ``name``.
    """
    builder = GraphIndexBuilder(ref_lists)
    # NOTE(review): callers in this class pass the value string second and
    # the reference lists third, so these local names look swapped -- confirm
    # against the add_node signature before relying on them.
    for node, references, value in nodes:
        builder.add_node(node, references, value)
    stream = builder.finish()
    trans = self.get_transport()
    trans.put_file(name, stream)
    return GraphIndex(trans, name)
1570
def two_graph_index(self, deltas=False, catch_adds=False):
1571
"""Build a two-graph index.
1573
:param deltas: If true, use underlying indices with two node-ref
1574
lists and 'parent' set to a delta-compressed against tail.
1576
# build a complex graph across several indices.
1578
index1 = self.make_g_index('1', 2, [
1579
('tip', 'N0 100', (['parent'], [], )),
1580
('tail', '', ([], []))])
1581
index2 = self.make_g_index('2', 2, [
1582
('parent', ' 100 78', (['tail', 'ghost'], ['tail'])),
1583
('separate', '', ([], []))])
1585
index1 = self.make_g_index('1', 1, [
1586
('tip', 'N0 100', (['parent'], )),
1587
('tail', '', ([], ))])
1588
index2 = self.make_g_index('2', 1, [
1589
('parent', ' 100 78', (['tail', 'ghost'], )),
1590
('separate', '', ([], ))])
1591
combined_index = CombinedGraphIndex([index1, index2])
1593
self.combined_index = combined_index
1594
self.caught_entries = []
1595
add_callback = self.catch_add
1598
return KnitGraphIndex(combined_index, deltas=deltas,
1599
add_callback=add_callback)
1601
def test_get_graph(self):
1602
index = self.two_graph_index()
1603
self.assertEqual(set([
1604
('tip', ('parent', )),
1606
('parent', ('tail', 'ghost')),
1608
]), set(index.get_graph()))
1610
def test_get_ancestry(self):
1611
# get_ancestry is defined as eliding ghosts, not erroring.
1612
index = self.two_graph_index()
1613
self.assertEqual([], index.get_ancestry([]))
1614
self.assertEqual(['separate'], index.get_ancestry(['separate']))
1615
self.assertEqual(['tail'], index.get_ancestry(['tail']))
1616
self.assertEqual(['tail', 'parent'], index.get_ancestry(['parent']))
1617
self.assertEqual(['tail', 'parent', 'tip'], index.get_ancestry(['tip']))
1618
self.assertTrue(index.get_ancestry(['tip', 'separate']) in
1619
(['tail', 'parent', 'tip', 'separate'],
1620
['separate', 'tail', 'parent', 'tip'],
1622
# and without topo_sort
1623
self.assertEqual(set(['separate']),
1624
set(index.get_ancestry(['separate'], topo_sorted=False)))
1625
self.assertEqual(set(['tail']),
1626
set(index.get_ancestry(['tail'], topo_sorted=False)))
1627
self.assertEqual(set(['tail', 'parent']),
1628
set(index.get_ancestry(['parent'], topo_sorted=False)))
1629
self.assertEqual(set(['tail', 'parent', 'tip']),
1630
set(index.get_ancestry(['tip'], topo_sorted=False)))
1631
self.assertEqual(set(['separate', 'tail', 'parent', 'tip']),
1632
set(index.get_ancestry(['tip', 'separate'])))
1633
# asking for a ghost makes it go boom.
1634
self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
1636
def test_get_ancestry_with_ghosts(self):
1637
index = self.two_graph_index()
1638
self.assertEqual([], index.get_ancestry_with_ghosts([]))
1639
self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
1640
self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
1641
self.assertTrue(index.get_ancestry_with_ghosts(['parent']) in
1642
(['tail', 'ghost', 'parent'],
1643
['ghost', 'tail', 'parent'],
1645
self.assertTrue(index.get_ancestry_with_ghosts(['tip']) in
1646
(['tail', 'ghost', 'parent', 'tip'],
1647
['ghost', 'tail', 'parent', 'tip'],
1649
self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
1650
(['tail', 'ghost', 'parent', 'tip', 'separate'],
1651
['ghost', 'tail', 'parent', 'tip', 'separate'],
1652
['separate', 'tail', 'ghost', 'parent', 'tip'],
1653
['separate', 'ghost', 'tail', 'parent', 'tip'],
1655
# asking for a ghost makes it go boom.
1656
self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])
1658
def test_num_versions(self):
    """The two-index fixture reports four versions."""
    version_count = self.two_graph_index().num_versions()
    self.assertEqual(4, version_count)
1662
def test_get_versions(self):
    """get_versions yields every key in the fixture, in any order."""
    graph_index = self.two_graph_index()
    expected = set(['tail', 'tip', 'parent', 'separate'])
    found = set(graph_index.get_versions())
    self.assertEqual(expected, found)
1667
def test_has_version(self):
    """'tail' is reported as present; 'ghost' is not."""
    graph_index = self.two_graph_index()
    tail_present = graph_index.has_version('tail')
    self.assertTrue(tail_present)
    ghost_present = graph_index.has_version('ghost')
    self.assertFalse(ghost_present)
1672
def test_get_position(self):
    """get_position returns the expected two-number tuple for each key."""
    graph_index = self.two_graph_index()
    expectations = [('tip', (0, 100)), ('parent', (100, 78))]
    for version_id, position in expectations:
        self.assertEqual(position, graph_index.get_position(version_id))
1677
def test_get_method_deltas(self):
    """With deltas=True, 'tip' is a fulltext and 'parent' a line-delta."""
    graph_index = self.two_graph_index(deltas=True)
    expectations = [('tip', 'fulltext'), ('parent', 'line-delta')]
    for version_id, method in expectations:
        self.assertEqual(method, graph_index.get_method(version_id))
1682
def test_get_method_no_deltas(self):
    """With deltas=False every version is reported as a fulltext."""
    # check that the parent-history lookup is ignored with deltas=False.
    graph_index = self.two_graph_index(deltas=False)
    for version_id in ('tip', 'parent'):
        self.assertEqual('fulltext', graph_index.get_method(version_id))
1688
def test_get_options_deltas(self):
    """With deltas=True, check the option strings for 'tip' and 'parent'."""
    graph_index = self.two_graph_index(deltas=True)
    expectations = [('tip', 'fulltext,no-eol'), ('parent', 'line-delta')]
    for version_id, options in expectations:
        self.assertEqual(options, graph_index.get_options(version_id))
1693
def test_get_options_no_deltas(self):
    """With deltas=False the options never mention line-delta."""
    # check that the parent-history lookup is ignored with deltas=False.
    graph_index = self.two_graph_index(deltas=False)
    expectations = [('tip', 'fulltext,no-eol'), ('parent', 'fulltext')]
    for version_id, options in expectations:
        self.assertEqual(options, graph_index.get_options(version_id))
1699
def test_get_parents(self):
    """get_parents leaves out ghost parents and rejects a ghost key."""
    graph_index = self.two_graph_index()
    # get_parents ignores ghosts
    present_parents = graph_index.get_parents('parent')
    self.assertEqual(('tail', ), present_parents)
    # and errors on ghosts.
    self.assertRaises(errors.RevisionNotPresent,
                      graph_index.get_parents, 'ghost')
1707
def test_get_parents_with_ghosts(self):
    """get_parents_with_ghosts lists ghost parents but rejects a ghost key."""
    graph_index = self.two_graph_index()
    all_parents = graph_index.get_parents_with_ghosts('parent')
    self.assertEqual(('tail', 'ghost'), all_parents)
    # and errors on ghosts.
    self.assertRaises(errors.RevisionNotPresent,
                      graph_index.get_parents_with_ghosts, 'ghost')
1714
def test_check_versions_present(self):
1715
# ghosts should not be considered present
1716
index = self.two_graph_index()
1717
self.assertRaises(RevisionNotPresent, index.check_versions_present,
1719
self.assertRaises(RevisionNotPresent, index.check_versions_present,
1721
index.check_versions_present(['tail', 'separate'])
1723
def catch_add(self, entries):
    """Record one add callback invocation on self.caught_entries."""
    recorded = self.caught_entries
    recorded.append(entries)
1726
def test_add_no_callback_errors(self):
    """add_version on an index built without catch_adds raises ReadOnlyError."""
    graph_index = self.two_graph_index()
    new_record = ('new', 'fulltext,no-eol', 50, 60, ['separate'])
    self.assertRaises(errors.ReadOnlyError, graph_index.add_version,
                      *new_record)
1731
def test_add_version_smoke(self):
    """One add_version call hands one encoded node to the add callback."""
    graph_index = self.two_graph_index(catch_adds=True)
    graph_index.add_version('new', 'fulltext,no-eol', 50, 60, ['separate'])
    expected_node = ('new', 'N50 60', (('separate',),))
    self.assertEqual([[expected_node]], self.caught_entries)
1737
def test_add_version_delta_not_delta_index(self):
    """A line-delta record is refused by an index built without deltas."""
    graph_index = self.two_graph_index(catch_adds=True)
    delta_record = ('new', 'no-eol,line-delta', 0, 100, ['parent'])
    self.assertRaises(errors.KnitCorrupt, graph_index.add_version,
                      *delta_record)
    # The callback must not have been invoked.
    self.assertEqual([], self.caught_entries)
1743
def test_add_version_same_dup(self):
    """Re-adding an identical 'tip' passes an empty list to the callback."""
    graph_index = self.two_graph_index(catch_adds=True)
    # options can be spelt two different ways
    for options in ('fulltext,no-eol', 'no-eol,fulltext'):
        graph_index.add_version('tip', options, 0, 100, ['parent'])
    # but neither should have added data.
    self.assertEqual([[], []], self.caught_entries)
1751
def test_add_version_different_dup(self):
    """Re-adding 'tip' with any differing detail raises KnitCorrupt."""
    graph_index = self.two_graph_index(deltas=True, catch_adds=True)
    conflicting = [
        # different options
        ('no-eol,line-delta', 0, 100, ['parent']),
        ('line-delta,no-eol', 0, 100, ['parent']),
        ('fulltext', 0, 100, ['parent']),
        # different position / length
        ('fulltext,no-eol', 50, 100, ['parent']),
        ('fulltext,no-eol', 0, 1000, ['parent']),
        # different parents
        ('fulltext,no-eol', 0, 100, []),
        ]
    for options, pos, size, parents in conflicting:
        self.assertRaises(errors.KnitCorrupt, graph_index.add_version,
                          'tip', options, pos, size, parents)
    self.assertEqual([], self.caught_entries)
1770
def test_add_versions_nodeltas(self):
1771
index = self.two_graph_index(catch_adds=True)
1772
index.add_versions([
1773
('new', 'fulltext,no-eol', 50, 60, ['separate']),
1774
('new2', 'fulltext', 0, 6, ['new']),
1776
self.assertEqual([('new', 'N50 60', (('separate',),)),
1777
('new2', ' 0 6', (('new',),))],
1778
sorted(self.caught_entries[0]))
1779
self.assertEqual(1, len(self.caught_entries))
1781
def test_add_versions_deltas(self):
1782
index = self.two_graph_index(deltas=True, catch_adds=True)
1783
index.add_versions([
1784
('new', 'fulltext,no-eol', 50, 60, ['separate']),
1785
('new2', 'line-delta', 0, 6, ['new']),
1787
self.assertEqual([('new', 'N50 60', (('separate',), ())),
1788
('new2', ' 0 6', (('new',), ('new',), ))],
1789
sorted(self.caught_entries[0]))
1790
self.assertEqual(1, len(self.caught_entries))
1792
def test_add_versions_delta_not_delta_index(self):
    """add_versions refuses a line-delta record on a non-delta index."""
    graph_index = self.two_graph_index(catch_adds=True)
    delta_records = [('new', 'no-eol,line-delta', 0, 100, ['parent'])]
    self.assertRaises(errors.KnitCorrupt, graph_index.add_versions,
                      delta_records)
    # The callback must not have been invoked.
    self.assertEqual([], self.caught_entries)
1798
def test_add_versions_same_dup(self):
    """Re-adding an identical 'tip' in bulk passes empty lists to the callback."""
    graph_index = self.two_graph_index(catch_adds=True)
    # options can be spelt two different ways
    for options in ('fulltext,no-eol', 'no-eol,fulltext'):
        graph_index.add_versions([('tip', options, 0, 100, ['parent'])])
    # but neither should have added data.
    self.assertEqual([[], []], self.caught_entries)
1806
def test_add_versions_different_dup(self):
    """Bulk re-adding 'tip' with any differing detail raises KnitCorrupt."""
    graph_index = self.two_graph_index(deltas=True, catch_adds=True)
    conflicting_batches = [
        # different options
        [('tip', 'no-eol,line-delta', 0, 100, ['parent'])],
        [('tip', 'line-delta,no-eol', 0, 100, ['parent'])],
        [('tip', 'fulltext', 0, 100, ['parent'])],
        # different position / length
        [('tip', 'fulltext,no-eol', 50, 100, ['parent'])],
        [('tip', 'fulltext,no-eol', 0, 1000, ['parent'])],
        # different parents
        [('tip', 'fulltext,no-eol', 0, 100, [])],
        # change options in the second record
        [('tip', 'fulltext,no-eol', 0, 100, ['parent']),
         ('tip', 'no-eol,line-delta', 0, 100, ['parent'])],
        ]
    for batch in conflicting_batches:
        self.assertRaises(errors.KnitCorrupt, graph_index.add_versions,
                          batch)
    self.assertEqual([], self.caught_entries)
1829
def test_iter_parents(self):
1830
index1 = self.make_g_index('1', 1, [
1832
('r0', 'N0 100', ([], )),
1834
('r1', '', (['r0'], ))])
1835
index2 = self.make_g_index('2', 1, [
1837
('r2', 'N0 100', (['r1', 'r0'], )),
1839
combined_index = CombinedGraphIndex([index1, index2])
1840
index = KnitGraphIndex(combined_index)
1842
# cases: each sample data individually:
1843
self.assertEqual(set([('r0', ())]),
1844
set(index.iter_parents(['r0'])))
1845
self.assertEqual(set([('r1', ('r0', ))]),
1846
set(index.iter_parents(['r1'])))
1847
self.assertEqual(set([('r2', ('r1', 'r0'))]),
1848
set(index.iter_parents(['r2'])))
1849
# no nodes returned for a missing node
1850
self.assertEqual(set(),
1851
set(index.iter_parents(['missing'])))
1852
# 1 node returned with missing nodes skipped
1853
self.assertEqual(set([('r1', ('r0', ))]),
1854
set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
1856
self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
1857
set(index.iter_parents(['r0', 'r1'])))
1858
# 2 nodes returned, missing skipped
1859
self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
1860
set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
1863
class TestNoParentsGraphIndexKnit(KnitTests):
1864
"""Tests for knits using KnitGraphIndex with no parents."""
1866
def make_g_index(self, name, ref_lists=0, nodes=()):
    """Write a graph index to this test's transport and open it.

    :param name: File name the serialised index is stored under.
    :param ref_lists: Passed unchanged to GraphIndexBuilder.
    :param nodes: Iterable of 2-tuples, unpacked and passed positionally to
        builder.add_node.  The default is an immutable empty tuple rather
        than a shared mutable list.
    :return: A GraphIndex opened on ``name``.
    """
    builder = GraphIndexBuilder(ref_lists)
    # NOTE(review): callers in this class pass a value string such as
    # ' 100 78' as the second item, so the name 'references' looks
    # misleading -- confirm against the add_node signature.
    for node, references in nodes:
        builder.add_node(node, references)
    stream = builder.finish()
    trans = self.get_transport()
    trans.put_file(name, stream)
    return GraphIndex(trans, name)
1875
def test_parents_deltas_incompatible(self):
    """Asking for deltas=True together with parents=False is a KnitError."""
    empty_index = CombinedGraphIndex([])
    bad_options = dict(deltas=True, parents=False)
    self.assertRaises(errors.KnitError, KnitGraphIndex, empty_index,
                      **bad_options)
1880
def two_graph_index(self, catch_adds=False):
1881
"""Build a two-graph index.
1883
:param deltas: If true, use underlying indices with two node-ref
1884
lists and 'parent' set to a delta-compressed against tail.
1886
# put several versions in the index.
1887
index1 = self.make_g_index('1', 0, [
1890
index2 = self.make_g_index('2', 0, [
1891
('parent', ' 100 78'),
1893
combined_index = CombinedGraphIndex([index1, index2])
1895
self.combined_index = combined_index
1896
self.caught_entries = []
1897
add_callback = self.catch_add
1900
return KnitGraphIndex(combined_index, parents=False,
1901
add_callback=add_callback)
1903
def test_get_graph(self):
1904
index = self.two_graph_index()
1905
self.assertEqual(set([
1910
]), set(index.get_graph()))
1912
def test_get_ancestry(self):
1913
# with no parents, ancestry is always just the key.
1914
index = self.two_graph_index()
1915
self.assertEqual([], index.get_ancestry([]))
1916
self.assertEqual(['separate'], index.get_ancestry(['separate']))
1917
self.assertEqual(['tail'], index.get_ancestry(['tail']))
1918
self.assertEqual(['parent'], index.get_ancestry(['parent']))
1919
self.assertEqual(['tip'], index.get_ancestry(['tip']))
1920
self.assertTrue(index.get_ancestry(['tip', 'separate']) in
1921
(['tip', 'separate'],
1922
['separate', 'tip'],
1924
# asking for a ghost makes it go boom.
1925
self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
1927
def test_get_ancestry_with_ghosts(self):
1928
index = self.two_graph_index()
1929
self.assertEqual([], index.get_ancestry_with_ghosts([]))
1930
self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
1931
self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
1932
self.assertEqual(['parent'], index.get_ancestry_with_ghosts(['parent']))
1933
self.assertEqual(['tip'], index.get_ancestry_with_ghosts(['tip']))
1934
self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
1935
(['tip', 'separate'],
1936
['separate', 'tip'],
1938
# asking for a ghost makes it go boom.
1939
self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])
1941
def test_num_versions(self):
    """The parentless fixture reports four versions."""
    version_count = self.two_graph_index().num_versions()
    self.assertEqual(4, version_count)
1945
def test_get_versions(self):
    """get_versions yields every key in the fixture, in any order."""
    graph_index = self.two_graph_index()
    expected = set(['tail', 'tip', 'parent', 'separate'])
    found = set(graph_index.get_versions())
    self.assertEqual(expected, found)
1950
def test_has_version(self):
    """'tail' is reported as present; 'ghost' is not."""
    graph_index = self.two_graph_index()
    tail_present = graph_index.has_version('tail')
    self.assertTrue(tail_present)
    ghost_present = graph_index.has_version('ghost')
    self.assertFalse(ghost_present)
1955
def test_get_position(self):
    """get_position returns the expected two-number tuple for each key."""
    graph_index = self.two_graph_index()
    expectations = [('tip', (0, 100)), ('parent', (100, 78))]
    for version_id, position in expectations:
        self.assertEqual(position, graph_index.get_position(version_id))
1960
def test_get_method(self):
    """get_method reports 'fulltext' for both keys of the parentless index.

    The second assertion used to call get_options, so get_method('parent')
    was never checked here and the line only repeated test_get_options.
    """
    index = self.two_graph_index()
    self.assertEqual('fulltext', index.get_method('tip'))
    self.assertEqual('fulltext', index.get_method('parent'))
1965
def test_get_options(self):
    """Check the option strings reported for 'tip' and 'parent'."""
    graph_index = self.two_graph_index()
    expectations = [('tip', 'fulltext,no-eol'), ('parent', 'fulltext')]
    for version_id, options in expectations:
        self.assertEqual(options, graph_index.get_options(version_id))
1970
def test_get_parents(self):
    """A parentless index gives () for a present key and rejects a ghost."""
    graph_index = self.two_graph_index()
    no_parents = graph_index.get_parents('parent')
    self.assertEqual((), no_parents)
    # and errors on ghosts.
    self.assertRaises(errors.RevisionNotPresent,
                      graph_index.get_parents, 'ghost')
1977
def test_get_parents_with_ghosts(self):
    # Same expectations as for get_parents: an empty tuple for 'parent'
    # and RevisionNotPresent for a ghost.
    graph_index = self.two_graph_index()
    no_parents = ()
    self.assertEqual(no_parents,
        graph_index.get_parents_with_ghosts('parent'))
    # and errors on ghosts.
    lookup = graph_index.get_parents_with_ghosts
    self.assertRaises(errors.RevisionNotPresent, lookup, 'ghost')
1984
def test_check_versions_present(self):
    # check_versions_present raises RevisionNotPresent when a requested
    # version is absent and returns quietly when all are present.
    index = self.two_graph_index()
    # NOTE(review): the argument of this first call was missing from the
    # reviewed text; ['missing'] is a reconstruction - please confirm.
    self.assertRaises(RevisionNotPresent, index.check_versions_present,
        ['missing'])
    # A single absent version among present ones is still an error.
    self.assertRaises(RevisionNotPresent, index.check_versions_present,
        ['tail', 'missing'])
    # No exception expected: both versions exist in the fixture.
    index.check_versions_present(['tail', 'separate'])
1992
def catch_add(self, entries):
    """Append entries, as a single item, to self.caught_entries.

    Presumably installed as the index's add callback when the fixture is
    built with catch_adds=True - confirm against two_graph_index.
    """
    recorded = self.caught_entries
    recorded.append(entries)
1995
def test_add_no_callback_errors(self):
    # With the fixture built without catch_adds, add_version must raise
    # ReadOnlyError.
    graph_index = self.two_graph_index()
    record = ('new', 'fulltext,no-eol', 50, 60, ['separate'])
    self.assertRaises(errors.ReadOnlyError, graph_index.add_version,
        *record)
2000
def test_add_version_smoke(self):
    # Adding one new version results in a single caught batch holding a
    # single ('new', 'N50 60') entry.
    graph_index = self.two_graph_index(catch_adds=True)
    graph_index.add_version('new', 'fulltext,no-eol', 50, 60, [])
    expected_batches = [[('new', 'N50 60')]]
    self.assertEqual(expected_batches, self.caught_entries)
2006
def test_add_version_delta_not_delta_index(self):
    # A line-delta record is rejected with KnitCorrupt and nothing is
    # caught.
    graph_index = self.two_graph_index(catch_adds=True)
    delta_record = ('new', 'no-eol,line-delta', 0, 100, [])
    self.assertRaises(errors.KnitCorrupt, graph_index.add_version,
        *delta_record)
    self.assertEqual([], self.caught_entries)
2012
def test_add_version_same_dup(self):
    # Re-adding 'tip' with identical details is accepted; each call still
    # produces a caught batch, but the batch is empty.
    graph_index = self.two_graph_index(catch_adds=True)
    # options can be spelt two different ways
    for spelling in ('fulltext,no-eol', 'no-eol,fulltext'):
        graph_index.add_version('tip', spelling, 0, 100, [])
    # but neither should have added data.
    self.assertEqual([[], []], self.caught_entries)
2020
def test_add_version_different_dup(self):
    # Re-adding 'tip' with details that differ from the recorded ones
    # must raise KnitCorrupt every time, and nothing may be caught.
    graph_index = self.two_graph_index(catch_adds=True)
    conflicting_records = [
        # different options
        ('tip', 'no-eol,line-delta', 0, 100, []),
        ('tip', 'line-delta,no-eol', 0, 100, []),
        ('tip', 'fulltext', 0, 100, []),
        # different position, then different length
        ('tip', 'fulltext,no-eol', 50, 100, []),
        ('tip', 'fulltext,no-eol', 0, 1000, []),
        # different parents
        ('tip', 'fulltext,no-eol', 0, 100, ['parent']),
        ]
    for record in conflicting_records:
        self.assertRaises(errors.KnitCorrupt, graph_index.add_version,
            *record)
    self.assertEqual([], self.caught_entries)
2039
def test_add_versions(self):
    # Adding two new versions in one add_versions call produces exactly
    # one caught batch containing both entries.
    index = self.two_graph_index(catch_adds=True)
    index.add_versions([
        ('new', 'fulltext,no-eol', 50, 60, []),
        ('new2', 'fulltext', 0, 6, []),
        ])
    # The batch is sorted before comparing, so the order in which the
    # entries were caught is not significant.
    self.assertEqual([('new', 'N50 60'), ('new2', ' 0 6')],
        sorted(self.caught_entries[0]))
    self.assertEqual(1, len(self.caught_entries))
2049
def test_add_versions_delta_not_delta_index(self):
    # A batch holding a line-delta record is rejected with KnitCorrupt
    # and nothing is caught.
    graph_index = self.two_graph_index(catch_adds=True)
    batch = [('new', 'no-eol,line-delta', 0, 100, ['parent'])]
    self.assertRaises(errors.KnitCorrupt, graph_index.add_versions, batch)
    self.assertEqual([], self.caught_entries)
2055
def test_add_versions_parents_not_parents_index(self):
    # A fulltext record that names a parent is rejected with KnitCorrupt
    # and nothing is caught.
    graph_index = self.two_graph_index(catch_adds=True)
    batch = [('new', 'no-eol,fulltext', 0, 100, ['parent'])]
    self.assertRaises(errors.KnitCorrupt, graph_index.add_versions, batch)
    self.assertEqual([], self.caught_entries)
2061
def test_add_versions_same_dup(self):
    # Re-adding 'tip' through add_versions with identical details is
    # accepted; each call still yields a caught batch, but an empty one.
    graph_index = self.two_graph_index(catch_adds=True)
    # options can be spelt two different ways
    for spelling in ('fulltext,no-eol', 'no-eol,fulltext'):
        graph_index.add_versions([('tip', spelling, 0, 100, [])])
    # but neither should have added data.
    self.assertEqual([[], []], self.caught_entries)
2069
def test_add_versions_different_dup(self):
    # Every batch below re-adds 'tip' with details that differ from the
    # recorded ones; each must raise KnitCorrupt and nothing may be
    # caught.
    graph_index = self.two_graph_index(catch_adds=True)
    conflicting_batches = [
        # different options
        [('tip', 'no-eol,line-delta', 0, 100, [])],
        [('tip', 'line-delta,no-eol', 0, 100, [])],
        [('tip', 'fulltext', 0, 100, [])],
        # different position, then different length
        [('tip', 'fulltext,no-eol', 50, 100, [])],
        [('tip', 'fulltext,no-eol', 0, 1000, [])],
        # different parents
        [('tip', 'fulltext,no-eol', 0, 100, ['parent'])],
        # change options in the second record
        [('tip', 'fulltext,no-eol', 0, 100, []),
         ('tip', 'no-eol,line-delta', 0, 100, [])],
        ]
    for batch in conflicting_batches:
        self.assertRaises(errors.KnitCorrupt, graph_index.add_versions,
            batch)
    self.assertEqual([], self.caught_entries)
2092
def test_iter_parents(self):
    # iter_parents yields (version, parents) pairs; in this fixture every
    # version has an empty parents tuple.  Results are compared as sets,
    # so ordering is not significant.
    index = self.two_graph_index()
    # 'ghost' is requested but is expected to be absent from the result.
    self.assertEqual(set([
        ('tip', ()), ('tail', ()), ('parent', ()), ('separate', ())
        ]),
        set(index.iter_parents(
            ['tip', 'tail', 'ghost', 'parent', 'separate'])))
    self.assertEqual(set([('tip', ())]),
        set(index.iter_parents(['tip'])))
    # An empty request yields nothing.
    self.assertEqual(set(),
        set(index.iter_parents([])))