1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
30
from bzrlib.errors import (
31
RevisionAlreadyPresent,
36
from bzrlib.index import *
37
from bzrlib.knit import (
48
from bzrlib.osutils import split_lines
49
from bzrlib.tests import TestCase, TestCaseWithTransport, Feature
50
from bzrlib.transport import TransportLogger, get_transport
51
from bzrlib.transport.memory import MemoryTransport
52
from bzrlib.weave import Weave
55
class _CompiledKnitFeature(Feature):
59
import bzrlib._knit_load_data_c
64
def feature_name(self):
    """Return the dotted name of the compiled module this feature names."""
    return 'bzrlib._knit_load_data_c'
67
CompiledKnitFeature = _CompiledKnitFeature()
70
class KnitContentTests(TestCase):
72
def test_constructor(self):
    """KnitContent accepts an empty list or (origin, text) pairs."""
    # Constructing from an empty list must simply not raise.
    KnitContent([])

    empty = KnitContent([])
    self.assertEqual(empty.text(), [])

    two_lines = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    self.assertEqual(two_lines.text(), ["text1", "text2"])
82
def test_annotate(self):
    """annotate() returns the (origin, text) pairs the content was built from."""
    empty = KnitContent([])
    self.assertEqual(empty.annotate(), [])

    pairs = [("origin1", "text1"), ("origin2", "text2")]
    content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    self.assertEqual(content.annotate(), pairs)
90
def test_annotate_iter(self):
    """annotate_iter() yields each (origin, text) pair; copy() keeps them."""
    # An empty content yields nothing at all.
    it = KnitContent([]).annotate_iter()
    self.assertRaises(StopIteration, it.next)

    # Two pairs come back in order, then the iterator is exhausted.
    content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    it = content.annotate_iter()
    self.assertEqual(it.next(), ("origin1", "text1"))
    self.assertEqual(it.next(), ("origin2", "text2"))
    self.assertRaises(StopIteration, it.next)

    # A copy is itself a KnitContent with the same annotations.
    content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    copy = content.copy()
    self.assertIsInstance(copy, KnitContent)
    self.assertEqual(copy.annotate(),
        [("origin1", "text1"), ("origin2", "text2")])
108
def test_line_delta(self):
    """line_delta() reports the changed region as one hunk."""
    old = KnitContent([("", "a"), ("", "b")])
    new = KnitContent([("", "a"), ("", "a"), ("", "c")])
    expected = [(1, 2, 2, [("", "a"), ("", "c")])]
    self.assertEqual(old.line_delta(new), expected)
114
def test_line_delta_iter(self):
    """line_delta_iter() yields the single hunk and then stops."""
    old = KnitContent([("", "a"), ("", "b")])
    new = KnitContent([("", "a"), ("", "a"), ("", "c")])
    hunks = old.line_delta_iter(new)
    self.assertEqual(hunks.next(), (1, 2, 2, [("", "a"), ("", "c")]))
    self.assertRaises(StopIteration, hunks.next)
122
class MockTransport(object):
    """Minimal stand-in for a transport, used by the low-level knit tests.

    ``get`` and ``readv`` serve canned content built from ``file_lines``.
    Any other attribute access returns a recorder that appends
    ``(name, args, kwargs)`` to ``self.calls`` when called.
    """

    def __init__(self, file_lines=None):
        """Store the canned lines; ``None`` means the file does not exist."""
        self.file_lines = file_lines
        # Must be a real attribute: otherwise the lookup falls through to
        # __getattr__ and yields a recorder instead of a list.
        self.calls = []
        # We have no base directory for the MockTransport
        self.base = ''

    def get(self, filename):
        """Return a file-like object over the canned lines joined by newlines.

        Raises NoSuchFile when no content was supplied.
        """
        if self.file_lines is None:
            raise NoSuchFile(filename)
        return StringIO("\n".join(self.file_lines))

    def readv(self, relpath, offsets):
        """Yield ``(offset, data)`` for each ``(offset, size)`` requested."""
        fp = self.get(relpath)
        for offset, size in offsets:
            # Position explicitly so each read honours its own offset
            # rather than continuing from wherever the last read stopped.
            fp.seek(offset)
            yield offset, fp.read(size)

    def __getattr__(self, name):
        """Return a callable that records a call to the unknown method."""
        def queue_call(*args, **kwargs):
            self.calls.append((name, args, kwargs))
        return queue_call
148
class LowLevelKnitDataTests(TestCase):
150
def create_gz_content(self, text):
    """Return ``text`` compressed as a complete gzip stream."""
    sio = StringIO()
    gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
    try:
        gz_file.write(text)
    finally:
        # Closing flushes the gzip trailer into sio; without it the
        # returned stream would be truncated.
        gz_file.close()
    return sio.getvalue()
157
def test_valid_knit_data(self):
158
sha1sum = sha.new('foo\nbar\n').hexdigest()
159
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
164
transport = MockTransport([gz_txt])
165
data = _KnitData(transport, 'filename', mode='r')
166
records = [('rev-id-1', 0, len(gz_txt))]
168
contents = data.read_records(records)
169
self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
171
raw_contents = list(data.read_records_iter_raw(records))
172
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
174
def test_not_enough_lines(self):
175
sha1sum = sha.new('foo\n').hexdigest()
176
# record says 2 lines data says 1
177
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
181
transport = MockTransport([gz_txt])
182
data = _KnitData(transport, 'filename', mode='r')
183
records = [('rev-id-1', 0, len(gz_txt))]
184
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
186
# read_records_iter_raw won't detect that sort of mismatch/corruption
187
raw_contents = list(data.read_records_iter_raw(records))
188
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
190
def test_too_many_lines(self):
191
sha1sum = sha.new('foo\nbar\n').hexdigest()
192
# record says 1 lines data says 2
193
gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
198
transport = MockTransport([gz_txt])
199
data = _KnitData(transport, 'filename', mode='r')
200
records = [('rev-id-1', 0, len(gz_txt))]
201
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
203
# read_records_iter_raw won't detect that sort of mismatch/corruption
204
raw_contents = list(data.read_records_iter_raw(records))
205
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
207
def test_mismatched_version_id(self):
208
sha1sum = sha.new('foo\nbar\n').hexdigest()
209
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
214
transport = MockTransport([gz_txt])
215
data = _KnitData(transport, 'filename', mode='r')
216
# We are asking for rev-id-2, but the data is rev-id-1
217
records = [('rev-id-2', 0, len(gz_txt))]
218
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
220
# read_records_iter_raw will notice if we request the wrong version.
221
self.assertRaises(errors.KnitCorrupt, list,
222
data.read_records_iter_raw(records))
224
def test_uncompressed_data(self):
225
sha1sum = sha.new('foo\nbar\n').hexdigest()
226
txt = ('version rev-id-1 2 %s\n'
231
transport = MockTransport([txt])
232
data = _KnitData(transport, 'filename', mode='r')
233
records = [('rev-id-1', 0, len(txt))]
235
# We don't have valid gzip data ==> corrupt
236
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
238
# read_records_iter_raw will notice the bad data
239
self.assertRaises(errors.KnitCorrupt, list,
240
data.read_records_iter_raw(records))
242
def test_corrupted_data(self):
243
sha1sum = sha.new('foo\nbar\n').hexdigest()
244
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
249
# Change 2 bytes in the middle to \xff
250
gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
251
transport = MockTransport([gz_txt])
252
data = _KnitData(transport, 'filename', mode='r')
253
records = [('rev-id-1', 0, len(gz_txt))]
255
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
257
# read_records_iter_raw will notice if we request the wrong version.
258
self.assertRaises(errors.KnitCorrupt, list,
259
data.read_records_iter_raw(records))
262
class LowLevelKnitIndexTests(TestCase):
264
def get_knit_index(self, *args, **kwargs):
    """Build a _KnitIndex with knit._load_data set to the Python loader.

    The previous loader is restored by a cleanup registered on the test.
    All arguments are passed straight through to _KnitIndex.
    """
    orig = knit._load_data
    def reset():
        knit._load_data = orig
    self.addCleanup(reset)
    from bzrlib._knit_load_data_py import _load_data_py
    knit._load_data = _load_data_py
    return _KnitIndex(*args, **kwargs)
273
def test_no_such_file(self):
    """Opening an index with no backing file raises NoSuchFile."""
    transport = MockTransport()

    # Read mode.
    self.assertRaises(NoSuchFile, self.get_knit_index,
        transport, "filename", "r")
    # Write mode without permission to create.
    self.assertRaises(NoSuchFile, self.get_knit_index,
        transport, "filename", "w", create=False)
281
def test_create_file(self):
    """Creating an index writes the header with put_bytes_non_atomic."""
    transport = MockTransport()

    index = self.get_knit_index(transport, "filename", "w",
        file_mode="wb", create=True)
    self.assertEqual(
        ("put_bytes_non_atomic",
            ("filename", index.HEADER), {"mode": "wb"}),
        transport.calls.pop(0))
291
def test_delay_create_file(self):
292
transport = MockTransport()
294
index = self.get_knit_index(transport, "filename", "w",
295
create=True, file_mode="wb", create_parent_dir=True,
296
delay_create=True, dir_mode=0777)
297
self.assertEqual([], transport.calls)
299
index.add_versions([])
300
name, (filename, f), kwargs = transport.calls.pop(0)
301
self.assertEqual("put_file_non_atomic", name)
303
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
305
self.assertEqual("filename", filename)
306
self.assertEqual(index.HEADER, f.read())
308
index.add_versions([])
309
self.assertEqual(("append_bytes", ("filename", ""), {}),
310
transport.calls.pop(0))
312
def test_read_utf8_version_id(self):
313
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
314
utf8_revision_id = unicode_revision_id.encode('utf-8')
315
transport = MockTransport([
317
'%s option 0 1 :' % (utf8_revision_id,)
319
index = self.get_knit_index(transport, "filename", "r")
320
# _KnitIndex is a private class, and deals in utf8 revision_ids, not
321
# Unicode revision_ids.
322
self.assertTrue(index.has_version(utf8_revision_id))
323
self.assertFalse(index.has_version(unicode_revision_id))
325
def test_read_utf8_parents(self):
326
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
327
utf8_revision_id = unicode_revision_id.encode('utf-8')
328
transport = MockTransport([
330
"version option 0 1 .%s :" % (utf8_revision_id,)
332
index = self.get_knit_index(transport, "filename", "r")
333
self.assertEqual([utf8_revision_id],
334
index.get_parents_with_ghosts("version"))
336
def test_read_ignore_corrupted_lines(self):
337
transport = MockTransport([
340
"corrupted options 0 1 .b .c ",
341
"version options 0 1 :"
343
index = self.get_knit_index(transport, "filename", "r")
344
self.assertEqual(1, index.num_versions())
345
self.assertTrue(index.has_version("version"))
347
def test_read_corrupted_header(self):
    """A file that does not start with the index header is rejected."""
    bad_file = MockTransport(['not a bzr knit index header\n'])
    self.assertRaises(KnitHeaderError,
        self.get_knit_index, bad_file, "filename", "r")
352
def test_read_duplicate_entries(self):
353
transport = MockTransport([
355
"parent options 0 1 :",
356
"version options1 0 1 0 :",
357
"version options2 1 2 .other :",
358
"version options3 3 4 0 .other :"
360
index = self.get_knit_index(transport, "filename", "r")
361
self.assertEqual(2, index.num_versions())
362
# check that the index used is the first one written. (Specific
363
# to KnitIndex style indices.
364
self.assertEqual("1", index._version_list_to_index(["version"]))
365
self.assertEqual((3, 4), index.get_position("version"))
366
self.assertEqual(["options3"], index.get_options("version"))
367
self.assertEqual(["parent", "other"],
368
index.get_parents_with_ghosts("version"))
370
def test_read_compressed_parents(self):
371
transport = MockTransport([
375
"c option 0 1 1 0 :",
377
index = self.get_knit_index(transport, "filename", "r")
378
self.assertEqual(["a"], index.get_parents("b"))
379
self.assertEqual(["b", "a"], index.get_parents("c"))
381
def test_write_utf8_version_id(self):
382
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
383
utf8_revision_id = unicode_revision_id.encode('utf-8')
384
transport = MockTransport([
387
index = self.get_knit_index(transport, "filename", "r")
388
index.add_version(utf8_revision_id, ["option"], 0, 1, [])
389
self.assertEqual(("append_bytes", ("filename",
390
"\n%s option 0 1 :" % (utf8_revision_id,)),
392
transport.calls.pop(0))
394
def test_write_utf8_parents(self):
395
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
396
utf8_revision_id = unicode_revision_id.encode('utf-8')
397
transport = MockTransport([
400
index = self.get_knit_index(transport, "filename", "r")
401
index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
402
self.assertEqual(("append_bytes", ("filename",
403
"\nversion option 0 1 .%s :" % (utf8_revision_id,)),
405
transport.calls.pop(0))
407
def test_get_graph(self):
    """get_graph() lists each added version with its parents."""
    transport = MockTransport()
    index = self.get_knit_index(transport, "filename", "w", create=True)
    self.assertEqual([], index.get_graph())

    index.add_version("a", ["option"], 0, 1, ["b"])
    self.assertEqual([("a", ["b"])], index.get_graph())

    index.add_version("c", ["option"], 0, 1, ["d"])
    # Sorted because only the contents are compared, not the order.
    self.assertEqual([("a", ["b"]), ("c", ["d"])],
        sorted(index.get_graph()))
419
def test_get_ancestry(self):
420
transport = MockTransport([
423
"b option 0 1 0 .e :",
424
"c option 0 1 1 0 :",
425
"d option 0 1 2 .f :"
427
index = self.get_knit_index(transport, "filename", "r")
429
self.assertEqual([], index.get_ancestry([]))
430
self.assertEqual(["a"], index.get_ancestry(["a"]))
431
self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
432
self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
433
self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
434
self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
435
self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
437
self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
439
def test_get_ancestry_with_ghosts(self):
440
transport = MockTransport([
443
"b option 0 1 0 .e :",
444
"c option 0 1 0 .f .g :",
445
"d option 0 1 2 .h .j .k :"
447
index = self.get_knit_index(transport, "filename", "r")
449
self.assertEqual([], index.get_ancestry_with_ghosts([]))
450
self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
451
self.assertEqual(["a", "e", "b"],
452
index.get_ancestry_with_ghosts(["b"]))
453
self.assertEqual(["a", "g", "f", "c"],
454
index.get_ancestry_with_ghosts(["c"]))
455
self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
456
index.get_ancestry_with_ghosts(["d"]))
457
self.assertEqual(["a", "e", "b"],
458
index.get_ancestry_with_ghosts(["a", "b"]))
459
self.assertEqual(["a", "g", "f", "c"],
460
index.get_ancestry_with_ghosts(["a", "c"]))
462
["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
463
index.get_ancestry_with_ghosts(["b", "d"]))
465
self.assertRaises(RevisionNotPresent,
466
index.get_ancestry_with_ghosts, ["e"])
468
def test_iter_parents(self):
    """iter_parents() yields (version, parents) for present versions only."""
    transport = MockTransport()
    index = self.get_knit_index(transport, "filename", "w", create=True)
    # Sample data with zero, one and two parents.
    index.add_version('r0', ['option'], 0, 1, [])
    index.add_version('r1', ['option'], 0, 1, ['r0'])
    index.add_version('r2', ['option'], 0, 1, ['r1', 'r0'])

    # cases: each sample data individually:
    self.assertEqual(set([('r0', ())]),
        set(index.iter_parents(['r0'])))
    self.assertEqual(set([('r1', ('r0', ))]),
        set(index.iter_parents(['r1'])))
    self.assertEqual(set([('r2', ('r1', 'r0'))]),
        set(index.iter_parents(['r2'])))
    # no nodes returned for a missing node
    self.assertEqual(set(),
        set(index.iter_parents(['missing'])))
    # 1 node returned with missing nodes skipped
    self.assertEqual(set([('r1', ('r0', ))]),
        set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
    # 2 nodes returned
    self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
        set(index.iter_parents(['r0', 'r1'])))
    # 2 nodes returned, missing skipped
    self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
        set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
498
def test_num_versions(self):
499
transport = MockTransport([
502
index = self.get_knit_index(transport, "filename", "r")
504
self.assertEqual(0, index.num_versions())
505
self.assertEqual(0, len(index))
507
index.add_version("a", ["option"], 0, 1, [])
508
self.assertEqual(1, index.num_versions())
509
self.assertEqual(1, len(index))
511
index.add_version("a", ["option2"], 1, 2, [])
512
self.assertEqual(1, index.num_versions())
513
self.assertEqual(1, len(index))
515
index.add_version("b", ["option"], 0, 1, [])
516
self.assertEqual(2, index.num_versions())
517
self.assertEqual(2, len(index))
519
def test_get_versions(self):
520
transport = MockTransport([
523
index = self.get_knit_index(transport, "filename", "r")
525
self.assertEqual([], index.get_versions())
527
index.add_version("a", ["option"], 0, 1, [])
528
self.assertEqual(["a"], index.get_versions())
530
index.add_version("a", ["option"], 0, 1, [])
531
self.assertEqual(["a"], index.get_versions())
533
index.add_version("b", ["option"], 0, 1, [])
534
self.assertEqual(["a", "b"], index.get_versions())
536
def test_add_version(self):
537
transport = MockTransport([
540
index = self.get_knit_index(transport, "filename", "r")
542
index.add_version("a", ["option"], 0, 1, ["b"])
543
self.assertEqual(("append_bytes",
544
("filename", "\na option 0 1 .b :"),
545
{}), transport.calls.pop(0))
546
self.assertTrue(index.has_version("a"))
547
self.assertEqual(1, index.num_versions())
548
self.assertEqual((0, 1), index.get_position("a"))
549
self.assertEqual(["option"], index.get_options("a"))
550
self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
552
index.add_version("a", ["opt"], 1, 2, ["c"])
553
self.assertEqual(("append_bytes",
554
("filename", "\na opt 1 2 .c :"),
555
{}), transport.calls.pop(0))
556
self.assertTrue(index.has_version("a"))
557
self.assertEqual(1, index.num_versions())
558
self.assertEqual((1, 2), index.get_position("a"))
559
self.assertEqual(["opt"], index.get_options("a"))
560
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
562
index.add_version("b", ["option"], 2, 3, ["a"])
563
self.assertEqual(("append_bytes",
564
("filename", "\nb option 2 3 0 :"),
565
{}), transport.calls.pop(0))
566
self.assertTrue(index.has_version("b"))
567
self.assertEqual(2, index.num_versions())
568
self.assertEqual((2, 3), index.get_position("b"))
569
self.assertEqual(["option"], index.get_options("b"))
570
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
572
def test_add_versions(self):
573
transport = MockTransport([
576
index = self.get_knit_index(transport, "filename", "r")
579
("a", ["option"], 0, 1, ["b"]),
580
("a", ["opt"], 1, 2, ["c"]),
581
("b", ["option"], 2, 3, ["a"])
583
self.assertEqual(("append_bytes", ("filename",
584
"\na option 0 1 .b :"
587
), {}), transport.calls.pop(0))
588
self.assertTrue(index.has_version("a"))
589
self.assertTrue(index.has_version("b"))
590
self.assertEqual(2, index.num_versions())
591
self.assertEqual((1, 2), index.get_position("a"))
592
self.assertEqual((2, 3), index.get_position("b"))
593
self.assertEqual(["opt"], index.get_options("a"))
594
self.assertEqual(["option"], index.get_options("b"))
595
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
596
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
598
def test_delay_create_and_add_versions(self):
599
transport = MockTransport()
601
index = self.get_knit_index(transport, "filename", "w",
602
create=True, file_mode="wb", create_parent_dir=True,
603
delay_create=True, dir_mode=0777)
604
self.assertEqual([], transport.calls)
607
("a", ["option"], 0, 1, ["b"]),
608
("a", ["opt"], 1, 2, ["c"]),
609
("b", ["option"], 2, 3, ["a"])
611
name, (filename, f), kwargs = transport.calls.pop(0)
612
self.assertEqual("put_file_non_atomic", name)
614
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
616
self.assertEqual("filename", filename)
619
"\na option 0 1 .b :"
621
"\nb option 2 3 0 :",
624
def test_has_version(self):
625
transport = MockTransport([
629
index = self.get_knit_index(transport, "filename", "r")
631
self.assertTrue(index.has_version("a"))
632
self.assertFalse(index.has_version("b"))
634
def test_get_position(self):
635
transport = MockTransport([
640
index = self.get_knit_index(transport, "filename", "r")
642
self.assertEqual((0, 1), index.get_position("a"))
643
self.assertEqual((1, 2), index.get_position("b"))
645
def test_get_method(self):
646
transport = MockTransport([
648
"a fulltext,unknown 0 1 :",
649
"b unknown,line-delta 1 2 :",
652
index = self.get_knit_index(transport, "filename", "r")
654
self.assertEqual("fulltext", index.get_method("a"))
655
self.assertEqual("line-delta", index.get_method("b"))
656
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
658
def test_get_options(self):
659
transport = MockTransport([
664
index = self.get_knit_index(transport, "filename", "r")
666
self.assertEqual(["opt1"], index.get_options("a"))
667
self.assertEqual(["opt2", "opt3"], index.get_options("b"))
669
def test_get_parents(self):
670
transport = MockTransport([
673
"b option 1 2 0 .c :",
674
"c option 1 2 1 0 .e :"
676
index = self.get_knit_index(transport, "filename", "r")
678
self.assertEqual([], index.get_parents("a"))
679
self.assertEqual(["a", "c"], index.get_parents("b"))
680
self.assertEqual(["b", "a"], index.get_parents("c"))
682
def test_get_parents_with_ghosts(self):
683
transport = MockTransport([
686
"b option 1 2 0 .c :",
687
"c option 1 2 1 0 .e :"
689
index = self.get_knit_index(transport, "filename", "r")
691
self.assertEqual([], index.get_parents_with_ghosts("a"))
692
self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
693
self.assertEqual(["b", "a", "e"],
694
index.get_parents_with_ghosts("c"))
696
def test_check_versions_present(self):
697
transport = MockTransport([
702
index = self.get_knit_index(transport, "filename", "r")
704
check = index.check_versions_present
710
self.assertRaises(RevisionNotPresent, check, ["c"])
711
self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
713
def test_impossible_parent(self):
714
"""Test we get KnitCorrupt if the parent couldn't possibly exist."""
715
transport = MockTransport([
718
"b option 0 1 4 :" # We don't have a 4th record
721
self.assertRaises(errors.KnitCorrupt,
722
self.get_knit_index, transport, 'filename', 'r')
724
if (str(e) == ('exceptions must be strings, classes, or instances,'
725
' not exceptions.IndexError')
726
and sys.version_info[0:2] >= (2,5)):
727
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
728
' raising new style exceptions with python'
733
def test_corrupted_parent(self):
734
transport = MockTransport([
738
"c option 0 1 1v :", # Can't have a parent of '1v'
741
self.assertRaises(errors.KnitCorrupt,
742
self.get_knit_index, transport, 'filename', 'r')
744
if (str(e) == ('exceptions must be strings, classes, or instances,'
745
' not exceptions.ValueError')
746
and sys.version_info[0:2] >= (2,5)):
747
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
748
' raising new style exceptions with python'
753
def test_corrupted_parent_in_list(self):
754
transport = MockTransport([
758
"c option 0 1 1 v :", # Can't have a parent of 'v'
761
self.assertRaises(errors.KnitCorrupt,
762
self.get_knit_index, transport, 'filename', 'r')
764
if (str(e) == ('exceptions must be strings, classes, or instances,'
765
' not exceptions.ValueError')
766
and sys.version_info[0:2] >= (2,5)):
767
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
768
' raising new style exceptions with python'
773
def test_invalid_position(self):
774
transport = MockTransport([
779
self.assertRaises(errors.KnitCorrupt,
780
self.get_knit_index, transport, 'filename', 'r')
782
if (str(e) == ('exceptions must be strings, classes, or instances,'
783
' not exceptions.ValueError')
784
and sys.version_info[0:2] >= (2,5)):
785
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
786
' raising new style exceptions with python'
791
def test_invalid_size(self):
792
transport = MockTransport([
797
self.assertRaises(errors.KnitCorrupt,
798
self.get_knit_index, transport, 'filename', 'r')
800
if (str(e) == ('exceptions must be strings, classes, or instances,'
801
' not exceptions.ValueError')
802
and sys.version_info[0:2] >= (2,5)):
803
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
804
' raising new style exceptions with python'
809
def test_short_line(self):
810
transport = MockTransport([
813
"b option 10 10 0", # This line isn't terminated, ignored
815
index = self.get_knit_index(transport, "filename", "r")
816
self.assertEqual(['a'], index.get_versions())
818
def test_skip_incomplete_record(self):
819
# A line with bogus data should just be skipped
820
transport = MockTransport([
823
"b option 10 10 0", # This line isn't terminated, ignored
824
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
826
index = self.get_knit_index(transport, "filename", "r")
827
self.assertEqual(['a', 'c'], index.get_versions())
829
def test_trailing_characters(self):
830
# A line with bogus data should just be skipped
831
transport = MockTransport([
834
"b option 10 10 0 :a", # This line has extra trailing characters
835
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
837
index = self.get_knit_index(transport, "filename", "r")
838
self.assertEqual(['a', 'c'], index.get_versions())
841
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
    """Run the inherited index tests with knit._load_data set to _load_data_c."""

    _test_needs_features = [CompiledKnitFeature]

    def get_knit_index(self, *args, **kwargs):
        """Build a _KnitIndex with knit._load_data set to the C loader.

        The previous loader is restored by a cleanup registered on the test.
        """
        orig = knit._load_data
        def reset():
            knit._load_data = orig
        self.addCleanup(reset)
        from bzrlib._knit_load_data_c import _load_data_c
        knit._load_data = _load_data_c
        return _KnitIndex(*args, **kwargs)
856
class KnitTests(TestCaseWithTransport):
    """Class containing knit test helper routines."""

    def make_test_knit(self, annotate=False, delay_create=False, index=None):
        """Create a writable knit named 'test' on the current transport.

        :param annotate: use KnitAnnotateFactory when true, otherwise
            KnitPlainFactory.
        :param delay_create: passed through to KnitVersionedFile.
        :param index: passed through to KnitVersionedFile.
        """
        if annotate:
            factory = KnitAnnotateFactory()
        else:
            factory = KnitPlainFactory()
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
                                 factory=factory, create=True,
                                 delay_create=delay_create, index=index)
869
class BasicKnitTests(KnitTests):
871
def add_stock_one_and_one_a(self, k):
    """Add 'text-1' (no parents) and 'text-1a' (child of 'text-1') to k."""
    k.add_lines('text-1', [], split_lines(TEXT_1))
    k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
875
def test_knit_constructor(self):
    """Construct empty k"""
    # Passing means construction did not raise.
    self.make_test_knit()
879
def test_make_explicit_index(self):
    """We can supply an index to use."""
    # Named k, not knit, so the knit module is not shadowed.
    k = KnitVersionedFile('test', get_transport('.'),
                          index='strangelove')
    self.assertEqual(k._index, 'strangelove')
885
def test_knit_add(self):
    """Store one text in knit and retrieve"""
    k = self.make_test_knit()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    self.assertTrue(k.has_version('text-1'))
    stored = ''.join(k.get_lines('text-1'))
    self.assertEqualDiff(stored, TEXT_1)
892
def test_knit_reload(self):
    """The content in a reloaded knit is correct."""
    k = self.make_test_knit()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    # Open the same knit again through a second, read-only object.
    k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r',
                           factory=KnitPlainFactory(), create=True)
    self.assertTrue(k2.has_version('text-1'))
    self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
901
def test_knit_several(self):
    """Store several texts in a knit"""
    k = self.make_test_knit()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    k.add_lines('text-2', [], split_lines(TEXT_2))
    # Each text must come back unchanged and independent of the other.
    self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
    self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
909
def test_repeated_add(self):
    """Knit traps attempt to replace existing version"""
    k = self.make_test_knit()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    # Adding the same version id a second time must be refused.
    self.assertRaises(RevisionAlreadyPresent,
                      k.add_lines,
                      'text-1', [], split_lines(TEXT_1))
917
def test_empty(self):
    """A version with no lines can be stored and read back as []."""
    k = self.make_test_knit(True)
    k.add_lines('text-1', [], [])
    self.assertEqual(k.get_lines('text-1'), [])
922
def test_incomplete(self):
    """Texts without a final line ending can be inserted and read back."""
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    k.add_lines('text-1', [], ['a\n', 'b' ])
    k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
    # reopening ensures maximum room for confusion
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    self.assertEqual(k.get_lines('text-1'), ['a\n', 'b' ])
    self.assertEqual(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
933
def test_delta(self):
    """Expression of knit delta as lines"""
    # The knit is created but not otherwise used by the checks below.
    k = self.make_test_knit()
    old_lines = TEXT_1.splitlines(True)
    td = list(line_delta(old_lines, TEXT_1A.splitlines(True)))
    self.assertEqualDiff(''.join(td), delta_1_1a)
    # Applying the delta to the old text must give the new text.
    out = apply_line_delta(TEXT_1.splitlines(True), td)
    self.assertEqualDiff(''.join(out), TEXT_1A)
943
def assertDerivedBlocksEqual(self, source, target, noeol=False):
944
"""Assert that the derived matching blocks match real output"""
945
source_lines = source.splitlines(True)
946
target_lines = target.splitlines(True)
948
if noeol and not line.endswith('\n'):
952
source_content = KnitContent([(None, nl(l)) for l in source_lines])
953
target_content = KnitContent([(None, nl(l)) for l in target_lines])
954
line_delta = source_content.line_delta(target_content)
955
delta_blocks = list(KnitContent.get_line_delta_blocks(line_delta,
956
source_lines, target_lines))
957
matcher = KnitSequenceMatcher(None, source_lines, target_lines)
958
matcher_blocks = list(list(matcher.get_matching_blocks()))
959
self.assertEqual(matcher_blocks, delta_blocks)
961
def test_get_line_delta_blocks(self):
    """Derived matching blocks agree with the matcher for varied inputs."""
    check = self.assertDerivedBlocksEqual
    check('a\nb\nc\n', 'q\nc\n')
    check(TEXT_1, TEXT_1)
    check(TEXT_1, TEXT_1A)
    check(TEXT_1, TEXT_1B)
    check(TEXT_1B, TEXT_1A)
    check(TEXT_1A, TEXT_1B)
    # Empty source and/or target.
    check(TEXT_1A, '')
    check('', TEXT_1A)
    check('', '')
    # Neither text ends with a newline.
    check('a\nb\nc', 'a\nb\nc\nd')
973
def test_get_line_delta_blocks_noeol(self):
    """Handle historical knit deltas safely

    Some existing knit deltas don't consider the last line to differ
    when the only difference is whether it has a final newline.

    New knit deltas appear to always consider the last line to differ.
    """
    self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd\n', noeol=True)
    self.assertDerivedBlocksEqual('a\nb\nc\nd\n', 'a\nb\nc', noeol=True)
    self.assertDerivedBlocksEqual('a\nb\nc\n', 'a\nb\nc', noeol=True)
    self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\n', noeol=True)
987
def test_add_with_parents(self):
    """Store in knit with parents"""
    k = self.make_test_knit()
    self.add_stock_one_and_one_a(k)
    self.assertEqual(k.get_parents('text-1'), [])
    self.assertEqual(k.get_parents('text-1a'), ['text-1'])
994
def test_ancestry(self):
    """The ancestry of a child version includes itself and its parent."""
    k = self.make_test_knit()
    self.add_stock_one_and_one_a(k)
    # Compared as sets: only membership is checked, not order.
    self.assertEqual(set(k.get_ancestry(['text-1a'])),
                     set(['text-1a', 'text-1']))
1000
def test_add_delta(self):
    """A text stored in a delta=True knit reads back in full."""
    k = KnitVersionedFile('test', get_transport('.'),
                          factory=KnitPlainFactory(),
                          delta=True, create=True)
    self.add_stock_one_and_one_a(k)
    self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
1008
def test_add_delta_knit_graph_index(self):
1009
"""Does adding work with a KnitGraphIndex."""
1010
index = InMemoryGraphIndex(2)
1011
knit_index = KnitGraphIndex(index, add_callback=index.add_nodes,
1013
k = KnitVersionedFile('test', get_transport('.'),
1014
delta=True, create=True, index=knit_index)
1015
self.add_stock_one_and_one_a(k)
1017
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
1018
# check the index had the right data added.
1019
self.assertEqual(set([
1020
(('text-1', ), ' 0 127', ((), ())),
1021
(('text-1a', ), ' 127 140', ((('text-1', ),), (('text-1', ),))),
1022
]), set(index.iter_all_entries()))
1023
# we should not have a .kndx file
1024
self.assertFalse(get_transport('.').has('test.kndx'))
1026
def test_annotate(self):
    """Annotation works on an annotated knit created with delta=True."""
    k = KnitVersionedFile('knit', get_transport('.'),
                          factory=KnitAnnotateFactory(),
                          delta=True, create=True)
    self.insert_and_test_small_annotate(k)
1032
def insert_and_test_small_annotate(self, k):
    """test annotation with k works correctly."""
    k.add_lines('text-1', [], ['a\n', 'b\n'])
    k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])

    origins = k.annotate('text-2')
    # The unchanged line is attributed to the parent, the new one to the child.
    self.assertEqual(origins[0], ('text-1', 'a\n'))
    self.assertEqual(origins[1], ('text-2', 'c\n'))
1041
def test_annotate_fulltext(self):
    """Run the small annotate scenario on a knit created with delta=False."""
    fulltext_knit = KnitVersionedFile(
        'knit', get_transport('.'), factory=KnitAnnotateFactory(),
        delta=False, create=True)
    self.insert_and_test_small_annotate(fulltext_knit)
1047
def test_annotate_merge_1(self):
    """Each line of a two-parent merge is credited to the parent holding it."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n'])
    knit.add_lines('text-a2', [], ['d\n', 'c\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
    annotated = knit.annotate('text-am')
    wanted = [('text-a2', 'd\n'), ('text-a1', 'b\n')]
    for position, origin in enumerate(wanted):
        self.assertEquals(annotated[position], origin)
1056
def test_annotate_merge_2(self):
    """A merge taking its middle line from the second parent is annotated."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
    annotated = knit.annotate('text-am')
    wanted = [('text-a1', 'a\n'), ('text-a2', 'y\n'), ('text-a1', 'c\n')]
    for position, origin in enumerate(wanted):
        self.assertEquals(annotated[position], origin)
1066
def test_annotate_merge_9(self):
    """A line found in neither parent is credited to the merge itself."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
    annotated = knit.annotate('text-am')
    wanted = [('text-am', 'k\n'), ('text-a2', 'y\n'), ('text-a1', 'c\n')]
    for position, origin in enumerate(wanted):
        self.assertEquals(annotated[position], origin)
1076
def test_annotate_merge_3(self):
    """One new line followed by two lines kept from the second parent."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
    annotated = knit.annotate('text-am')
    wanted = [('text-am', 'k\n'), ('text-a2', 'y\n'), ('text-a2', 'z\n')]
    for position, origin in enumerate(wanted):
        self.assertEquals(annotated[position], origin)
1086
def test_annotate_merge_4(self):
    """Lines inherited through 'text-a3' keep their 'text-a1' origin."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
    knit.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
    annotated = knit.annotate('text-am')
    wanted = [('text-a1', 'a\n'), ('text-a1', 'b\n'), ('text-a2', 'z\n')]
    for position, origin in enumerate(wanted):
        self.assertEquals(annotated[position], origin)
1097
def test_annotate_merge_5(self):
    """A three-parent merge taking one line from each parent is annotated."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
    knit.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
    merge_parents = ['text-a1', 'text-a2', 'text-a3']
    knit.add_lines('text-am', merge_parents, ['a\n', 'e\n', 'z\n'])
    annotated = knit.annotate('text-am')
    wanted = [('text-a1', 'a\n'), ('text-a2', 'e\n'), ('text-a3', 'z\n')]
    for position, origin in enumerate(wanted):
        self.assertEquals(annotated[position], origin)
1110
def test_annotate_file_cherry_pick(self):
    """Lines restored from 'text-1' in 'text-3' are credited to 'text-1'."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
    knit.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
    annotated = knit.annotate('text-3')
    wanted = [('text-1', 'a\n'), ('text-1', 'b\n'), ('text-1', 'c\n')]
    for position, origin in enumerate(wanted):
        self.assertEquals(annotated[position], origin)
1120
def test_knit_join(self):
    """Joining 'text-m' into an empty knit reports 5 versions copied.

    Both root texts ('text-a' and 'text-c') must be present afterwards.
    """
    source = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
    # two independent two-version lines of history, then a merge of both
    history = (
        ('text-a', []),
        ('text-b', ['text-a']),
        ('text-c', []),
        ('text-d', ['text-c']),
        ('text-m', ['text-b', 'text-d']),
        )
    for version, parents in history:
        source.add_lines(version, parents, split_lines(TEXT_1))

    target = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
    copied = target.join(source, version_ids=['text-m'])
    self.assertEquals(copied, 5)
    for root in ('text-a', 'text-c'):
        self.assertTrue(target.has_version(root))
1137
def test_reannotate(self):
    """'text-c' is annotated correctly after being joined back into knit1."""
    first = KnitVersionedFile('knit1', get_transport('.'),
                              factory=KnitAnnotateFactory(), create=True)
    first.add_lines('text-a', [], ['a\n', 'b\n'])
    first.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])

    second = KnitVersionedFile('test2', get_transport('.'),
                               factory=KnitAnnotateFactory(), create=True)
    second.join(first, version_ids=['text-b'])

    # let the two knits diverge before joining back
    first.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
    second.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
    second.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])

    # test-c will have index 3
    first.join(second, version_ids=['text-c'])

    joined_lines = first.get_lines('text-c')
    self.assertEquals(joined_lines, ['z\n', 'c\n'])

    annotated = first.annotate('text-c')
    self.assertEquals(annotated[0], ('text-c', 'z\n'))
    self.assertEquals(annotated[1], ('text-b', 'c\n'))
1166
def test_get_line_delta_texts(self):
    """Make sure we can call get_texts on text with reused line deltas"""
    k1 = KnitVersionedFile('test1', get_transport('.'),
                           factory=KnitPlainFactory(), create=True)
    # Build a chain '0' <- '1' <- '2'; version t holds t copies of 'hello\n'.
    for t in range(3):
        if t == 0:
            # the first version has no predecessor to name as parent
            parents = []
        else:
            parents = ['%d' % (t-1)]
        k1.add_lines('%d' % t, parents, ['hello\n'] * t)
    k1.get_texts(('%d' % t) for t in range(3))
1178
def test_iter_lines_reads_in_order(self):
    """A last-first request is served by one read in ascending offsets."""
    logging_transport = TransportLogger(MemoryTransport())
    knit = KnitVersionedFile('id', logging_transport, create=True, delta=True)
    self.assertEqual([('id.kndx',)], logging_transport._calls)
    # add texts with no required ordering
    knit.add_lines('base', [], ['text\n'])
    knit.add_lines('base2', [], ['text2\n'])
    # forget the calls made so far so only the iteration is inspected
    logging_transport._calls = []
    # request a last-first iteration
    requested = ['base2', 'base']
    lines_seen = list(knit.iter_lines_added_or_present_in_versions(requested))
    self.assertEqual([('id.knit', [(0, 87), (87, 89)])], logging_transport._calls)
    self.assertEqual(['text\n', 'text2\n'], lines_seen)
1193
def test_create_empty_annotated(self):
    """create_empty keeps the annotate factory type and the delta setting."""
    original = self.make_test_knit(True)
    original.add_lines('text-a', [], ['a\n', 'b\n'])
    empty_copy = original.create_empty('t', MemoryTransport())
    self.assertTrue(isinstance(empty_copy.factory, KnitAnnotateFactory))
    self.assertEqual(original.delta, empty_copy.delta)
    # the generic test checks for empty content and file class
1202
def test_knit_format(self):
    """Check the .kndx content as records are added and the file reloaded."""
    # this tests that a new knit index file has the expected content
    # and that it writes the data we expect as records are added.
    knit = self.make_test_knit(True)
    # Now knit files are not created until we first add data to them
    self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
    knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
    # header, an empty line, then one record per version (same layout as
    # the two-record check below)
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\n"
        "revid fulltext 0 84 .a_ghost :",
        'test.kndx')
    knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\nrevid fulltext 0 84 .a_ghost :"
        "\nrevid2 line-delta 84 82 0 :",
        'test.kndx')
    # we should be able to load this file again
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
    self.assertEqual(['revid', 'revid2'], knit.versions())
    # write a short write to the file and ensure that it's ignored
    indexfile = open('test.kndx', 'ab')
    try:
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
    finally:
        # close explicitly so the partial record is on disk before reloading
        indexfile.close()
    # we should be able to load this file again
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
    self.assertEqual(['revid', 'revid2'], knit.versions())
    # and add a revision with the same id the failed write had
    knit.add_lines('revid3', ['revid2'], ['a\n'])
    # and when reading it revid3 should now appear.
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
    self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
    self.assertEqual(['revid2'], knit.get_parents('revid3'))
1237
def test_delay_create(self):
    """Test that passing delay_create=True creates files late"""
    knit = self.make_test_knit(annotate=True, delay_create=True)
    # nothing is written until the first record is added
    self.failIfExists('test.knit')
    self.failIfExists('test.kndx')
    knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
    self.failUnlessExists('test.knit')
    # header, an empty line, then the single record
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\n"
        "revid fulltext 0 84 .a_ghost :",
        'test.kndx')
1250
def test_create_parent_dir(self):
    """create_parent_dir can create knits in nonexistent dirs"""
    # Has no effect if we don't set 'delay_create'
    trans = get_transport('.')
    self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
                      trans, access_mode='w', factory=None,
                      create=True, create_parent_dir=True)
    # Nothing should have changed yet
    knit = KnitVersionedFile('dir/test', trans, access_mode='w',
                             factory=None, create=True,
                             create_parent_dir=True,
                             delay_create=True)
    self.failIfExists('dir/test.knit')
    self.failIfExists('dir/test.kndx')
    self.failIfExists('dir')
    # the first add creates the directory and both files
    knit.add_lines('revid', [], ['a\n'])
    self.failUnlessExists('dir')
    self.failUnlessExists('dir/test.knit')
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\n"
        "revid fulltext 0 84 :",
        'dir/test.kndx')
1274
def test_create_mode_700(self):
1275
trans = get_transport('.')
1276
if not trans._can_roundtrip_unix_modebits():
1277
# Can't roundtrip, so no need to run this test
1279
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1280
factory=None, create=True,
1281
create_parent_dir=True,
1285
knit.add_lines('revid', [], ['a\n'])
1286
self.assertTransportMode(trans, 'dir', 0700)
1287
self.assertTransportMode(trans, 'dir/test.knit', 0600)
1288
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
1290
def test_create_mode_770(self):
1291
trans = get_transport('.')
1292
if not trans._can_roundtrip_unix_modebits():
1293
# Can't roundtrip, so no need to run this test
1295
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1296
factory=None, create=True,
1297
create_parent_dir=True,
1301
knit.add_lines('revid', [], ['a\n'])
1302
self.assertTransportMode(trans, 'dir', 0770)
1303
self.assertTransportMode(trans, 'dir/test.knit', 0660)
1304
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
1306
def test_create_mode_777(self):
1307
trans = get_transport('.')
1308
if not trans._can_roundtrip_unix_modebits():
1309
# Can't roundtrip, so no need to run this test
1311
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1312
factory=None, create=True,
1313
create_parent_dir=True,
1317
knit.add_lines('revid', [], ['a\n'])
1318
self.assertTransportMode(trans, 'dir', 0777)
1319
self.assertTransportMode(trans, 'dir/test.knit', 0666)
1320
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
1322
def test_plan_merge(self):
    """plan_merge of 'text1a' and 'text1b' agrees with AB_MERGE pairwise."""
    annotated_knit = self.make_test_knit(annotate=True)
    annotated_knit.add_lines('text1', [], split_lines(TEXT_1))
    annotated_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
    annotated_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
    merge_plan = list(annotated_knit.plan_merge('text1a', 'text1b'))
    # zip pairs entries up to the length of the shorter sequence
    for actual, wanted in zip(merge_plan, AB_MERGE):
        self.assertEqual(actual, wanted)
1341
Banana cup cake recipe
1347
- self-raising flour
1351
Banana cup cake recipe
1353
- bananas (do not use plantains!!!)
1360
Banana cup cake recipe
1363
- self-raising flour
1376
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
1381
new-b|- bananas (do not use plantains!!!)
1382
unchanged|- broken tea cups
1383
new-a|- self-raising flour
1386
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
1389
def line_delta(from_lines, to_lines):
    """Generate line-based delta from one text to another

    For every non-equal opcode a header line 'start,end,count\\n' is yielded,
    where start/end index into from_lines and count is the number of
    replacement lines, followed by those count lines taken from to_lines.
    """
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
    for op in s.get_opcodes():
        if op[0] == 'equal':
            # unchanged regions contribute nothing to the delta
            continue
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
        for i in range(op[3], op[4]):
            yield to_lines[i]
1400
def apply_line_delta(basis_lines, delta_lines):
    """Apply a line-based perfect diff

    basis_lines -- text to apply the patch to
    delta_lines -- diff instructions and content

    Returns a new list; basis_lines itself is not modified.
    """
    out = basis_lines[:]
    i = 0
    # net number of lines added to out by the hunks applied so far; hunk
    # headers are expressed in basis_lines coordinates
    offset = 0
    while i < len(delta_lines):
        l = delta_lines[i]
        # header: replace basis[a:b] with the next c delta lines
        a, b, c = [int(field) for field in l.split(',')]
        i = i + 1
        out[offset+a:offset+b] = delta_lines[i:i+c]
        i = i + c
        # (b - a) lines were removed and c inserted
        offset = offset + c - (b - a)
    return out
1419
class TestWeaveToKnit(KnitTests):
    """Tests for WeaveToKnit.is_compatible."""

    def test_weave_to_knit_matches(self):
        """Only the (Weave source, Knit target) pairing is compatible."""
        # check that the WeaveToKnit is_compatible function
        # registers True for a Weave to a Knit.
        w = Weave()
        k = self.make_test_knit()
        self.failUnless(WeaveToKnit.is_compatible(w, k))
        self.failIf(WeaveToKnit.is_compatible(k, w))
        self.failIf(WeaveToKnit.is_compatible(w, w))
        self.failIf(WeaveToKnit.is_compatible(k, k))
1432
class TestKnitCaching(KnitTests):
    """Tests for the record cache kept in a knit's _data object.

    NOTE(review): the enable_cache()/clear_cache() calls are inferred from
    the surrounding comments and assertions ('enable_cache' is named in
    test_no_caching); confirm the method names against KnitVersionedFile.
    """

    def create_knit(self, cache_add=False):
        """Return an annotated test knit holding 'text-1' and 'text-2'.

        :param cache_add: If true, enable caching before the texts are added.
        """
        k = self.make_test_knit(True)
        if cache_add:
            k.enable_cache()

        k.add_lines('text-1', [], split_lines(TEXT_1))
        k.add_lines('text-2', [], split_lines(TEXT_2))
        return k

    def test_no_caching(self):
        """Without enabling the cache nothing is stored in it."""
        k = self.create_knit()
        # Nothing should be cached without setting 'enable_cache'
        self.assertEqual({}, k._data._cache)

    def test_cache_add_and_clear(self):
        """Texts added with caching on are cached until the cache is cleared."""
        k = self.create_knit(True)

        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))

        k.clear_cache()
        self.assertEqual({}, k._data._cache)

    def test_cache_data_read_raw(self):
        """Raw record reads populate the cache only while it is enabled."""
        k = self.create_knit()

        # Now cache and read
        k.enable_cache()

        def read_one_raw(version):
            """Return the single raw record read for version."""
            pos_map = k._get_components_positions([version])
            method, pos, size, next = pos_map[version]
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
            self.assertEqual(1, len(lst))
            return lst[0]

        val = read_one_raw('text-1')
        self.assertEqual({'text-1':val[1]}, k._data._cache)

        k.clear_cache()
        # After clear, new reads are not cached
        self.assertEqual({}, k._data._cache)

        val2 = read_one_raw('text-1')
        self.assertEqual(val, val2)
        self.assertEqual({}, k._data._cache)

    def test_cache_data_read(self):
        """Parsed record reads populate the cache only while it is enabled."""
        k = self.create_knit()

        def read_one(version):
            """Return the single parsed record read for version."""
            pos_map = k._get_components_positions([version])
            method, pos, size, next = pos_map[version]
            lst = list(k._data.read_records_iter([(version, pos, size)]))
            self.assertEqual(1, len(lst))
            return lst[0]

        # Now cache and read
        k.enable_cache()

        val = read_one('text-2')
        self.assertEqual(['text-2'], k._data._cache.keys())
        self.assertEqual('text-2', val[0])
        content, digest = k._data._parse_record('text-2',
                                                k._data._cache['text-2'])
        self.assertEqual(content, val[1])
        self.assertEqual(digest, val[2])

        k.clear_cache()
        self.assertEqual({}, k._data._cache)

        val2 = read_one('text-2')
        self.assertEqual(val, val2)
        self.assertEqual({}, k._data._cache)

    def test_cache_read(self):
        """get_text fills the cache while enabled and not after clearing."""
        k = self.create_knit()
        k.enable_cache()

        text = k.get_text('text-1')
        self.assertEqual(TEXT_1, text)
        self.assertEqual(['text-1'], k._data._cache.keys())

        k.clear_cache()
        self.assertEqual({}, k._data._cache)

        text = k.get_text('text-1')
        self.assertEqual(TEXT_1, text)
        self.assertEqual({}, k._data._cache)
1524
class TestKnitIndex(KnitTests):
    """Tests for the .kndx index object behind a test knit.

    NOTE(review): the index is assumed to be reachable as knit._index;
    confirm the attribute name against KnitVersionedFile.
    """

    def test_add_versions_dictionary_compresses(self):
        """Adding versions to the index should update the lookup dict"""
        knit = self.make_test_knit()
        idx = knit._index
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
        self.check_file_contents('test.kndx',
            '# bzr knit index 8\n'
            '\n'
            'a-1 fulltext 0 0 :'
            )
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
                         ])
        # parents are written as positions in the index, not as version ids
        self.check_file_contents('test.kndx',
            '# bzr knit index 8\n'
            '\n'
            'a-1 fulltext 0 0 :\n'
            'a-2 fulltext 0 0 0 :\n'
            'a-3 fulltext 0 0 1 :'
            )

        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
                         }, idx._cache)

    def test_add_versions_fails_clean(self):
        """If add_versions fails in the middle, it restores a pristine state.

        Any modifications that are made to the index are reset if all versions
        cannot be added.
        """
        # This cheats a little bit by passing in a generator which will
        # raise an exception before the processing finishes
        # Other possibilities would be to have a version with the wrong number
        # of entries, or to make the backing transport unable to write any
        # files.

        knit = self.make_test_knit()
        idx = knit._index
        idx.add_version('a-1', ['fulltext'], 0, 0, [])

        class StopEarly(Exception):
            pass

        def generate_failure():
            """Add some entries and then raise an exception"""
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
            raise StopEarly()

        # Assert the pre-condition
        self.assertEqual(['a-1'], idx._history)
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)

        self.assertRaises(StopEarly, idx.add_versions, generate_failure())

        # And it shouldn't be modified
        self.assertEqual(['a-1'], idx._history)
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)

    def test_knit_index_ignores_empty_files(self):
        """An empty .kndx file must not stop a knit from being created."""
        # There was a race condition in older bzr, where a ^C at the right time
        # could leave an empty .kndx file, which bzr would later claim was a
        # corrupted file since the header was not present. In reality, the file
        # just wasn't created, so it should be ignored.
        t = get_transport('.')
        t.put_bytes('test.kndx', '')

        # passes as long as creating the knit does not raise
        knit = self.make_test_knit()

    def test_knit_index_checks_header(self):
        """A .kndx file with a wrong header raises KnitHeaderError."""
        t = get_transport('.')
        t.put_bytes('test.kndx', '# not really a knit header\n\n')

        self.assertRaises(KnitHeaderError, self.make_test_knit)
1604
class TestGraphIndexKnit(KnitTests):
    """Tests for knits using a GraphIndex rather than a KnitIndex."""

    def make_g_index(self, name, ref_lists=0, nodes=[]):
        """Write a GraphIndex called name to the test transport and open it.

        Each entry of nodes is a three-item tuple whose items are passed to
        GraphIndexBuilder.add_node positionally, in order.  The default list
        is only iterated, never mutated.
        """
        builder = GraphIndexBuilder(ref_lists)
        for node, references, value in nodes:
            builder.add_node(node, references, value)
        stream = builder.finish()
        trans = self.get_transport()
        trans.put_file(name, stream)
        return GraphIndex(trans, name)

    def two_graph_index(self, deltas=False, catch_adds=False):
        """Build a two-graph index.

        :param deltas: If true, use underlying indices with two node-ref
            lists and 'parent' set to a delta-compressed against tail.
        :param catch_adds: If true, additions are recorded through catch_add
            in self.caught_entries; otherwise no add callback is supplied.
        """
        # build a complex graph across several indices.
        if deltas:
            # delta compression in the index
            index1 = self.make_g_index('1', 2, [
                (('tip', ), 'N0 100', ([('parent', )], [], )),
                (('tail', ), '', ([], []))])
            index2 = self.make_g_index('2', 2, [
                (('parent', ), ' 100 78', ([('tail', ), ('ghost', )], [('tail', )])),
                (('separate', ), '', ([], []))])
        else:
            # just blob location and graph in the index.
            index1 = self.make_g_index('1', 1, [
                (('tip', ), 'N0 100', ([('parent', )], )),
                (('tail', ), '', ([], ))])
            index2 = self.make_g_index('2', 1, [
                (('parent', ), ' 100 78', ([('tail', ), ('ghost', )], )),
                (('separate', ), '', ([], ))])
        combined_index = CombinedGraphIndex([index1, index2])
        if catch_adds:
            self.combined_index = combined_index
            self.caught_entries = []
            add_callback = self.catch_add
        else:
            add_callback = None
        return KnitGraphIndex(combined_index, deltas=deltas,
            add_callback=add_callback)

    def test_get_graph(self):
        """get_graph lists every present version with its parents."""
        index = self.two_graph_index()
        self.assertEqual(set([
            ('tip', ('parent', )),
            ('tail', ()),
            ('parent', ('tail', 'ghost')),
            ('separate', ()),
            ]), set(index.get_graph()))

    def test_get_ancestry(self):
        """get_ancestry leaves ghosts out and rejects a ghost as a start."""
        # get_ancestry is defined as eliding ghosts, not erroring.
        index = self.two_graph_index()
        self.assertEqual([], index.get_ancestry([]))
        self.assertEqual(['separate'], index.get_ancestry(['separate']))
        self.assertEqual(['tail'], index.get_ancestry(['tail']))
        self.assertEqual(['tail', 'parent'], index.get_ancestry(['parent']))
        self.assertEqual(['tail', 'parent', 'tip'], index.get_ancestry(['tip']))
        # 'separate' is unrelated, so it may sort before or after the chain
        self.assertTrue(index.get_ancestry(['tip', 'separate']) in
            (['tail', 'parent', 'tip', 'separate'],
             ['separate', 'tail', 'parent', 'tip'],
            ))
        # and without topo_sort
        self.assertEqual(set(['separate']),
            set(index.get_ancestry(['separate'], topo_sorted=False)))
        self.assertEqual(set(['tail']),
            set(index.get_ancestry(['tail'], topo_sorted=False)))
        self.assertEqual(set(['tail', 'parent']),
            set(index.get_ancestry(['parent'], topo_sorted=False)))
        self.assertEqual(set(['tail', 'parent', 'tip']),
            set(index.get_ancestry(['tip'], topo_sorted=False)))
        self.assertEqual(set(['separate', 'tail', 'parent', 'tip']),
            set(index.get_ancestry(['tip', 'separate'])))
        # asking for a ghost makes it go boom.
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])

    def test_get_ancestry_with_ghosts(self):
        """get_ancestry_with_ghosts includes ghost parents in the result."""
        index = self.two_graph_index()
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
        self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
        self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
        # 'tail' and 'ghost' are both parents of 'parent'; either order is ok
        self.assertTrue(index.get_ancestry_with_ghosts(['parent']) in
            (['tail', 'ghost', 'parent'],
             ['ghost', 'tail', 'parent'],
            ))
        self.assertTrue(index.get_ancestry_with_ghosts(['tip']) in
            (['tail', 'ghost', 'parent', 'tip'],
             ['ghost', 'tail', 'parent', 'tip'],
            ))
        self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
            (['tail', 'ghost', 'parent', 'tip', 'separate'],
             ['ghost', 'tail', 'parent', 'tip', 'separate'],
             ['separate', 'tail', 'ghost', 'parent', 'tip'],
             ['separate', 'ghost', 'tail', 'parent', 'tip'],
            ))
        # asking for a ghost makes it go boom.
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])

    def test_num_versions(self):
        """All four present versions are counted."""
        index = self.two_graph_index()
        self.assertEqual(4, index.num_versions())

    def test_get_versions(self):
        """get_versions returns the four present versions, not the ghost."""
        index = self.two_graph_index()
        self.assertEqual(set(['tail', 'tip', 'parent', 'separate']),
            set(index.get_versions()))

    def test_has_version(self):
        """A present version is reported, a ghost is not."""
        index = self.two_graph_index()
        self.assertTrue(index.has_version('tail'))
        self.assertFalse(index.has_version('ghost'))

    def test_get_position(self):
        """get_position returns the two numbers stored in the node value."""
        index = self.two_graph_index()
        self.assertEqual((0, 100), index.get_position('tip'))
        self.assertEqual((100, 78), index.get_position('parent'))

    def test_get_method_deltas(self):
        """With deltas, 'parent' (compressed against tail) is a line-delta."""
        index = self.two_graph_index(deltas=True)
        self.assertEqual('fulltext', index.get_method('tip'))
        self.assertEqual('line-delta', index.get_method('parent'))

    def test_get_method_no_deltas(self):
        """Without deltas every version is reported as fulltext."""
        # check that the parent-history lookup is ignored with deltas=False.
        index = self.two_graph_index(deltas=False)
        self.assertEqual('fulltext', index.get_method('tip'))
        self.assertEqual('fulltext', index.get_method('parent'))

    def test_get_options_deltas(self):
        """With deltas, options carry the method and 'no-eol' for 'tip'."""
        index = self.two_graph_index(deltas=True)
        self.assertEqual(['fulltext', 'no-eol'], index.get_options('tip'))
        self.assertEqual(['line-delta'], index.get_options('parent'))

    def test_get_options_no_deltas(self):
        """Without deltas, options report fulltext for every version."""
        # check that the parent-history lookup is ignored with deltas=False.
        index = self.two_graph_index(deltas=False)
        self.assertEqual(['fulltext', 'no-eol'], index.get_options('tip'))
        self.assertEqual(['fulltext'], index.get_options('parent'))

    def test_get_parents(self):
        """get_parents drops ghost parents and rejects a ghost version."""
        # get_parents ignores ghosts
        index = self.two_graph_index()
        self.assertEqual(('tail', ), index.get_parents('parent'))
        # and errors on ghosts.
        self.assertRaises(errors.RevisionNotPresent,
            index.get_parents, 'ghost')

    def test_get_parents_with_ghosts(self):
        """get_parents_with_ghosts keeps ghost parents in the result."""
        index = self.two_graph_index()
        self.assertEqual(('tail', 'ghost'), index.get_parents_with_ghosts('parent'))
        # and errors on ghosts.
        self.assertRaises(errors.RevisionNotPresent,
            index.get_parents_with_ghosts, 'ghost')

    def test_check_versions_present(self):
        """check_versions_present raises when a requested version is a ghost."""
        # ghosts should not be considered present
        index = self.two_graph_index()
        # NOTE(review): the argument lists of these two calls were missing;
        # they are chosen to exercise the ghost alone and mixed with a
        # present version -- confirm.
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
            ['ghost'])
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
            ['tail', 'ghost'])
        index.check_versions_present(['tail', 'separate'])

    def catch_add(self, entries):
        """Record entries passed to the index's add callback."""
        self.caught_entries.append(entries)

    def test_add_no_callback_errors(self):
        """Adding to an index built without a callback is a ReadOnlyError."""
        index = self.two_graph_index()
        self.assertRaises(errors.ReadOnlyError, index.add_version,
            'new', 'fulltext,no-eol', 50, 60, ['separate'])

    def test_add_version_smoke(self):
        """add_version hands one encoded node to the add callback."""
        index = self.two_graph_index(catch_adds=True)
        index.add_version('new', 'fulltext,no-eol', 50, 60, ['separate'])
        self.assertEqual([[(('new', ), 'N50 60', ((('separate',),),))]],
            self.caught_entries)

    def test_add_version_delta_not_delta_index(self):
        """A line-delta cannot be added to an index built without deltas."""
        index = self.two_graph_index(catch_adds=True)
        self.assertRaises(errors.KnitCorrupt, index.add_version,
            'new', 'no-eol,line-delta', 0, 100, ['parent'])
        self.assertEqual([], self.caught_entries)

    def test_add_version_same_dup(self):
        """Re-adding an identical version adds no data."""
        index = self.two_graph_index(catch_adds=True)
        # options can be spelt two different ways
        index.add_version('tip', 'fulltext,no-eol', 0, 100, ['parent'])
        index.add_version('tip', 'no-eol,fulltext', 0, 100, ['parent'])
        # but neither should have added data.
        self.assertEqual([[], []], self.caught_entries)

    def test_add_version_different_dup(self):
        """Re-adding 'tip' with any differing detail raises KnitCorrupt."""
        index = self.two_graph_index(deltas=True, catch_adds=True)
        # change options
        self.assertRaises(errors.KnitCorrupt, index.add_version,
            'tip', 'no-eol,line-delta', 0, 100, ['parent'])
        self.assertRaises(errors.KnitCorrupt, index.add_version,
            'tip', 'line-delta,no-eol', 0, 100, ['parent'])
        self.assertRaises(errors.KnitCorrupt, index.add_version,
            'tip', 'fulltext', 0, 100, ['parent'])
        # position/length
        self.assertRaises(errors.KnitCorrupt, index.add_version,
            'tip', 'fulltext,no-eol', 50, 100, ['parent'])
        self.assertRaises(errors.KnitCorrupt, index.add_version,
            'tip', 'fulltext,no-eol', 0, 1000, ['parent'])
        # parents
        self.assertRaises(errors.KnitCorrupt, index.add_version,
            'tip', 'fulltext,no-eol', 0, 100, [])
        self.assertEqual([], self.caught_entries)

    def test_add_versions_nodeltas(self):
        """add_versions passes all new nodes to the callback in one call."""
        index = self.two_graph_index(catch_adds=True)
        index.add_versions([
                ('new', 'fulltext,no-eol', 50, 60, ['separate']),
                ('new2', 'fulltext', 0, 6, ['new']),
                ])
        self.assertEqual([(('new', ), 'N50 60', ((('separate',),),)),
            (('new2', ), ' 0 6', ((('new',),),))],
            sorted(self.caught_entries[0]))
        self.assertEqual(1, len(self.caught_entries))

    def test_add_versions_deltas(self):
        """With deltas, added nodes carry a second, compression-parent list."""
        index = self.two_graph_index(deltas=True, catch_adds=True)
        index.add_versions([
                ('new', 'fulltext,no-eol', 50, 60, ['separate']),
                ('new2', 'line-delta', 0, 6, ['new']),
                ])
        self.assertEqual([(('new', ), 'N50 60', ((('separate',),), ())),
            (('new2', ), ' 0 6', ((('new',),), (('new',),), ))],
            sorted(self.caught_entries[0]))
        self.assertEqual(1, len(self.caught_entries))

    def test_add_versions_delta_not_delta_index(self):
        """add_versions rejects a line-delta on an index without deltas."""
        index = self.two_graph_index(catch_adds=True)
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
            [('new', 'no-eol,line-delta', 0, 100, ['parent'])])
        self.assertEqual([], self.caught_entries)

    def test_add_versions_same_dup(self):
        """add_versions with an identical existing version adds no data."""
        index = self.two_graph_index(catch_adds=True)
        # options can be spelt two different ways
        index.add_versions([('tip', 'fulltext,no-eol', 0, 100, ['parent'])])
        index.add_versions([('tip', 'no-eol,fulltext', 0, 100, ['parent'])])
        # but neither should have added data.
        self.assertEqual([[], []], self.caught_entries)

    def test_add_versions_different_dup(self):
        """add_versions with a conflicting duplicate raises KnitCorrupt."""
        index = self.two_graph_index(deltas=True, catch_adds=True)
        # change options
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
            [('tip', 'no-eol,line-delta', 0, 100, ['parent'])])
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
            [('tip', 'line-delta,no-eol', 0, 100, ['parent'])])
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
            [('tip', 'fulltext', 0, 100, ['parent'])])
        # position/length
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
            [('tip', 'fulltext,no-eol', 50, 100, ['parent'])])
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
            [('tip', 'fulltext,no-eol', 0, 1000, ['parent'])])
        # parents
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
            [('tip', 'fulltext,no-eol', 0, 100, [])])
        # change options in the second record
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
            [('tip', 'fulltext,no-eol', 0, 100, ['parent']),
             ('tip', 'no-eol,line-delta', 0, 100, ['parent'])])
        self.assertEqual([], self.caught_entries)

    def test_iter_parents(self):
        """iter_parents yields (version, parents) and skips missing versions."""
        index1 = self.make_g_index('1', 1, [
            # no parents
            (('r0', ), 'N0 100', ([], )),
            # 1 parent
            (('r1', ), '', ([('r0', )], ))])
        index2 = self.make_g_index('2', 1, [
            # 2 parents
            (('r2', ), 'N0 100', ([('r1', ), ('r0', )], )),
            ])
        combined_index = CombinedGraphIndex([index1, index2])
        index = KnitGraphIndex(combined_index)

        # cases: each sample data individually:
        self.assertEqual(set([('r0', ())]),
            set(index.iter_parents(['r0'])))
        self.assertEqual(set([('r1', ('r0', ))]),
            set(index.iter_parents(['r1'])))
        self.assertEqual(set([('r2', ('r1', 'r0'))]),
            set(index.iter_parents(['r2'])))
        # no nodes returned for a missing node
        self.assertEqual(set(),
            set(index.iter_parents(['missing'])))
        # 1 node returned with missing nodes skipped
        self.assertEqual(set([('r1', ('r0', ))]),
            set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
        # 2 nodes returned
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
            set(index.iter_parents(['r0', 'r1'])))
        # 2 nodes returned, missing skipped
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
            set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
1911
class TestNoParentsGraphIndexKnit(KnitTests):
1912
"""Tests for knits using KnitGraphIndex with no parents."""
1914
def make_g_index(self, name, ref_lists=0, nodes=[]):
    """Write a GraphIndex called name to the test transport and open it.

    Each entry of nodes is a two-item tuple whose items are passed to
    GraphIndexBuilder.add_node positionally, in order.
    """
    index_builder = GraphIndexBuilder(ref_lists)
    for first_item, second_item in nodes:
        index_builder.add_node(first_item, second_item)
    serialised = index_builder.finish()
    transport = self.get_transport()
    transport.put_file(name, serialised)
    return GraphIndex(transport, name)
1923
def test_parents_deltas_incompatible(self):
    """Asking for deltas=True together with parents=False is a KnitError."""
    empty_index = CombinedGraphIndex([])
    self.assertRaises(errors.KnitError, KnitGraphIndex, empty_index,
                      deltas=True, parents=False)
1928
def two_graph_index(self, catch_adds=False):
    """Build a two-graph index without parent information.

    :param catch_adds: If true, additions are recorded through catch_add
        in self.caught_entries; otherwise no add callback is supplied.
    """
    # put several versions in the index.
    index1 = self.make_g_index('1', 0, [
        (('tip', ), 'N0 100'),
        (('tail', ), '')])
    index2 = self.make_g_index('2', 0, [
        (('parent', ), ' 100 78'),
        (('separate', ), '')])
    combined_index = CombinedGraphIndex([index1, index2])
    if catch_adds:
        self.combined_index = combined_index
        self.caught_entries = []
        add_callback = self.catch_add
    else:
        add_callback = None
    return KnitGraphIndex(combined_index, parents=False,
        add_callback=add_callback)
1951
def test_get_graph(self):
    """Without parents, every version appears with an empty parent tuple."""
    index = self.two_graph_index()
    # the four versions are the ones test_get_versions expects
    self.assertEqual(set([
        ('tip', ()),
        ('tail', ()),
        ('parent', ()),
        ('separate', ()),
        ]), set(index.get_graph()))
1960
def test_get_ancestry(self):
    """Without parents the ancestry of a version is just that version."""
    # with no parents, ancestry is always just the key.
    index = self.two_graph_index()
    self.assertEqual([], index.get_ancestry([]))
    self.assertEqual(['separate'], index.get_ancestry(['separate']))
    self.assertEqual(['tail'], index.get_ancestry(['tail']))
    self.assertEqual(['parent'], index.get_ancestry(['parent']))
    self.assertEqual(['tip'], index.get_ancestry(['tip']))
    # the two versions are unrelated, so either order is acceptable
    self.assertTrue(index.get_ancestry(['tip', 'separate']) in
        (['tip', 'separate'],
         ['separate', 'tip'],
        ))
    # asking for a ghost makes it go boom.
    self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
1975
def test_get_ancestry_with_ghosts(self):
    """get_ancestry_with_ghosts() returns only the requested keys."""
    index = self.two_graph_index()
    self.assertEqual([], index.get_ancestry_with_ghosts([]))
    self.assertEqual(['separate'],
                     index.get_ancestry_with_ghosts(['separate']))
    self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
    self.assertEqual(['parent'],
                     index.get_ancestry_with_ghosts(['parent']))
    self.assertEqual(['tip'], index.get_ancestry_with_ghosts(['tip']))
    # Either ordering of two unrelated keys is acceptable.
    self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
                    (['tip', 'separate'],
                     ['separate', 'tip'],
                     ))
    # asking for a ghost makes it go boom.
    self.assertRaises(errors.RevisionNotPresent,
                      index.get_ancestry_with_ghosts, ['ghost'])
1989
def test_num_versions(self):
    """num_versions() counts the versions across both indices."""
    index = self.two_graph_index()
    self.assertEqual(4, index.num_versions())
1993
def test_get_versions(self):
    """get_versions() yields every version name, in any order."""
    index = self.two_graph_index()
    self.assertEqual(set(['tail', 'tip', 'parent', 'separate']),
                     set(index.get_versions()))
1998
def test_has_version(self):
    """has_version() is true for a stored version, false for a ghost."""
    index = self.two_graph_index()
    self.assertTrue(index.has_version('tail'))
    self.assertFalse(index.has_version('ghost'))
2003
def test_get_position(self):
    """get_position() returns the two numbers stored for a version."""
    index = self.two_graph_index()
    # 'tip' is stored as 'N0 100' and 'parent' as ' 100 78'.
    self.assertEqual((0, 100), index.get_position('tip'))
    self.assertEqual((100, 78), index.get_position('parent'))
2008
def test_get_method(self):
    """get_method() reports 'fulltext' for both 'tip' and 'parent'."""
    index = self.two_graph_index()
    self.assertEqual('fulltext', index.get_method('tip'))
    # Exercise get_method() for the second version as well;
    # get_options('parent') is already checked in test_get_options.
    self.assertEqual('fulltext', index.get_method('parent'))
2013
def test_get_options(self):
    """get_options() returns the option list stored for each version."""
    index = self.two_graph_index()
    # 'tip' is stored with a leading 'N', 'parent' with a leading space.
    self.assertEqual(['fulltext', 'no-eol'], index.get_options('tip'))
    self.assertEqual(['fulltext'], index.get_options('parent'))
2018
def test_get_parents(self):
    """get_parents() is empty for a stored version; ghosts raise."""
    index = self.two_graph_index()
    self.assertEqual((), index.get_parents('parent'))
    # and errors on ghosts.
    self.assertRaises(errors.RevisionNotPresent,
                      index.get_parents, 'ghost')
2025
def test_get_parents_with_ghosts(self):
    """get_parents_with_ghosts() is empty for a stored version."""
    index = self.two_graph_index()
    self.assertEqual((), index.get_parents_with_ghosts('parent'))
    # and errors on ghosts.
    self.assertRaises(errors.RevisionNotPresent,
                      index.get_parents_with_ghosts, 'ghost')
2032
def test_check_versions_present(self):
    """check_versions_present() raises if any listed version is absent."""
    index = self.two_graph_index()
    # NOTE(review): the argument of this first check is restored as a
    # list holding only an absent version -- confirm against history.
    self.assertRaises(RevisionNotPresent, index.check_versions_present,
                      ['missing'])
    # One present and one absent version still raises.
    self.assertRaises(RevisionNotPresent, index.check_versions_present,
                      ['tail', 'missing'])
    # All present: must not raise.
    index.check_versions_present(['tail', 'separate'])
2040
def catch_add(self, entries):
    """Record one add-callback invocation.

    :param entries: The object passed to the callback; it is appended
        unchanged to self.caught_entries, one element per call.
    """
    self.caught_entries.append(entries)
2043
def test_add_no_callback_errors(self):
    """add_version() on an index built without catch_adds is read-only."""
    index = self.two_graph_index()
    self.assertRaises(errors.ReadOnlyError, index.add_version,
                      'new', 'fulltext,no-eol', 50, 60, ['separate'])
2048
def test_add_version_smoke(self):
    """add_version() hands a single encoded entry to the add callback."""
    index = self.two_graph_index(catch_adds=True)
    index.add_version('new', 'fulltext,no-eol', 50, 60, [])
    # One callback invocation carrying one (key, value) entry.
    self.assertEqual([[(('new', ), 'N50 60')]],
                     self.caught_entries)
2054
def test_add_version_delta_not_delta_index(self):
    """Adding a line-delta version raises KnitCorrupt; nothing is added."""
    index = self.two_graph_index(catch_adds=True)
    self.assertRaises(errors.KnitCorrupt, index.add_version,
                      'new', 'no-eol,line-delta', 0, 100, [])
    # The add callback must never have been invoked.
    self.assertEqual([], self.caught_entries)
2060
def test_add_version_same_dup(self):
    """Re-adding an identical existing version adds no entries."""
    index = self.two_graph_index(catch_adds=True)
    # options can be spelt two different ways
    index.add_version('tip', 'fulltext,no-eol', 0, 100, [])
    index.add_version('tip', 'no-eol,fulltext', 0, 100, [])
    # but neither should have added data.
    self.assertEqual([[], []], self.caught_entries)
2068
def test_add_version_different_dup(self):
    """Re-adding 'tip' with any differing detail raises KnitCorrupt."""
    index = self.two_graph_index(catch_adds=True)
    # differing options
    self.assertRaises(errors.KnitCorrupt, index.add_version,
                      'tip', 'no-eol,line-delta', 0, 100, [])
    self.assertRaises(errors.KnitCorrupt, index.add_version,
                      'tip', 'line-delta,no-eol', 0, 100, [])
    self.assertRaises(errors.KnitCorrupt, index.add_version,
                      'tip', 'fulltext', 0, 100, [])
    # differing third / fourth argument (stored for 'tip' as 0 and 100)
    self.assertRaises(errors.KnitCorrupt, index.add_version,
                      'tip', 'fulltext,no-eol', 50, 100, [])
    self.assertRaises(errors.KnitCorrupt, index.add_version,
                      'tip', 'fulltext,no-eol', 0, 1000, [])
    # differing parents
    self.assertRaises(errors.KnitCorrupt, index.add_version,
                      'tip', 'fulltext,no-eol', 0, 100, ['parent'])
    # None of the failed attempts may have reached the add callback.
    self.assertEqual([], self.caught_entries)
2087
def test_add_versions(self):
    """add_versions() passes all new entries in one callback call."""
    index = self.two_graph_index(catch_adds=True)
    index.add_versions([
        ('new', 'fulltext,no-eol', 50, 60, []),
        ('new2', 'fulltext', 0, 6, []),
        ])
    # Sort before comparing: entry order within the batch is not pinned.
    self.assertEqual([(('new', ), 'N50 60'), (('new2', ), ' 0 6')],
                     sorted(self.caught_entries[0]))
    self.assertEqual(1, len(self.caught_entries))
2097
def test_add_versions_delta_not_delta_index(self):
    """add_versions() with a line-delta record raises KnitCorrupt."""
    index = self.two_graph_index(catch_adds=True)
    self.assertRaises(errors.KnitCorrupt, index.add_versions,
                      [('new', 'no-eol,line-delta', 0, 100, ['parent'])])
    # The add callback must never have been invoked.
    self.assertEqual([], self.caught_entries)
2103
def test_add_versions_parents_not_parents_index(self):
    """add_versions() with parents on this index raises KnitCorrupt."""
    index = self.two_graph_index(catch_adds=True)
    self.assertRaises(errors.KnitCorrupt, index.add_versions,
                      [('new', 'no-eol,fulltext', 0, 100, ['parent'])])
    # The add callback must never have been invoked.
    self.assertEqual([], self.caught_entries)
2109
def test_add_versions_same_dup(self):
    """add_versions() with an identical existing record adds nothing."""
    index = self.two_graph_index(catch_adds=True)
    # options can be spelt two different ways
    index.add_versions([('tip', 'fulltext,no-eol', 0, 100, [])])
    index.add_versions([('tip', 'no-eol,fulltext', 0, 100, [])])
    # but neither should have added data.
    self.assertEqual([[], []], self.caught_entries)
2117
def test_add_versions_different_dup(self):
    """add_versions() of 'tip' with differing details raises KnitCorrupt."""
    index = self.two_graph_index(catch_adds=True)
    # differing options
    self.assertRaises(errors.KnitCorrupt, index.add_versions,
                      [('tip', 'no-eol,line-delta', 0, 100, [])])
    self.assertRaises(errors.KnitCorrupt, index.add_versions,
                      [('tip', 'line-delta,no-eol', 0, 100, [])])
    self.assertRaises(errors.KnitCorrupt, index.add_versions,
                      [('tip', 'fulltext', 0, 100, [])])
    # differing third / fourth element (stored for 'tip' as 0 and 100)
    self.assertRaises(errors.KnitCorrupt, index.add_versions,
                      [('tip', 'fulltext,no-eol', 50, 100, [])])
    self.assertRaises(errors.KnitCorrupt, index.add_versions,
                      [('tip', 'fulltext,no-eol', 0, 1000, [])])
    # differing parents
    self.assertRaises(errors.KnitCorrupt, index.add_versions,
                      [('tip', 'fulltext,no-eol', 0, 100, ['parent'])])
    # change options in the second record
    self.assertRaises(errors.KnitCorrupt, index.add_versions,
                      [('tip', 'fulltext,no-eol', 0, 100, []),
                       ('tip', 'no-eol,line-delta', 0, 100, [])])
    # None of the failed attempts may have reached the add callback.
    self.assertEqual([], self.caught_entries)
2140
def test_iter_parents(self):
    """iter_parents() yields (version, ()) pairs and skips ghosts."""
    index = self.two_graph_index()
    # 'ghost' is requested but must not appear in the result.
    self.assertEqual(set([
        ('tip', ()), ('tail', ()), ('parent', ()), ('separate', ())
        ]),
        set(index.iter_parents(
            ['tip', 'tail', 'ghost', 'parent', 'separate'])))
    self.assertEqual(set([('tip', ())]),
                     set(index.iter_parents(['tip'])))
    self.assertEqual(set(),
                     set(index.iter_parents([])))