1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
31
from bzrlib.errors import (
32
RevisionAlreadyPresent,
37
from bzrlib.index import *
38
from bzrlib.knit import (
51
from bzrlib.osutils import split_lines
52
from bzrlib.tests import (
55
TestCaseWithMemoryTransport,
56
TestCaseWithTransport,
58
from bzrlib.transport import TransportLogger, get_transport
59
from bzrlib.transport.memory import MemoryTransport
60
from bzrlib.weave import Weave
63
class _CompiledKnitFeature(Feature):
67
import bzrlib._knit_load_data_c
72
def feature_name(self):
73
return 'bzrlib._knit_load_data_c'
75
CompiledKnitFeature = _CompiledKnitFeature()
78
class KnitContentTests(TestCase):
    """Exercise KnitContent: construction, accessors, copy and line deltas.

    Each KnitContent is built from a list of (origin, text) pairs.
    """

    def test_constructor(self):
        """KnitContent can be constructed from an empty list."""
        content = KnitContent([])

    def test_text(self):
        """text() returns the text half of every (origin, text) pair."""
        content = KnitContent([])
        self.assertEqual(content.text(), [])

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        self.assertEqual(content.text(), ["text1", "text2"])

    def test_annotate(self):
        """annotate() returns the (origin, text) pairs as a list."""
        content = KnitContent([])
        self.assertEqual(content.annotate(), [])

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        self.assertEqual(content.annotate(),
            [("origin1", "text1"), ("origin2", "text2")])

    def test_annotate_iter(self):
        """annotate_iter() yields the pairs in order, then stops."""
        content = KnitContent([])
        it = content.annotate_iter()
        self.assertRaises(StopIteration, it.next)

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        it = content.annotate_iter()
        self.assertEqual(it.next(), ("origin1", "text1"))
        self.assertEqual(it.next(), ("origin2", "text2"))
        self.assertRaises(StopIteration, it.next)

    def test_copy(self):
        """copy() returns a KnitContent with the same annotated lines."""
        # Kept separate from test_annotate_iter so that a failure in copy()
        # is reported under its own test name.
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        copy = content.copy()
        self.assertIsInstance(copy, KnitContent)
        self.assertEqual(copy.annotate(),
            [("origin1", "text1"), ("origin2", "text2")])

    def test_line_delta(self):
        """line_delta() returns the list of hunks turning one content into another."""
        content1 = KnitContent([("", "a"), ("", "b")])
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
        self.assertEqual(content1.line_delta(content2),
            [(1, 2, 2, [("", "a"), ("", "c")])])

    def test_line_delta_iter(self):
        """line_delta_iter() yields the same hunks lazily."""
        content1 = KnitContent([("", "a"), ("", "b")])
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
        it = content1.line_delta_iter(content2)
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
        self.assertRaises(StopIteration, it.next)
130
class MockTransport(object):
    """Minimal in-memory stand-in for a transport.

    Reads are served from ``file_lines``. Any method that is not defined on
    this class is not executed; it is recorded in ``self.calls`` as a
    ``(name, args, kwargs)`` tuple so tests can inspect what was requested.
    """

    def __init__(self, file_lines=None):
        """Create the mock.

        :param file_lines: list of strings which, joined with newlines, form
            the content returned for any filename. None means that no file
            exists and get() raises NoSuchFile.
        """
        self.file_lines = file_lines
        # Must be a real attribute: otherwise looking up ``calls`` would be
        # routed through __getattr__ below instead of finding a list.
        self.calls = []
        # We have no base directory for the MockTransport
        self.base = ''

    def get(self, filename):
        """Return a file-like object over the joined ``file_lines``.

        :raises NoSuchFile: if the mock was created without file_lines.
        """
        if self.file_lines is None:
            raise NoSuchFile(filename)
        return StringIO("\n".join(self.file_lines))

    def readv(self, relpath, offsets):
        """Yield ``(offset, data)`` for each ``(offset, size)`` requested.

        :param relpath: name passed on to get().
        :param offsets: iterable of (offset, size) pairs.
        """
        fp = self.get(relpath)
        for offset, size in offsets:
            # Position explicitly so that ranges which are not contiguous,
            # or not in ascending order, still return the right bytes.
            fp.seek(offset)
            yield offset, fp.read(size)

    def __getattr__(self, name):
        """Return a recorder for any method not defined on the class."""
        def queue_call(*args, **kwargs):
            self.calls.append((name, args, kwargs))
        return queue_call
156
class KnitRecordAccessTestsMixin(object):
    """Tests for getting and putting knit records.

    Concrete test classes mix this in and supply get_access() (which returns
    the access object under test) and assertAccessExists().
    """

    def assertAccessExists(self, access):
        """Ensure the data area for access has been initialised/exists."""
        raise NotImplementedError(self.assertAccessExists)

    def test_add_raw_records(self):
        """Add_raw_records adds records retrievable later."""
        access = self.get_access()
        memos = access.add_raw_records([10], '1234567890')
        self.assertEqual(['1234567890'], list(access.get_raw_records(memos)))

    def test_add_several_raw_records(self):
        """add_raw_records with many records and read some back."""
        access = self.get_access()
        # Three records of 10, 2 and 5 bytes carved out of one 17 byte string.
        memos = access.add_raw_records([10, 2, 5], '12345678901234567')
        self.assertEqual(['1234567890', '12', '34567'],
            list(access.get_raw_records(memos)))
        self.assertEqual(['1234567890'],
            list(access.get_raw_records(memos[0:1])))
        self.assertEqual(['12'],
            list(access.get_raw_records(memos[1:2])))
        self.assertEqual(['34567'],
            list(access.get_raw_records(memos[2:3])))
        self.assertEqual(['1234567890', '34567'],
            list(access.get_raw_records(memos[0:1] + memos[2:3])))

    def test_create(self):
        """create() should make a file on disk."""
        access = self.get_access()
        # The method under test has to be invoked before checking its effect.
        access.create()
        self.assertAccessExists(access)

    def test_open_file(self):
        """open_file never errors."""
        access = self.get_access()
        # Passing means simply that this call does not raise.
        access.open_file()
196
class TestKnitKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
197
"""Tests for the .kndx implementation."""
199
def assertAccessExists(self, access):
    """Assert that access.open_file() does not return None."""
    opened = access.open_file()
    self.assertNotEqual(None, opened)
202
def get_access(self):
203
"""Get a .knit style access instance."""
204
access = _KnitAccess(self.get_transport(), "foo.knit", None, None,
209
class TestPackKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
210
"""Tests for the pack based access."""
212
def assertAccessExists(self, access):
213
# as pack based access has no backing unless an index maps data, this
217
def get_access(self):
    """Return only the access object built by _get_access()."""
    # _get_access() returns an (access, writer) pair; the writer is not
    # needed by the shared mixin tests.
    access, _writer = self._get_access()
    return access
220
def _get_access(self, packname='packfile', index='FOO'):
    """Build a _PackAccess whose writer appends to ``packname``.

    :param packname: name of the file on the test transport that receives
        everything the container writer emits.
    :param index: key under which (transport, packname) is registered with
        the access object, and which is paired with the writer.
    :return: an (access, writer) tuple.
    """
    transport = self.get_transport()

    def append_to_pack(data):
        transport.append_bytes(packname, data)

    writer = pack.ContainerWriter(append_to_pack)
    access = _PackAccess({index: (transport, packname)},
                         writer=(writer, index))
    return access, writer
230
def test_read_from_several_packs(self):
231
access, writer = self._get_access()
233
memos.extend(access.add_raw_records([10], '1234567890'))
235
access, writer = self._get_access('pack2', 'FOOBAR')
236
memos.extend(access.add_raw_records([5], '12345'))
238
access, writer = self._get_access('pack3', 'BAZ')
239
memos.extend(access.add_raw_records([5], 'alpha'))
241
transport = self.get_transport()
242
access = _PackAccess({"FOO":(transport, 'packfile'),
243
"FOOBAR":(transport, 'pack2'),
244
"BAZ":(transport, 'pack3')})
245
self.assertEqual(['1234567890', '12345', 'alpha'],
246
list(access.get_raw_records(memos)))
247
self.assertEqual(['1234567890'],
248
list(access.get_raw_records(memos[0:1])))
249
self.assertEqual(['12345'],
250
list(access.get_raw_records(memos[1:2])))
251
self.assertEqual(['alpha'],
252
list(access.get_raw_records(memos[2:3])))
253
self.assertEqual(['1234567890', 'alpha'],
254
list(access.get_raw_records(memos[0:1] + memos[2:3])))
256
def test_set_writer(self):
257
"""The writer should be settable post construction."""
258
access = _PackAccess({})
259
transport = self.get_transport()
260
packname = 'packfile'
262
def write_data(bytes):
263
transport.append_bytes(packname, bytes)
264
writer = pack.ContainerWriter(write_data)
266
access.set_writer(writer, index, (transport, packname))
267
memos = access.add_raw_records([10], '1234567890')
269
self.assertEqual(['1234567890'], list(access.get_raw_records(memos)))
272
class LowLevelKnitDataTests(TestCase):
274
def create_gz_content(self, text):
276
gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
279
return sio.getvalue()
281
def test_valid_knit_data(self):
282
sha1sum = sha.new('foo\nbar\n').hexdigest()
283
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
288
transport = MockTransport([gz_txt])
289
access = _KnitAccess(transport, 'filename', None, None, False, False)
290
data = _KnitData(access=access)
291
records = [('rev-id-1', (None, 0, len(gz_txt)))]
293
contents = data.read_records(records)
294
self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
296
raw_contents = list(data.read_records_iter_raw(records))
297
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
299
def test_not_enough_lines(self):
300
sha1sum = sha.new('foo\n').hexdigest()
301
# record says 2 lines data says 1
302
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
306
transport = MockTransport([gz_txt])
307
access = _KnitAccess(transport, 'filename', None, None, False, False)
308
data = _KnitData(access=access)
309
records = [('rev-id-1', (None, 0, len(gz_txt)))]
310
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
312
# read_records_iter_raw won't detect that sort of mismatch/corruption
313
raw_contents = list(data.read_records_iter_raw(records))
314
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
316
def test_too_many_lines(self):
317
sha1sum = sha.new('foo\nbar\n').hexdigest()
318
# record says 1 lines data says 2
319
gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
324
transport = MockTransport([gz_txt])
325
access = _KnitAccess(transport, 'filename', None, None, False, False)
326
data = _KnitData(access=access)
327
records = [('rev-id-1', (None, 0, len(gz_txt)))]
328
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
330
# read_records_iter_raw won't detect that sort of mismatch/corruption
331
raw_contents = list(data.read_records_iter_raw(records))
332
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
334
def test_mismatched_version_id(self):
335
sha1sum = sha.new('foo\nbar\n').hexdigest()
336
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
341
transport = MockTransport([gz_txt])
342
access = _KnitAccess(transport, 'filename', None, None, False, False)
343
data = _KnitData(access=access)
344
# We are asking for rev-id-2, but the data is rev-id-1
345
records = [('rev-id-2', (None, 0, len(gz_txt)))]
346
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
348
# read_records_iter_raw will notice if we request the wrong version.
349
self.assertRaises(errors.KnitCorrupt, list,
350
data.read_records_iter_raw(records))
352
def test_uncompressed_data(self):
353
sha1sum = sha.new('foo\nbar\n').hexdigest()
354
txt = ('version rev-id-1 2 %s\n'
359
transport = MockTransport([txt])
360
access = _KnitAccess(transport, 'filename', None, None, False, False)
361
data = _KnitData(access=access)
362
records = [('rev-id-1', (None, 0, len(txt)))]
364
# We don't have valid gzip data ==> corrupt
365
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
367
# read_records_iter_raw will notice the bad data
368
self.assertRaises(errors.KnitCorrupt, list,
369
data.read_records_iter_raw(records))
371
def test_corrupted_data(self):
372
sha1sum = sha.new('foo\nbar\n').hexdigest()
373
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
378
# Change 2 bytes in the middle to \xff
379
gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
380
transport = MockTransport([gz_txt])
381
access = _KnitAccess(transport, 'filename', None, None, False, False)
382
data = _KnitData(access=access)
383
records = [('rev-id-1', (None, 0, len(gz_txt)))]
385
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
387
# read_records_iter_raw will notice if we request the wrong version.
388
self.assertRaises(errors.KnitCorrupt, list,
389
data.read_records_iter_raw(records))
392
class LowLevelKnitIndexTests(TestCase):
394
def get_knit_index(self, *args, **kwargs):
    """Build a _KnitIndex that uses the pure-Python index loader.

    knit._load_data is replaced by _load_data_py; a cleanup registered with
    addCleanup puts the previous loader back.

    :param args: positional arguments passed through to _KnitIndex.
    :param kwargs: keyword arguments passed through to _KnitIndex.
    :return: the new _KnitIndex.
    """
    orig = knit._load_data

    def reset():
        # Restore whichever loader was active before this test swapped it.
        knit._load_data = orig

    self.addCleanup(reset)
    from bzrlib._knit_load_data_py import _load_data_py
    knit._load_data = _load_data_py
    return _KnitIndex(*args, **kwargs)
403
def test_no_such_file(self):
    """Opening an index on a transport with no file raises NoSuchFile."""
    transport = MockTransport()
    make_index = self.get_knit_index

    # Read mode.
    self.assertRaises(NoSuchFile, make_index, transport, "filename", "r")
    # Write mode, but explicitly told not to create the file.
    self.assertRaises(NoSuchFile, make_index, transport, "filename", "w",
                      create=False)
411
def test_create_file(self):
412
transport = MockTransport()
414
index = self.get_knit_index(transport, "filename", "w",
415
file_mode="wb", create=True)
417
("put_bytes_non_atomic",
418
("filename", index.HEADER), {"mode": "wb"}),
419
transport.calls.pop(0))
421
def test_delay_create_file(self):
422
transport = MockTransport()
424
index = self.get_knit_index(transport, "filename", "w",
425
create=True, file_mode="wb", create_parent_dir=True,
426
delay_create=True, dir_mode=0777)
427
self.assertEqual([], transport.calls)
429
index.add_versions([])
430
name, (filename, f), kwargs = transport.calls.pop(0)
431
self.assertEqual("put_file_non_atomic", name)
433
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
435
self.assertEqual("filename", filename)
436
self.assertEqual(index.HEADER, f.read())
438
index.add_versions([])
439
self.assertEqual(("append_bytes", ("filename", ""), {}),
440
transport.calls.pop(0))
442
def test_read_utf8_version_id(self):
443
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
444
utf8_revision_id = unicode_revision_id.encode('utf-8')
445
transport = MockTransport([
447
'%s option 0 1 :' % (utf8_revision_id,)
449
index = self.get_knit_index(transport, "filename", "r")
450
# _KnitIndex is a private class, and deals in utf8 revision_ids, not
451
# Unicode revision_ids.
452
self.assertTrue(index.has_version(utf8_revision_id))
453
self.assertFalse(index.has_version(unicode_revision_id))
455
def test_read_utf8_parents(self):
456
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
457
utf8_revision_id = unicode_revision_id.encode('utf-8')
458
transport = MockTransport([
460
"version option 0 1 .%s :" % (utf8_revision_id,)
462
index = self.get_knit_index(transport, "filename", "r")
463
self.assertEqual([utf8_revision_id],
464
index.get_parents_with_ghosts("version"))
466
def test_read_ignore_corrupted_lines(self):
467
transport = MockTransport([
470
"corrupted options 0 1 .b .c ",
471
"version options 0 1 :"
473
index = self.get_knit_index(transport, "filename", "r")
474
self.assertEqual(1, index.num_versions())
475
self.assertTrue(index.has_version("version"))
477
def test_read_corrupted_header(self):
    """Reading a file with an unexpected first line raises KnitHeaderError."""
    bad_lines = ['not a bzr knit index header\n']
    transport = MockTransport(bad_lines)
    self.assertRaises(KnitHeaderError,
                      self.get_knit_index, transport, "filename", "r")
482
def test_read_duplicate_entries(self):
483
transport = MockTransport([
485
"parent options 0 1 :",
486
"version options1 0 1 0 :",
487
"version options2 1 2 .other :",
488
"version options3 3 4 0 .other :"
490
index = self.get_knit_index(transport, "filename", "r")
491
self.assertEqual(2, index.num_versions())
492
# check that the index used is the first one written. (Specific
493
# to KnitIndex style indices.
494
self.assertEqual("1", index._version_list_to_index(["version"]))
495
self.assertEqual((None, 3, 4), index.get_position("version"))
496
self.assertEqual(["options3"], index.get_options("version"))
497
self.assertEqual(["parent", "other"],
498
index.get_parents_with_ghosts("version"))
500
def test_read_compressed_parents(self):
501
transport = MockTransport([
505
"c option 0 1 1 0 :",
507
index = self.get_knit_index(transport, "filename", "r")
508
self.assertEqual(["a"], index.get_parents("b"))
509
self.assertEqual(["b", "a"], index.get_parents("c"))
511
def test_write_utf8_version_id(self):
512
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
513
utf8_revision_id = unicode_revision_id.encode('utf-8')
514
transport = MockTransport([
517
index = self.get_knit_index(transport, "filename", "r")
518
index.add_version(utf8_revision_id, ["option"], (None, 0, 1), [])
519
self.assertEqual(("append_bytes", ("filename",
520
"\n%s option 0 1 :" % (utf8_revision_id,)),
522
transport.calls.pop(0))
524
def test_write_utf8_parents(self):
525
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
526
utf8_revision_id = unicode_revision_id.encode('utf-8')
527
transport = MockTransport([
530
index = self.get_knit_index(transport, "filename", "r")
531
index.add_version("version", ["option"], (None, 0, 1), [utf8_revision_id])
532
self.assertEqual(("append_bytes", ("filename",
533
"\nversion option 0 1 .%s :" % (utf8_revision_id,)),
535
transport.calls.pop(0))
537
def test_get_graph(self):
    """get_graph() reports a (version, parents) entry per added version."""
    transport = MockTransport()
    index = self.get_knit_index(transport, "filename", "w", create=True)
    position = (None, 0, 1)

    # Nothing added yet.
    self.assertEqual([], index.get_graph())

    index.add_version("a", ["option"], position, ["b"])
    self.assertEqual([("a", ["b"])], index.get_graph())

    # With two entries the result is sorted before comparing.
    index.add_version("c", ["option"], position, ["d"])
    expected = [("a", ["b"]), ("c", ["d"])]
    self.assertEqual(expected, sorted(index.get_graph()))
549
def test_get_ancestry(self):
550
transport = MockTransport([
553
"b option 0 1 0 .e :",
554
"c option 0 1 1 0 :",
555
"d option 0 1 2 .f :"
557
index = self.get_knit_index(transport, "filename", "r")
559
self.assertEqual([], index.get_ancestry([]))
560
self.assertEqual(["a"], index.get_ancestry(["a"]))
561
self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
562
self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
563
self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
564
self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
565
self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
567
self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
569
def test_get_ancestry_with_ghosts(self):
570
transport = MockTransport([
573
"b option 0 1 0 .e :",
574
"c option 0 1 0 .f .g :",
575
"d option 0 1 2 .h .j .k :"
577
index = self.get_knit_index(transport, "filename", "r")
579
self.assertEqual([], index.get_ancestry_with_ghosts([]))
580
self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
581
self.assertEqual(["a", "e", "b"],
582
index.get_ancestry_with_ghosts(["b"]))
583
self.assertEqual(["a", "g", "f", "c"],
584
index.get_ancestry_with_ghosts(["c"]))
585
self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
586
index.get_ancestry_with_ghosts(["d"]))
587
self.assertEqual(["a", "e", "b"],
588
index.get_ancestry_with_ghosts(["a", "b"]))
589
self.assertEqual(["a", "g", "f", "c"],
590
index.get_ancestry_with_ghosts(["a", "c"]))
592
["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
593
index.get_ancestry_with_ghosts(["b", "d"]))
595
self.assertRaises(RevisionNotPresent,
596
index.get_ancestry_with_ghosts, ["e"])
598
def test_iter_parents(self):
    """iter_parents() yields (version, parents) and skips unknown ids."""
    transport = MockTransport()
    index = self.get_knit_index(transport, "filename", "w", create=True)
    position = (None, 0, 1)
    index.add_version('r0', ['option'], position, [])
    index.add_version('r1', ['option'], position, ['r0'])
    index.add_version('r2', ['option'], position, ['r1', 'r0'])

    def parents_of(*version_ids):
        return set(index.iter_parents(list(version_ids)))

    r0 = ('r0', ())
    r1 = ('r1', ('r0', ))
    r2 = ('r2', ('r1', 'r0'))

    # cases: each sample data individually:
    self.assertEqual(set([r0]), parents_of('r0'))
    self.assertEqual(set([r1]), parents_of('r1'))
    self.assertEqual(set([r2]), parents_of('r2'))
    # no nodes returned for a missing node
    self.assertEqual(set(), parents_of('missing'))
    # 1 node returned with missing nodes skipped
    self.assertEqual(set([r1]), parents_of('ghost1', 'r1', 'ghost'))
    # 2 nodes returned
    self.assertEqual(set([r0, r1]), parents_of('r0', 'r1'))
    # 2 nodes returned, missing skipped
    self.assertEqual(set([r0, r1]), parents_of('a', 'r0', 'b', 'r1', 'c'))
628
def test_num_versions(self):
629
transport = MockTransport([
632
index = self.get_knit_index(transport, "filename", "r")
634
self.assertEqual(0, index.num_versions())
635
self.assertEqual(0, len(index))
637
index.add_version("a", ["option"], (None, 0, 1), [])
638
self.assertEqual(1, index.num_versions())
639
self.assertEqual(1, len(index))
641
index.add_version("a", ["option2"], (None, 1, 2), [])
642
self.assertEqual(1, index.num_versions())
643
self.assertEqual(1, len(index))
645
index.add_version("b", ["option"], (None, 0, 1), [])
646
self.assertEqual(2, index.num_versions())
647
self.assertEqual(2, len(index))
649
def test_get_versions(self):
650
transport = MockTransport([
653
index = self.get_knit_index(transport, "filename", "r")
655
self.assertEqual([], index.get_versions())
657
index.add_version("a", ["option"], (None, 0, 1), [])
658
self.assertEqual(["a"], index.get_versions())
660
index.add_version("a", ["option"], (None, 0, 1), [])
661
self.assertEqual(["a"], index.get_versions())
663
index.add_version("b", ["option"], (None, 0, 1), [])
664
self.assertEqual(["a", "b"], index.get_versions())
666
def test_add_version(self):
667
transport = MockTransport([
670
index = self.get_knit_index(transport, "filename", "r")
672
index.add_version("a", ["option"], (None, 0, 1), ["b"])
673
self.assertEqual(("append_bytes",
674
("filename", "\na option 0 1 .b :"),
675
{}), transport.calls.pop(0))
676
self.assertTrue(index.has_version("a"))
677
self.assertEqual(1, index.num_versions())
678
self.assertEqual((None, 0, 1), index.get_position("a"))
679
self.assertEqual(["option"], index.get_options("a"))
680
self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
682
index.add_version("a", ["opt"], (None, 1, 2), ["c"])
683
self.assertEqual(("append_bytes",
684
("filename", "\na opt 1 2 .c :"),
685
{}), transport.calls.pop(0))
686
self.assertTrue(index.has_version("a"))
687
self.assertEqual(1, index.num_versions())
688
self.assertEqual((None, 1, 2), index.get_position("a"))
689
self.assertEqual(["opt"], index.get_options("a"))
690
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
692
index.add_version("b", ["option"], (None, 2, 3), ["a"])
693
self.assertEqual(("append_bytes",
694
("filename", "\nb option 2 3 0 :"),
695
{}), transport.calls.pop(0))
696
self.assertTrue(index.has_version("b"))
697
self.assertEqual(2, index.num_versions())
698
self.assertEqual((None, 2, 3), index.get_position("b"))
699
self.assertEqual(["option"], index.get_options("b"))
700
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
702
def test_add_versions(self):
703
transport = MockTransport([
706
index = self.get_knit_index(transport, "filename", "r")
709
("a", ["option"], (None, 0, 1), ["b"]),
710
("a", ["opt"], (None, 1, 2), ["c"]),
711
("b", ["option"], (None, 2, 3), ["a"])
713
self.assertEqual(("append_bytes", ("filename",
714
"\na option 0 1 .b :"
717
), {}), transport.calls.pop(0))
718
self.assertTrue(index.has_version("a"))
719
self.assertTrue(index.has_version("b"))
720
self.assertEqual(2, index.num_versions())
721
self.assertEqual((None, 1, 2), index.get_position("a"))
722
self.assertEqual((None, 2, 3), index.get_position("b"))
723
self.assertEqual(["opt"], index.get_options("a"))
724
self.assertEqual(["option"], index.get_options("b"))
725
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
726
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
728
def test_delay_create_and_add_versions(self):
729
transport = MockTransport()
731
index = self.get_knit_index(transport, "filename", "w",
732
create=True, file_mode="wb", create_parent_dir=True,
733
delay_create=True, dir_mode=0777)
734
self.assertEqual([], transport.calls)
737
("a", ["option"], (None, 0, 1), ["b"]),
738
("a", ["opt"], (None, 1, 2), ["c"]),
739
("b", ["option"], (None, 2, 3), ["a"])
741
name, (filename, f), kwargs = transport.calls.pop(0)
742
self.assertEqual("put_file_non_atomic", name)
744
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
746
self.assertEqual("filename", filename)
749
"\na option 0 1 .b :"
751
"\nb option 2 3 0 :",
754
def test_has_version(self):
755
transport = MockTransport([
759
index = self.get_knit_index(transport, "filename", "r")
761
self.assertTrue(index.has_version("a"))
762
self.assertFalse(index.has_version("b"))
764
def test_get_position(self):
765
transport = MockTransport([
770
index = self.get_knit_index(transport, "filename", "r")
772
self.assertEqual((None, 0, 1), index.get_position("a"))
773
self.assertEqual((None, 1, 2), index.get_position("b"))
775
def test_get_method(self):
776
transport = MockTransport([
778
"a fulltext,unknown 0 1 :",
779
"b unknown,line-delta 1 2 :",
782
index = self.get_knit_index(transport, "filename", "r")
784
self.assertEqual("fulltext", index.get_method("a"))
785
self.assertEqual("line-delta", index.get_method("b"))
786
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
788
def test_get_options(self):
789
transport = MockTransport([
794
index = self.get_knit_index(transport, "filename", "r")
796
self.assertEqual(["opt1"], index.get_options("a"))
797
self.assertEqual(["opt2", "opt3"], index.get_options("b"))
799
def test_get_parents(self):
800
transport = MockTransport([
803
"b option 1 2 0 .c :",
804
"c option 1 2 1 0 .e :"
806
index = self.get_knit_index(transport, "filename", "r")
808
self.assertEqual([], index.get_parents("a"))
809
self.assertEqual(["a", "c"], index.get_parents("b"))
810
self.assertEqual(["b", "a"], index.get_parents("c"))
812
def test_get_parents_with_ghosts(self):
813
transport = MockTransport([
816
"b option 1 2 0 .c :",
817
"c option 1 2 1 0 .e :"
819
index = self.get_knit_index(transport, "filename", "r")
821
self.assertEqual([], index.get_parents_with_ghosts("a"))
822
self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
823
self.assertEqual(["b", "a", "e"],
824
index.get_parents_with_ghosts("c"))
826
def test_check_versions_present(self):
827
transport = MockTransport([
832
index = self.get_knit_index(transport, "filename", "r")
834
check = index.check_versions_present
840
self.assertRaises(RevisionNotPresent, check, ["c"])
841
self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
843
def test_impossible_parent(self):
844
"""Test we get KnitCorrupt if the parent couldn't possibly exist."""
845
transport = MockTransport([
848
"b option 0 1 4 :" # We don't have a 4th record
851
self.assertRaises(errors.KnitCorrupt,
852
self.get_knit_index, transport, 'filename', 'r')
854
if (str(e) == ('exceptions must be strings, classes, or instances,'
855
' not exceptions.IndexError')
856
and sys.version_info[0:2] >= (2,5)):
857
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
858
' raising new style exceptions with python'
863
def test_corrupted_parent(self):
864
transport = MockTransport([
868
"c option 0 1 1v :", # Can't have a parent of '1v'
871
self.assertRaises(errors.KnitCorrupt,
872
self.get_knit_index, transport, 'filename', 'r')
874
if (str(e) == ('exceptions must be strings, classes, or instances,'
875
' not exceptions.ValueError')
876
and sys.version_info[0:2] >= (2,5)):
877
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
878
' raising new style exceptions with python'
883
def test_corrupted_parent_in_list(self):
884
transport = MockTransport([
888
"c option 0 1 1 v :", # Can't have a parent of 'v'
891
self.assertRaises(errors.KnitCorrupt,
892
self.get_knit_index, transport, 'filename', 'r')
894
if (str(e) == ('exceptions must be strings, classes, or instances,'
895
' not exceptions.ValueError')
896
and sys.version_info[0:2] >= (2,5)):
897
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
898
' raising new style exceptions with python'
903
def test_invalid_position(self):
904
transport = MockTransport([
909
self.assertRaises(errors.KnitCorrupt,
910
self.get_knit_index, transport, 'filename', 'r')
912
if (str(e) == ('exceptions must be strings, classes, or instances,'
913
' not exceptions.ValueError')
914
and sys.version_info[0:2] >= (2,5)):
915
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
916
' raising new style exceptions with python'
921
def test_invalid_size(self):
922
transport = MockTransport([
927
self.assertRaises(errors.KnitCorrupt,
928
self.get_knit_index, transport, 'filename', 'r')
930
if (str(e) == ('exceptions must be strings, classes, or instances,'
931
' not exceptions.ValueError')
932
and sys.version_info[0:2] >= (2,5)):
933
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
934
' raising new style exceptions with python'
939
def test_short_line(self):
940
transport = MockTransport([
943
"b option 10 10 0", # This line isn't terminated, ignored
945
index = self.get_knit_index(transport, "filename", "r")
946
self.assertEqual(['a'], index.get_versions())
948
def test_skip_incomplete_record(self):
949
# A line with bogus data should just be skipped
950
transport = MockTransport([
953
"b option 10 10 0", # This line isn't terminated, ignored
954
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
956
index = self.get_knit_index(transport, "filename", "r")
957
self.assertEqual(['a', 'c'], index.get_versions())
959
def test_trailing_characters(self):
960
# A line with bogus data should just be skipped
961
transport = MockTransport([
964
"b option 10 10 0 :a", # This line has extra trailing characters
965
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
967
index = self.get_knit_index(transport, "filename", "r")
968
self.assertEqual(['a', 'c'], index.get_versions())
971
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
    """Run the inherited low level index tests with the compiled loader."""

    # These tests are only applicable when the compiled extension is there.
    _test_needs_features = [CompiledKnitFeature]

    def get_knit_index(self, *args, **kwargs):
        """Build a _KnitIndex that uses the compiled index loader.

        knit._load_data is replaced by _load_data_c; a cleanup registered
        with addCleanup puts the previous loader back.

        :param args: positional arguments passed through to _KnitIndex.
        :param kwargs: keyword arguments passed through to _KnitIndex.
        :return: the new _KnitIndex.
        """
        orig = knit._load_data

        def reset():
            # Restore whichever loader was active before the swap below.
            knit._load_data = orig

        self.addCleanup(reset)
        from bzrlib._knit_load_data_c import _load_data_c
        knit._load_data = _load_data_c
        return _KnitIndex(*args, **kwargs)
986
class KnitTests(TestCaseWithTransport):
987
"""Class containing knit test helper routines."""
989
def make_test_knit(self, annotate=False, delay_create=False, index=None):
991
factory = KnitPlainFactory()
994
return KnitVersionedFile('test', get_transport('.'), access_mode='w',
995
factory=factory, create=True,
996
delay_create=delay_create, index=index)
999
class BasicKnitTests(KnitTests):
1001
def add_stock_one_and_one_a(self, k):
    """Add TEXT_1 as 'text-1', then TEXT_1A as 'text-1a' with parent 'text-1'."""
    stock = (
        ('text-1', [], TEXT_1),
        ('text-1a', ['text-1'], TEXT_1A),
    )
    for version_id, parents, text in stock:
        k.add_lines(version_id, parents, split_lines(text))
1005
def test_knit_constructor(self):
    """Creating an empty test knit must not raise."""
    # Only construction is exercised; the knit itself is not inspected.
    _ = self.make_test_knit()
1009
def test_make_explicit_index(self):
    """An index passed to the constructor is stored as the knit's _index."""
    supplied_index = 'strangelove'
    versioned_file = KnitVersionedFile('test', get_transport('.'),
                                       index=supplied_index)
    self.assertEqual(versioned_file._index, supplied_index)
1015
def test_knit_add(self):
    """A single stored text is reported present and reads back unchanged."""
    k = self.make_test_knit()
    k.add_lines('text-1', [], split_lines(TEXT_1))

    self.assertTrue(k.has_version('text-1'))
    stored_text = ''.join(k.get_lines('text-1'))
    self.assertEqualDiff(stored_text, TEXT_1)
1022
def test_knit_reload(self):
    """Content written via one knit is correct when read via a reopened one."""
    original = self.make_test_knit()
    original.add_lines('text-1', [], split_lines(TEXT_1))

    # Open the same 'test' knit again, this time read-only.
    reopened = KnitVersionedFile('test', get_transport('.'),
                                 access_mode='r',
                                 factory=KnitPlainFactory(),
                                 create=True)
    self.assertTrue(reopened.has_version('text-1'))
    reloaded_text = ''.join(reopened.get_lines('text-1'))
    self.assertEqualDiff(reloaded_text, TEXT_1)
1031
def test_knit_several(self):
    """Two independent texts stored in one knit both read back unchanged."""
    k = self.make_test_knit()
    texts = (('text-1', TEXT_1), ('text-2', TEXT_2))

    # Store everything first, then verify, so that adding the second text
    # is shown not to disturb the first.
    for version_id, text in texts:
        k.add_lines(version_id, [], split_lines(text))
    for version_id, text in texts:
        self.assertEqualDiff(''.join(k.get_lines(version_id)), text)
1039
def test_repeated_add(self):
    """Knit traps attempt to replace existing version"""
    k = self.make_test_knit()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    # assertRaises takes the callable first, followed by its arguments;
    # adding 'text-1' a second time is what must be rejected.
    self.assertRaises(RevisionAlreadyPresent,
                      k.add_lines,
                      'text-1', [], split_lines(TEXT_1))
1047
def test_empty(self):
    """A version added with no lines reads back as an empty list."""
    k = self.make_test_knit(True)
    k.add_lines('text-1', [], [])
    stored_lines = k.get_lines('text-1')
    self.assertEquals(stored_lines, [])
1052
def test_incomplete(self):
    """Test if texts without a final line-end can be inserted and
    extracted.

    The docstring was previously left unterminated, which turned the
    whole test body into string content.
    """
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    k.add_lines('text-1', [], ['a\n', 'b' ])
    k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
    # reopening ensures maximum room for confusion
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
    self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
1063
def test_delta(self):
1064
"""Expression of knit delta as lines"""
1065
k = self.make_test_knit()
1067
td = list(line_delta(TEXT_1.splitlines(True),
1068
TEXT_1A.splitlines(True)))
1069
self.assertEqualDiff(''.join(td), delta_1_1a)
1070
out = apply_line_delta(TEXT_1.splitlines(True), td)
1071
self.assertEqualDiff(''.join(out), TEXT_1A)
1073
def assertDerivedBlocksEqual(self, source, target, noeol=False):
1074
"""Assert that the derived matching blocks match real output"""
1075
source_lines = source.splitlines(True)
1076
target_lines = target.splitlines(True)
1078
if noeol and not line.endswith('\n'):
1082
source_content = KnitContent([(None, nl(l)) for l in source_lines])
1083
target_content = KnitContent([(None, nl(l)) for l in target_lines])
1084
line_delta = source_content.line_delta(target_content)
1085
delta_blocks = list(KnitContent.get_line_delta_blocks(line_delta,
1086
source_lines, target_lines))
1087
matcher = KnitSequenceMatcher(None, source_lines, target_lines)
1088
matcher_blocks = list(list(matcher.get_matching_blocks()))
1089
self.assertEqual(matcher_blocks, delta_blocks)
1091
def test_get_line_delta_blocks(self):
    """Run assertDerivedBlocksEqual over a range of source/target pairs."""
    text_pairs = [
        ('a\nb\nc\n', 'q\nc\n'),
        (TEXT_1, TEXT_1),
        (TEXT_1, TEXT_1A),
        (TEXT_1, TEXT_1B),
        (TEXT_1B, TEXT_1A),
        (TEXT_1A, TEXT_1B),
        (TEXT_1A, ''),
        ('', TEXT_1A),
        ('', ''),
        ('a\nb\nc', 'a\nb\nc\nd'),
    ]
    for source, target in text_pairs:
        self.assertDerivedBlocksEqual(source, target)
1103
def test_get_line_delta_blocks_noeol(self):
1104
"""Handle historical knit deltas safely
1106
Some existing knit deltas don't consider the last line to differ
1107
when the only difference whether it has a final newline.
1109
New knit deltas appear to always consider the last line to differ
1112
self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd\n', noeol=True)
1113
self.assertDerivedBlocksEqual('a\nb\nc\nd\n', 'a\nb\nc', noeol=True)
1114
self.assertDerivedBlocksEqual('a\nb\nc\n', 'a\nb\nc', noeol=True)
1115
self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\n', noeol=True)
1117
def test_add_with_parents(self):
1118
"""Store in knit with parents"""
1119
k = self.make_test_knit()
1120
self.add_stock_one_and_one_a(k)
1121
self.assertEquals(k.get_parents('text-1'), [])
1122
self.assertEquals(k.get_parents('text-1a'), ['text-1'])
1124
def test_ancestry(self):
1125
"""Store in knit with parents"""
1126
k = self.make_test_knit()
1127
self.add_stock_one_and_one_a(k)
1128
self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
1130
def test_add_delta(self):
1131
"""Store in knit with parents"""
1132
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
1133
delta=True, create=True)
1134
self.add_stock_one_and_one_a(k)
1136
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
1138
def test_add_delta_knit_graph_index(self):
1139
"""Does adding work with a KnitGraphIndex."""
1140
index = InMemoryGraphIndex(2)
1141
knit_index = KnitGraphIndex(index, add_callback=index.add_nodes,
1143
k = KnitVersionedFile('test', get_transport('.'),
1144
delta=True, create=True, index=knit_index)
1145
self.add_stock_one_and_one_a(k)
1147
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
1148
# check the index had the right data added.
1149
self.assertEqual(set([
1150
(index, ('text-1', ), ' 0 127', ((), ())),
1151
(index, ('text-1a', ), ' 127 140', ((('text-1', ),), (('text-1', ),))),
1152
]), set(index.iter_all_entries()))
1153
# we should not have a .kndx file
1154
self.assertFalse(get_transport('.').has('test.kndx'))
1156
def test_annotate(self):
1158
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
1159
delta=True, create=True)
1160
self.insert_and_test_small_annotate(k)
1162
def insert_and_test_small_annotate(self, k):
1163
"""test annotation with k works correctly."""
1164
k.add_lines('text-1', [], ['a\n', 'b\n'])
1165
k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
1167
origins = k.annotate('text-2')
1168
self.assertEquals(origins[0], ('text-1', 'a\n'))
1169
self.assertEquals(origins[1], ('text-2', 'c\n'))
1171
def test_annotate_fulltext(self):
1173
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
1174
delta=False, create=True)
1175
self.insert_and_test_small_annotate(k)
1177
def test_annotate_merge_1(self):
1178
k = self.make_test_knit(True)
1179
k.add_lines('text-a1', [], ['a\n', 'b\n'])
1180
k.add_lines('text-a2', [], ['d\n', 'c\n'])
1181
k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
1182
origins = k.annotate('text-am')
1183
self.assertEquals(origins[0], ('text-a2', 'd\n'))
1184
self.assertEquals(origins[1], ('text-a1', 'b\n'))
1186
def test_annotate_merge_2(self):
1187
k = self.make_test_knit(True)
1188
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1189
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
1190
k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
1191
origins = k.annotate('text-am')
1192
self.assertEquals(origins[0], ('text-a1', 'a\n'))
1193
self.assertEquals(origins[1], ('text-a2', 'y\n'))
1194
self.assertEquals(origins[2], ('text-a1', 'c\n'))
1196
def test_annotate_merge_9(self):
1197
k = self.make_test_knit(True)
1198
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1199
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
1200
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
1201
origins = k.annotate('text-am')
1202
self.assertEquals(origins[0], ('text-am', 'k\n'))
1203
self.assertEquals(origins[1], ('text-a2', 'y\n'))
1204
self.assertEquals(origins[2], ('text-a1', 'c\n'))
1206
def test_annotate_merge_3(self):
1207
k = self.make_test_knit(True)
1208
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1209
k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
1210
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
1211
origins = k.annotate('text-am')
1212
self.assertEquals(origins[0], ('text-am', 'k\n'))
1213
self.assertEquals(origins[1], ('text-a2', 'y\n'))
1214
self.assertEquals(origins[2], ('text-a2', 'z\n'))
1216
def test_annotate_merge_4(self):
1217
k = self.make_test_knit(True)
1218
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1219
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
1220
k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
1221
k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
1222
origins = k.annotate('text-am')
1223
self.assertEquals(origins[0], ('text-a1', 'a\n'))
1224
self.assertEquals(origins[1], ('text-a1', 'b\n'))
1225
self.assertEquals(origins[2], ('text-a2', 'z\n'))
1227
def test_annotate_merge_5(self):
1228
k = self.make_test_knit(True)
1229
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1230
k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
1231
k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
1232
k.add_lines('text-am',
1233
['text-a1', 'text-a2', 'text-a3'],
1234
['a\n', 'e\n', 'z\n'])
1235
origins = k.annotate('text-am')
1236
self.assertEquals(origins[0], ('text-a1', 'a\n'))
1237
self.assertEquals(origins[1], ('text-a2', 'e\n'))
1238
self.assertEquals(origins[2], ('text-a3', 'z\n'))
1240
def test_annotate_file_cherry_pick(self):
1241
k = self.make_test_knit(True)
1242
k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
1243
k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
1244
k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
1245
origins = k.annotate('text-3')
1246
self.assertEquals(origins[0], ('text-1', 'a\n'))
1247
self.assertEquals(origins[1], ('text-1', 'b\n'))
1248
self.assertEquals(origins[2], ('text-1', 'c\n'))
1250
def test_knit_join(self):
1251
"""Store in knit with parents"""
1252
k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
1253
k1.add_lines('text-a', [], split_lines(TEXT_1))
1254
k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
1256
k1.add_lines('text-c', [], split_lines(TEXT_1))
1257
k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
1259
k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
1261
k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
1262
count = k2.join(k1, version_ids=['text-m'])
1263
self.assertEquals(count, 5)
1264
self.assertTrue(k2.has_version('text-a'))
1265
self.assertTrue(k2.has_version('text-c'))
1267
def test_reannotate(self):
1268
k1 = KnitVersionedFile('knit1', get_transport('.'),
1269
factory=KnitAnnotateFactory(), create=True)
1271
k1.add_lines('text-a', [], ['a\n', 'b\n'])
1273
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
1275
k2 = KnitVersionedFile('test2', get_transport('.'),
1276
factory=KnitAnnotateFactory(), create=True)
1277
k2.join(k1, version_ids=['text-b'])
1280
k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
1282
k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
1284
k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
1286
# test-c will have index 3
1287
k1.join(k2, version_ids=['text-c'])
1289
lines = k1.get_lines('text-c')
1290
self.assertEquals(lines, ['z\n', 'c\n'])
1292
origins = k1.annotate('text-c')
1293
self.assertEquals(origins[0], ('text-c', 'z\n'))
1294
self.assertEquals(origins[1], ('text-b', 'c\n'))
1296
def test_get_line_delta_texts(self):
1297
"""Make sure we can call get_texts on text with reused line deltas"""
1298
k1 = KnitVersionedFile('test1', get_transport('.'),
1299
factory=KnitPlainFactory(), create=True)
1304
parents = ['%d' % (t-1)]
1305
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
1306
k1.get_texts(('%d' % t) for t in range(3))
1308
def test_iter_lines_reads_in_order(self):
1309
t = MemoryTransport()
1310
instrumented_t = TransportLogger(t)
1311
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
1312
self.assertEqual([('id.kndx',)], instrumented_t._calls)
1313
# add texts with no required ordering
1314
k1.add_lines('base', [], ['text\n'])
1315
k1.add_lines('base2', [], ['text2\n'])
1317
instrumented_t._calls = []
1318
# request a last-first iteration
1319
results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
1320
self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
1321
self.assertEqual(['text\n', 'text2\n'], results)
1323
def test_create_empty_annotated(self):
1324
k1 = self.make_test_knit(True)
1326
k1.add_lines('text-a', [], ['a\n', 'b\n'])
1327
k2 = k1.create_empty('t', MemoryTransport())
1328
self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
1329
self.assertEqual(k1.delta, k2.delta)
1330
# the generic test checks for empty content and file class
1332
def test_knit_format(self):
1333
# this tests that a new knit index file has the expected content
1334
# and that it writes the data we expect as records are added.
1335
knit = self.make_test_knit(True)
1336
# Now knit files are not created until we first add data to them
1337
self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
1338
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
1339
self.assertFileEqual(
1340
"# bzr knit index 8\n"
1342
"revid fulltext 0 84 .a_ghost :",
1344
knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
1345
self.assertFileEqual(
1346
"# bzr knit index 8\n"
1347
"\nrevid fulltext 0 84 .a_ghost :"
1348
"\nrevid2 line-delta 84 82 0 :",
1350
# we should be able to load this file again
1351
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1352
self.assertEqual(['revid', 'revid2'], knit.versions())
1353
# write a short write to the file and ensure that its ignored
1354
indexfile = file('test.kndx', 'ab')
1355
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
1357
# we should be able to load this file again
1358
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
1359
self.assertEqual(['revid', 'revid2'], knit.versions())
1360
# and add a revision with the same id the failed write had
1361
knit.add_lines('revid3', ['revid2'], ['a\n'])
1362
# and when reading it revid3 should now appear.
1363
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1364
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
1365
self.assertEqual(['revid2'], knit.get_parents('revid3'))
1367
def test_delay_create(self):
1368
"""Test that passing delay_create=True creates files late"""
1369
knit = self.make_test_knit(annotate=True, delay_create=True)
1370
self.failIfExists('test.knit')
1371
self.failIfExists('test.kndx')
1372
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
1373
self.failUnlessExists('test.knit')
1374
self.assertFileEqual(
1375
"# bzr knit index 8\n"
1377
"revid fulltext 0 84 .a_ghost :",
1380
def test_create_parent_dir(self):
1381
"""create_parent_dir can create knits in nonexistant dirs"""
1382
# Has no effect if we don't set 'delay_create'
1383
trans = get_transport('.')
1384
self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
1385
trans, access_mode='w', factory=None,
1386
create=True, create_parent_dir=True)
1387
# Nothing should have changed yet
1388
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1389
factory=None, create=True,
1390
create_parent_dir=True,
1392
self.failIfExists('dir/test.knit')
1393
self.failIfExists('dir/test.kndx')
1394
self.failIfExists('dir')
1395
knit.add_lines('revid', [], ['a\n'])
1396
self.failUnlessExists('dir')
1397
self.failUnlessExists('dir/test.knit')
1398
self.assertFileEqual(
1399
"# bzr knit index 8\n"
1401
"revid fulltext 0 84 :",
1404
def test_create_mode_700(self):
1405
trans = get_transport('.')
1406
if not trans._can_roundtrip_unix_modebits():
1407
# Can't roundtrip, so no need to run this test
1409
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1410
factory=None, create=True,
1411
create_parent_dir=True,
1415
knit.add_lines('revid', [], ['a\n'])
1416
self.assertTransportMode(trans, 'dir', 0700)
1417
self.assertTransportMode(trans, 'dir/test.knit', 0600)
1418
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
1420
def test_create_mode_770(self):
1421
trans = get_transport('.')
1422
if not trans._can_roundtrip_unix_modebits():
1423
# Can't roundtrip, so no need to run this test
1425
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1426
factory=None, create=True,
1427
create_parent_dir=True,
1431
knit.add_lines('revid', [], ['a\n'])
1432
self.assertTransportMode(trans, 'dir', 0770)
1433
self.assertTransportMode(trans, 'dir/test.knit', 0660)
1434
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
1436
def test_create_mode_777(self):
1437
trans = get_transport('.')
1438
if not trans._can_roundtrip_unix_modebits():
1439
# Can't roundtrip, so no need to run this test
1441
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1442
factory=None, create=True,
1443
create_parent_dir=True,
1447
knit.add_lines('revid', [], ['a\n'])
1448
self.assertTransportMode(trans, 'dir', 0777)
1449
self.assertTransportMode(trans, 'dir/test.knit', 0666)
1450
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
1452
def test_plan_merge(self):
    """plan_merge('text1a', 'text1b') is compared pairwise with AB_MERGE.

    zip() stops at the shorter sequence, so only the common prefix of the
    plan and AB_MERGE is checked.
    """
    knit = self.make_test_knit(annotate=True)
    knit.add_lines('text1', [], split_lines(TEXT_1))
    for version_id, text in (('text1a', TEXT_1A), ('text1b', TEXT_1B)):
        knit.add_lines(version_id, ['text1'], split_lines(text))
    actual_plan = list(knit.plan_merge('text1a', 'text1b'))
    for actual, expected in zip(actual_plan, AB_MERGE):
        self.assertEqual(actual, expected)
1471
Banana cup cake recipe
1477
- self-raising flour
1481
Banana cup cake recipe
1483
- bananas (do not use plantains!!!)
1490
Banana cup cake recipe
1493
- self-raising flour
1506
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
1511
new-b|- bananas (do not use plantains!!!)
1512
unchanged|- broken tea cups
1513
new-a|- self-raising flour
1516
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
1519
def line_delta(from_lines, to_lines):
    """Generate line-based delta from one text to another.

    Every non-equal opcode reported by difflib.SequenceMatcher becomes one
    hunk: a header line of the form "start,end,count" (newline terminated)
    followed by ``count`` replacement lines taken from to_lines.  start and
    end are the half-open range of from_lines that the hunk replaces.

    :param from_lines: Indexable sequence of lines of the basis text.
    :param to_lines: Indexable sequence of lines of the target text.
    :return: A generator of delta lines; it yields nothing when the two
        texts are identical.
    """
    matcher = difflib.SequenceMatcher(None, from_lines, to_lines)
    for tag, from_start, from_end, to_start, to_end in matcher.get_opcodes():
        if tag == 'equal':
            # Unchanged runs are implied by the gaps between hunks.
            continue
        yield '%d,%d,%d\n' % (from_start, from_end, to_end - to_start)
        for position in range(to_start, to_end):
            yield to_lines[position]
1530
def apply_line_delta(basis_lines, delta_lines):
    """Apply a line-based perfect diff

    basis_lines -- text to apply the patch to
    delta_lines -- diff instructions and content

    delta_lines is a flat list of hunks.  Each hunk is a header line
    "a,b,c" followed by c content lines, meaning: replace basis lines
    [a, b) with those c lines.  a and b always index the *original* basis,
    so a running offset translates them into positions in the output.

    Returns a new list; basis_lines itself is not modified.
    """
    out = basis_lines[:]
    i = 0
    # Net change in length made by the hunks applied so far.
    offset = 0
    while i < len(delta_lines):
        header = delta_lines[i]
        # int() promotes to long automatically on Python 2, so it is a
        # drop-in replacement for long() that also exists on Python 3.
        a, b, c = map(int, header.split(','))
        i = i + 1
        out[offset+a:offset+b] = delta_lines[i:i+c]
        i = i + c
        # (b - a) lines were removed and c inserted.  The previous
        # "(b - a) + c" was only right for pure insertions and misplaced
        # every later hunk otherwise.
        offset = offset + c - (b - a)
    return out
1549
class TestWeaveToKnit(KnitTests):
1551
def test_weave_to_knit_matches(self):
1552
# check that the WeaveToKnit is_compatible function
1553
# registers True for a Weave to a Knit.
1555
k = self.make_test_knit()
1556
self.failUnless(WeaveToKnit.is_compatible(w, k))
1557
self.failIf(WeaveToKnit.is_compatible(k, w))
1558
self.failIf(WeaveToKnit.is_compatible(w, w))
1559
self.failIf(WeaveToKnit.is_compatible(k, k))
1562
class TestKnitCaching(KnitTests):
1564
def create_knit(self, cache_add=False):
1565
k = self.make_test_knit(True)
1569
k.add_lines('text-1', [], split_lines(TEXT_1))
1570
k.add_lines('text-2', [], split_lines(TEXT_2))
1573
def test_no_caching(self):
1574
k = self.create_knit()
1575
# Nothing should be cached without setting 'enable_cache'
1576
self.assertEqual({}, k._data._cache)
1578
def test_cache_add_and_clear(self):
1579
k = self.create_knit(True)
1581
self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1584
self.assertEqual({}, k._data._cache)
1586
def test_cache_data_read_raw(self):
1587
k = self.create_knit()
1589
# Now cache and read
1592
def read_one_raw(version):
1593
pos_map = k._get_components_positions([version])
1594
method, index_memo, next = pos_map[version]
1595
lst = list(k._data.read_records_iter_raw([(version, index_memo)]))
1596
self.assertEqual(1, len(lst))
1599
val = read_one_raw('text-1')
1600
self.assertEqual({'text-1':val[1]}, k._data._cache)
1603
# After clear, new reads are not cached
1604
self.assertEqual({}, k._data._cache)
1606
val2 = read_one_raw('text-1')
1607
self.assertEqual(val, val2)
1608
self.assertEqual({}, k._data._cache)
1610
def test_cache_data_read(self):
1611
k = self.create_knit()
1613
def read_one(version):
1614
pos_map = k._get_components_positions([version])
1615
method, index_memo, next = pos_map[version]
1616
lst = list(k._data.read_records_iter([(version, index_memo)]))
1617
self.assertEqual(1, len(lst))
1620
# Now cache and read
1623
val = read_one('text-2')
1624
self.assertEqual(['text-2'], k._data._cache.keys())
1625
self.assertEqual('text-2', val[0])
1626
content, digest = k._data._parse_record('text-2',
1627
k._data._cache['text-2'])
1628
self.assertEqual(content, val[1])
1629
self.assertEqual(digest, val[2])
1632
self.assertEqual({}, k._data._cache)
1634
val2 = read_one('text-2')
1635
self.assertEqual(val, val2)
1636
self.assertEqual({}, k._data._cache)
1638
def test_cache_read(self):
1639
k = self.create_knit()
1642
text = k.get_text('text-1')
1643
self.assertEqual(TEXT_1, text)
1644
self.assertEqual(['text-1'], k._data._cache.keys())
1647
self.assertEqual({}, k._data._cache)
1649
text = k.get_text('text-1')
1650
self.assertEqual(TEXT_1, text)
1651
self.assertEqual({}, k._data._cache)
1654
class TestKnitIndex(KnitTests):
1656
def test_add_versions_dictionary_compresses(self):
1657
"""Adding versions to the index should update the lookup dict"""
1658
knit = self.make_test_knit()
1660
idx.add_version('a-1', ['fulltext'], (None, 0, 0), [])
1661
self.check_file_contents('test.kndx',
1662
'# bzr knit index 8\n'
1664
'a-1 fulltext 0 0 :'
1666
idx.add_versions([('a-2', ['fulltext'], (None, 0, 0), ['a-1']),
1667
('a-3', ['fulltext'], (None, 0, 0), ['a-2']),
1669
self.check_file_contents('test.kndx',
1670
'# bzr knit index 8\n'
1672
'a-1 fulltext 0 0 :\n'
1673
'a-2 fulltext 0 0 0 :\n'
1674
'a-3 fulltext 0 0 1 :'
1676
self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1677
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1678
'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1679
'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1682
def test_add_versions_fails_clean(self):
1683
"""If add_versions fails in the middle, it restores a pristine state.
1685
Any modifications that are made to the index are reset if all versions
1688
# This cheats a little bit by passing in a generator which will
1689
# raise an exception before the processing finishes
1690
# Other possibilities would be to have a version with the wrong number
1691
# of entries, or to make the backing transport unable to write any
1694
knit = self.make_test_knit()
1696
idx.add_version('a-1', ['fulltext'], (None, 0, 0), [])
1698
class StopEarly(Exception):
1701
def generate_failure():
1702
"""Add some entries and then raise an exception"""
1703
yield ('a-2', ['fulltext'], (None, 0, 0), ['a-1'])
1704
yield ('a-3', ['fulltext'], (None, 0, 0), ['a-2'])
1707
# Assert the pre-condition
1708
self.assertEqual(['a-1'], idx._history)
1709
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1711
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1713
# And it shouldn't be modified
1714
self.assertEqual(['a-1'], idx._history)
1715
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1717
def test_knit_index_ignores_empty_files(self):
1718
# There was a race condition in older bzr, where a ^C at the right time
1719
# could leave an empty .kndx file, which bzr would later claim was a
1720
# corrupted file since the header was not present. In reality, the file
1721
# just wasn't created, so it should be ignored.
1722
t = get_transport('.')
1723
t.put_bytes('test.kndx', '')
1725
knit = self.make_test_knit()
1727
def test_knit_index_checks_header(self):
1728
t = get_transport('.')
1729
t.put_bytes('test.kndx', '# not really a knit header\n\n')
1731
self.assertRaises(KnitHeaderError, self.make_test_knit)
1734
class TestGraphIndexKnit(KnitTests):
1735
"""Tests for knits using a GraphIndex rather than a KnitIndex."""
1737
def make_g_index(self, name, ref_lists=0, nodes=()):
    """Build a GraphIndex stored as name on this test's transport.

    :param name: File name the serialised index is written to.
    :param ref_lists: Passed to GraphIndexBuilder as its only argument.
    :param nodes: Iterable of 3-tuples, each passed positionally to
        builder.add_node.  Callers in this file pass
        (key, value, reference_lists).  The default is an immutable empty
        tuple rather than a shared mutable list.
    :return: A GraphIndex reading name from self.get_transport().
    """
    builder = GraphIndexBuilder(ref_lists)
    for first, second, third in nodes:
        builder.add_node(first, second, third)
    stream = builder.finish()
    trans = self.get_transport()
    trans.put_file(name, stream)
    return GraphIndex(trans, name)
1746
def two_graph_index(self, deltas=False, catch_adds=False):
1747
"""Build a two-graph index.
1749
:param deltas: If true, use underlying indices with two node-ref
1750
lists and 'parent' set to a delta-compressed against tail.
1752
# build a complex graph across several indices.
1754
# delta compression in the index
1755
index1 = self.make_g_index('1', 2, [
1756
(('tip', ), 'N0 100', ([('parent', )], [], )),
1757
(('tail', ), '', ([], []))])
1758
index2 = self.make_g_index('2', 2, [
1759
(('parent', ), ' 100 78', ([('tail', ), ('ghost', )], [('tail', )])),
1760
(('separate', ), '', ([], []))])
1762
# just blob location and graph in the index.
1763
index1 = self.make_g_index('1', 1, [
1764
(('tip', ), 'N0 100', ([('parent', )], )),
1765
(('tail', ), '', ([], ))])
1766
index2 = self.make_g_index('2', 1, [
1767
(('parent', ), ' 100 78', ([('tail', ), ('ghost', )], )),
1768
(('separate', ), '', ([], ))])
1769
combined_index = CombinedGraphIndex([index1, index2])
1771
self.combined_index = combined_index
1772
self.caught_entries = []
1773
add_callback = self.catch_add
1776
return KnitGraphIndex(combined_index, deltas=deltas,
1777
add_callback=add_callback)
1779
def test_get_graph(self):
1780
index = self.two_graph_index()
1781
self.assertEqual(set([
1782
('tip', ('parent', )),
1784
('parent', ('tail', 'ghost')),
1786
]), set(index.get_graph()))
1788
def test_get_ancestry(self):
1789
# get_ancestry is defined as eliding ghosts, not erroring.
1790
index = self.two_graph_index()
1791
self.assertEqual([], index.get_ancestry([]))
1792
self.assertEqual(['separate'], index.get_ancestry(['separate']))
1793
self.assertEqual(['tail'], index.get_ancestry(['tail']))
1794
self.assertEqual(['tail', 'parent'], index.get_ancestry(['parent']))
1795
self.assertEqual(['tail', 'parent', 'tip'], index.get_ancestry(['tip']))
1796
self.assertTrue(index.get_ancestry(['tip', 'separate']) in
1797
(['tail', 'parent', 'tip', 'separate'],
1798
['separate', 'tail', 'parent', 'tip'],
1800
# and without topo_sort
1801
self.assertEqual(set(['separate']),
1802
set(index.get_ancestry(['separate'], topo_sorted=False)))
1803
self.assertEqual(set(['tail']),
1804
set(index.get_ancestry(['tail'], topo_sorted=False)))
1805
self.assertEqual(set(['tail', 'parent']),
1806
set(index.get_ancestry(['parent'], topo_sorted=False)))
1807
self.assertEqual(set(['tail', 'parent', 'tip']),
1808
set(index.get_ancestry(['tip'], topo_sorted=False)))
1809
self.assertEqual(set(['separate', 'tail', 'parent', 'tip']),
1810
set(index.get_ancestry(['tip', 'separate'])))
1811
# asking for a ghost makes it go boom.
1812
self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
1814
def test_get_ancestry_with_ghosts(self):
1815
index = self.two_graph_index()
1816
self.assertEqual([], index.get_ancestry_with_ghosts([]))
1817
self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
1818
self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
1819
self.assertTrue(index.get_ancestry_with_ghosts(['parent']) in
1820
(['tail', 'ghost', 'parent'],
1821
['ghost', 'tail', 'parent'],
1823
self.assertTrue(index.get_ancestry_with_ghosts(['tip']) in
1824
(['tail', 'ghost', 'parent', 'tip'],
1825
['ghost', 'tail', 'parent', 'tip'],
1827
self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
1828
(['tail', 'ghost', 'parent', 'tip', 'separate'],
1829
['ghost', 'tail', 'parent', 'tip', 'separate'],
1830
['separate', 'tail', 'ghost', 'parent', 'tip'],
1831
['separate', 'ghost', 'tail', 'parent', 'tip'],
1833
# asking for a ghost makes it go boom.
1834
self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])
1836
def test_num_versions(self):
1837
index = self.two_graph_index()
1838
self.assertEqual(4, index.num_versions())
1840
def test_get_versions(self):
1841
index = self.two_graph_index()
1842
self.assertEqual(set(['tail', 'tip', 'parent', 'separate']),
1843
set(index.get_versions()))
1845
def test_has_version(self):
1846
index = self.two_graph_index()
1847
self.assertTrue(index.has_version('tail'))
1848
self.assertFalse(index.has_version('ghost'))
1850
def test_get_position(self):
1851
index = self.two_graph_index()
1852
self.assertEqual((index._graph_index._indices[0], 0, 100), index.get_position('tip'))
1853
self.assertEqual((index._graph_index._indices[1], 100, 78), index.get_position('parent'))
1855
def test_get_method_deltas(self):
1856
index = self.two_graph_index(deltas=True)
1857
self.assertEqual('fulltext', index.get_method('tip'))
1858
self.assertEqual('line-delta', index.get_method('parent'))
1860
def test_get_method_no_deltas(self):
1861
# check that the parent-history lookup is ignored with deltas=False.
1862
index = self.two_graph_index(deltas=False)
1863
self.assertEqual('fulltext', index.get_method('tip'))
1864
self.assertEqual('fulltext', index.get_method('parent'))
1866
def test_get_options_deltas(self):
1867
index = self.two_graph_index(deltas=True)
1868
self.assertEqual(['fulltext', 'no-eol'], index.get_options('tip'))
1869
self.assertEqual(['line-delta'], index.get_options('parent'))
1871
def test_get_options_no_deltas(self):
1872
# check that the parent-history lookup is ignored with deltas=False.
1873
index = self.two_graph_index(deltas=False)
1874
self.assertEqual(['fulltext', 'no-eol'], index.get_options('tip'))
1875
self.assertEqual(['fulltext'], index.get_options('parent'))
1877
def test_get_parents(self):
    """get_parents leaves out ghost parents and rejects a ghost version."""
    graph_index = self.two_graph_index()
    # 'parent' references both 'tail' and 'ghost'; only 'tail' is present.
    present_parents = graph_index.get_parents('parent')
    self.assertEqual(('tail', ), present_parents)
    # Asking about the ghost itself is an error.
    self.assertRaises(errors.RevisionNotPresent,
                      graph_index.get_parents, 'ghost')
1885
def test_get_parents_with_ghosts(self):
    """get_parents_with_ghosts keeps ghost parents but rejects a ghost
    version."""
    graph_index = self.two_graph_index()
    all_parents = graph_index.get_parents_with_ghosts('parent')
    self.assertEqual(('tail', 'ghost'), all_parents)
    # Asking about the ghost itself is an error.
    self.assertRaises(errors.RevisionNotPresent,
                      graph_index.get_parents_with_ghosts, 'ghost')
1892
def test_check_versions_present(self):
1893
# ghosts should not be considered present
1894
index = self.two_graph_index()
1895
self.assertRaises(RevisionNotPresent, index.check_versions_present,
1897
self.assertRaises(RevisionNotPresent, index.check_versions_present,
1899
index.check_versions_present(['tail', 'separate'])
1901
def catch_add(self, entries):
    """Record one add-callback invocation in self.caught_entries.

    entries is stored as a single element, so each call is kept as its
    own group.
    """
    self.caught_entries.extend((entries,))
1904
def test_add_no_callback_errors(self):
    """add_version on an index built without catch_adds raises
    ReadOnlyError."""
    graph_index = self.two_graph_index()
    new_version = ('new', 'fulltext,no-eol', (None, 50, 60), ['separate'])
    self.assertRaises(errors.ReadOnlyError, graph_index.add_version,
                      *new_version)
1909
def test_add_version_smoke(self):
1910
index = self.two_graph_index(catch_adds=True)
1911
index.add_version('new', 'fulltext,no-eol', (None, 50, 60), ['separate'])
1912
self.assertEqual([[(('new', ), 'N50 60', ((('separate',),),))]],
1913
self.caught_entries)
1915
def test_add_version_delta_not_delta_index(self):
    """A line-delta version is rejected when the index was built without
    deltas, and nothing reaches the add callback."""
    graph_index = self.two_graph_index(catch_adds=True)
    delta_version = ('new', 'no-eol,line-delta', (None, 0, 100), ['parent'])
    self.assertRaises(errors.KnitCorrupt, graph_index.add_version,
                      *delta_version)
    self.assertEqual([], self.caught_entries)
1921
def test_add_version_same_dup(self):
1922
index = self.two_graph_index(catch_adds=True)
1923
# options can be spelt two different ways
1924
index.add_version('tip', 'fulltext,no-eol', (None, 0, 100), ['parent'])
1925
index.add_version('tip', 'no-eol,fulltext', (None, 0, 100), ['parent'])
1926
# but neither should have added data.
1927
self.assertEqual([[], []], self.caught_entries)
1929
def test_add_version_different_dup(self):
1930
index = self.two_graph_index(deltas=True, catch_adds=True)
1932
self.assertRaises(errors.KnitCorrupt, index.add_version,
1933
'tip', 'no-eol,line-delta', (None, 0, 100), ['parent'])
1934
self.assertRaises(errors.KnitCorrupt, index.add_version,
1935
'tip', 'line-delta,no-eol', (None, 0, 100), ['parent'])
1936
self.assertRaises(errors.KnitCorrupt, index.add_version,
1937
'tip', 'fulltext', (None, 0, 100), ['parent'])
1939
self.assertRaises(errors.KnitCorrupt, index.add_version,
1940
'tip', 'fulltext,no-eol', (None, 50, 100), ['parent'])
1941
self.assertRaises(errors.KnitCorrupt, index.add_version,
1942
'tip', 'fulltext,no-eol', (None, 0, 1000), ['parent'])
1944
self.assertRaises(errors.KnitCorrupt, index.add_version,
1945
'tip', 'fulltext,no-eol', (None, 0, 100), [])
1946
self.assertEqual([], self.caught_entries)
1948
def test_add_versions_nodeltas(self):
1949
index = self.two_graph_index(catch_adds=True)
1950
index.add_versions([
1951
('new', 'fulltext,no-eol', (None, 50, 60), ['separate']),
1952
('new2', 'fulltext', (None, 0, 6), ['new']),
1954
self.assertEqual([(('new', ), 'N50 60', ((('separate',),),)),
1955
(('new2', ), ' 0 6', ((('new',),),))],
1956
sorted(self.caught_entries[0]))
1957
self.assertEqual(1, len(self.caught_entries))
1959
def test_add_versions_deltas(self):
    """add_versions on a deltas=True index emits two reference lists."""
    knit_index = self.two_graph_index(deltas=True, catch_adds=True)
    new_records = [
        ('new', 'fulltext,no-eol', (None, 50, 60), ['separate']),
        ('new2', 'line-delta', (None, 0, 6), ['new']),
        ]
    knit_index.add_versions(new_records)
    # The second reference list is empty for the fulltext record and
    # names 'new' for the line-delta record.
    expected_nodes = [
        (('new', ), 'N50 60', ((('separate',),), ())),
        (('new2', ), ' 0 6', ((('new',),), (('new',),), )),
        ]
    self.assertEqual(expected_nodes, sorted(self.caught_entries[0]))
    self.assertEqual(1, len(self.caught_entries))
1970
def test_add_versions_delta_not_delta_index(self):
    """A line-delta batch is refused by an index created without deltas."""
    knit_index = self.two_graph_index(catch_adds=True)
    delta_batch = [
        ('new', 'no-eol,line-delta', (None, 0, 100), ['parent']),
        ]
    self.assertRaises(errors.KnitCorrupt, knit_index.add_versions,
                      delta_batch)
    # The refused batch must not have reached the callback.
    self.assertEqual([], self.caught_entries)
1976
def test_add_versions_same_dup(self):
    """Re-adding an identical 'tip' batch is accepted but adds no data."""
    knit_index = self.two_graph_index(catch_adds=True)
    # The same set of options may be spelt in either order.
    for options in ('fulltext,no-eol', 'no-eol,fulltext'):
        knit_index.add_versions(
            [('tip', options, (None, 0, 100), ['parent'])])
    # The callback ran once per call, each time with nothing to add.
    self.assertEqual([[], []], self.caught_entries)
1984
def test_add_versions_different_dup(self):
    """Any batch that re-adds 'tip' with differing details is corrupt."""
    knit_index = self.two_graph_index(deltas=True, catch_adds=True)
    conflicting_batches = [
        # different options
        [('tip', 'no-eol,line-delta', (None, 0, 100), ['parent'])],
        [('tip', 'line-delta,no-eol', (None, 0, 100), ['parent'])],
        [('tip', 'fulltext', (None, 0, 100), ['parent'])],
        # different position / length
        [('tip', 'fulltext,no-eol', (None, 50, 100), ['parent'])],
        [('tip', 'fulltext,no-eol', (None, 0, 1000), ['parent'])],
        # different parents
        [('tip', 'fulltext,no-eol', (None, 0, 100), [])],
        # change options in the second record
        [('tip', 'fulltext,no-eol', (None, 0, 100), ['parent']),
         ('tip', 'no-eol,line-delta', (None, 0, 100), ['parent'])],
        ]
    for batch in conflicting_batches:
        self.assertRaises(errors.KnitCorrupt, knit_index.add_versions,
                          batch)
    # None of the refused batches may have reached the callback.
    self.assertEqual([], self.caught_entries)
2007
def test_iter_parents(self):
    """iter_parents yields (version, parents) for present versions only."""
    index1 = self.make_g_index('1', 1, [
        (('r0', ), 'N0 100', ([], )),
        (('r1', ), '', ([('r0', )], )),
        ])
    index2 = self.make_g_index('2', 1, [
        (('r2', ), 'N0 100', ([('r1', ), ('r0', )], )),
        ])
    knit_index = KnitGraphIndex(CombinedGraphIndex([index1, index2]))
    # (requested versions, expected (version, parents) pairs)
    cases = [
        # each sample version on its own
        (['r0'], [('r0', ())]),
        (['r1'], [('r1', ('r0', ))]),
        (['r2'], [('r2', ('r1', 'r0'))]),
        # nothing is returned for a missing version
        (['missing'], []),
        # one version returned, missing ones skipped
        (['ghost1', 'r1', 'ghost'], [('r1', ('r0', ))]),
        # two versions returned
        (['r0', 'r1'], [('r0', ()), ('r1', ('r0', ))]),
        # two versions returned, missing ones skipped
        (['a', 'r0', 'b', 'r1', 'c'], [('r0', ()), ('r1', ('r0', ))]),
        ]
    for requested, expected in cases:
        self.assertEqual(set(expected),
                         set(knit_index.iter_parents(requested)))
2041
class TestNoParentsGraphIndexKnit(KnitTests):
2042
"""Tests for knits using KnitGraphIndex with no parents."""
2044
def make_g_index(self, name, ref_lists=0, nodes=()):
    """Build a graph index, store it on the test transport and reopen it.

    :param name: Name the serialised index is written under.
    :param ref_lists: Passed to GraphIndexBuilder when it is created.
    :param nodes: Iterable of (key, value) pairs, each handed to
        GraphIndexBuilder.add_node. Defaults to an empty tuple so no
        mutable object is shared between calls.
    :return: A GraphIndex opened on ``name`` over the test transport.
    """
    builder = GraphIndexBuilder(ref_lists)
    for key, value in nodes:
        builder.add_node(key, value)
    stream = builder.finish()
    trans = self.get_transport()
    trans.put_file(name, stream)
    return GraphIndex(trans, name)
2053
def test_parents_deltas_incompatible(self):
    """Asking for deltas without parents is refused with KnitError."""
    empty_index = CombinedGraphIndex([])
    options = dict(deltas=True, parents=False)
    self.assertRaises(errors.KnitError, KnitGraphIndex, empty_index,
                      **options)
2058
def two_graph_index(self, catch_adds=False):
    """Build a parentless KnitGraphIndex over two graph indices.

    The first underlying index holds 'tip' and 'tail', the second holds
    'parent' and 'separate'; none of the nodes has reference lists.

    :param catch_adds: If true, additions are routed to self.catch_add
        and collected in self.caught_entries, and the combined index is
        kept as self.combined_index. Otherwise no add callback is
        supplied.
    :return: A KnitGraphIndex created with parents=False.
    """
    # put several versions in the index.
    index1 = self.make_g_index('1', 0, [
        (('tip', ), 'N0 100'),
        (('tail', ), '')])
    index2 = self.make_g_index('2', 0, [
        (('parent', ), ' 100 78'),
        (('separate', ), '')])
    combined_index = CombinedGraphIndex([index1, index2])
    if catch_adds:
        self.combined_index = combined_index
        self.caught_entries = []
        add_callback = self.catch_add
    else:
        add_callback = None
    return KnitGraphIndex(combined_index, parents=False,
                          add_callback=add_callback)
2081
def test_get_graph(self):
    """Every version appears in the graph with an empty parents tuple."""
    index = self.two_graph_index()
    # The index is built with parents=False, so each of the four stored
    # versions is expected with no parents.
    self.assertEqual(set([
        ('tip', ()),
        ('tail', ()),
        ('parent', ()),
        ('separate', ()),
        ]), set(index.get_graph()))
2090
def test_get_ancestry(self):
    """With no parents, the ancestry of a version is just that version."""
    knit_index = self.two_graph_index()
    self.assertEqual([], knit_index.get_ancestry([]))
    for version in ('separate', 'tail', 'parent', 'tip'):
        self.assertEqual([version], knit_index.get_ancestry([version]))
    # Either ordering of two unrelated versions is acceptable.
    acceptable_orders = (
        ['tip', 'separate'],
        ['separate', 'tip'],
        )
    self.assertTrue(
        knit_index.get_ancestry(['tip', 'separate']) in acceptable_orders)
    # asking for a ghost makes it go boom.
    self.assertRaises(errors.RevisionNotPresent,
                      knit_index.get_ancestry, ['ghost'])
2105
def test_get_ancestry_with_ghosts(self):
    """get_ancestry_with_ghosts also returns just the requested versions."""
    knit_index = self.two_graph_index()
    self.assertEqual([], knit_index.get_ancestry_with_ghosts([]))
    for version in ('separate', 'tail', 'parent', 'tip'):
        self.assertEqual(
            [version], knit_index.get_ancestry_with_ghosts([version]))
    # Either ordering of two unrelated versions is acceptable.
    acceptable_orders = (
        ['tip', 'separate'],
        ['separate', 'tip'],
        )
    self.assertTrue(
        knit_index.get_ancestry_with_ghosts(['tip', 'separate'])
        in acceptable_orders)
    # asking for a ghost makes it go boom.
    self.assertRaises(errors.RevisionNotPresent,
                      knit_index.get_ancestry_with_ghosts, ['ghost'])
2119
def test_num_versions(self):
    """The two-graph index reports four versions."""
    version_count = self.two_graph_index().num_versions()
    self.assertEqual(4, version_count)
2123
def test_get_versions(self):
    """get_versions lists every version held by either graph index."""
    knit_index = self.two_graph_index()
    expected = set(['tail', 'tip', 'parent', 'separate'])
    self.assertEqual(expected, set(knit_index.get_versions()))
2128
def test_has_version(self):
    """has_version is true for a stored version and false for a ghost."""
    knit_index = self.two_graph_index()
    tail_present = knit_index.has_version('tail')
    self.assertTrue(tail_present)
    ghost_present = knit_index.has_version('ghost')
    self.assertFalse(ghost_present)
2133
def test_get_position(self):
    """get_position returns (containing graph index, start, length)."""
    knit_index = self.two_graph_index()
    backing = knit_index._graph_index._indices
    # 'tip' lives in the first underlying index, 'parent' in the second.
    self.assertEqual((backing[0], 0, 100),
                     knit_index.get_position('tip'))
    self.assertEqual((backing[1], 100, 78),
                     knit_index.get_position('parent'))
2138
def test_get_method(self):
    """get_method reports 'fulltext' for versions of this index."""
    index = self.two_graph_index()
    self.assertEqual('fulltext', index.get_method('tip'))
    # 'parent' is stored without the no-eol marker that 'tip' has; its
    # method must be checked through get_method as well, not get_options.
    self.assertEqual('fulltext', index.get_method('parent'))
2143
def test_get_options(self):
    """get_options includes 'no-eol' for 'tip' but not for 'parent'."""
    knit_index = self.two_graph_index()
    expected_by_version = [
        ('tip', ['fulltext', 'no-eol']),
        ('parent', ['fulltext']),
        ]
    for version, expected in expected_by_version:
        self.assertEqual(expected, knit_index.get_options(version))
2148
def test_get_parents(self):
    """get_parents is empty for a stored version and raises for a ghost."""
    knit_index = self.two_graph_index()
    parents_of_parent = knit_index.get_parents('parent')
    self.assertEqual((), parents_of_parent)
    # and errors on ghosts.
    self.assertRaises(errors.RevisionNotPresent,
                      knit_index.get_parents, 'ghost')
2155
def test_get_parents_with_ghosts(self):
    """get_parents_with_ghosts is empty when stored, raises for a ghost."""
    knit_index = self.two_graph_index()
    parents_of_parent = knit_index.get_parents_with_ghosts('parent')
    self.assertEqual((), parents_of_parent)
    # and errors on ghosts.
    self.assertRaises(errors.RevisionNotPresent,
                      knit_index.get_parents_with_ghosts, 'ghost')
2162
def test_check_versions_present(self):
    """check_versions_present raises if any requested version is absent."""
    index = self.two_graph_index()
    # A lone missing version, then a missing version next to a stored one.
    self.assertRaises(RevisionNotPresent, index.check_versions_present,
                      ['missing'])
    self.assertRaises(RevisionNotPresent, index.check_versions_present,
                      ['tail', 'missing'])
    # Versions that are all stored are accepted without error.
    index.check_versions_present(['tail', 'separate'])
2170
def catch_add(self, entries):
    """Record one batch of entries passed to the add callback."""
    recorded = self.caught_entries
    recorded.append(entries)
2173
def test_add_no_callback_errors(self):
    """An index built without catch_adds refuses adds with ReadOnlyError."""
    knit_index = self.two_graph_index()
    record = ('new', 'fulltext,no-eol', (None, 50, 60), ['separate'])
    self.assertRaises(errors.ReadOnlyError, knit_index.add_version,
                      *record)
2178
def test_add_version_smoke(self):
    """A new fulltext record reaches the callback as a (key, value) node."""
    knit_index = self.two_graph_index(catch_adds=True)
    knit_index.add_version('new', 'fulltext,no-eol', (None, 50, 60), [])
    # One callback invocation carrying a single node without references.
    expected_batches = [[(('new', ), 'N50 60')]]
    self.assertEqual(expected_batches, self.caught_entries)
2184
def test_add_version_delta_not_delta_index(self):
    """A line-delta record is refused by the parentless index."""
    knit_index = self.two_graph_index(catch_adds=True)
    delta_record = ('new', 'no-eol,line-delta', (None, 0, 100), [])
    self.assertRaises(errors.KnitCorrupt, knit_index.add_version,
                      *delta_record)
    # The refused add must not have reached the callback.
    self.assertEqual([], self.caught_entries)
2190
def test_add_version_same_dup(self):
    """Re-adding an identical 'tip' record is accepted but adds no data."""
    knit_index = self.two_graph_index(catch_adds=True)
    # The same set of options may be spelt in either order.
    for options in ('fulltext,no-eol', 'no-eol,fulltext'):
        knit_index.add_version('tip', options, (None, 0, 100), [])
    # The callback ran once per call, each time with nothing to add.
    self.assertEqual([[], []], self.caught_entries)
2198
def test_add_version_different_dup(self):
    """Re-adding 'tip' with any differing detail raises KnitCorrupt."""
    knit_index = self.two_graph_index(catch_adds=True)
    conflicting = [
        # different options
        ('no-eol,line-delta', (None, 0, 100), []),
        ('line-delta,no-eol', (None, 0, 100), []),
        ('fulltext', (None, 0, 100), []),
        # different position / length
        ('fulltext,no-eol', (None, 50, 100), []),
        ('fulltext,no-eol', (None, 0, 1000), []),
        # parents supplied although the index stores none
        ('fulltext,no-eol', (None, 0, 100), ['parent']),
        ]
    for options, pos, parents in conflicting:
        self.assertRaises(errors.KnitCorrupt, knit_index.add_version,
                          'tip', options, pos, parents)
    # None of the refused adds may have reached the callback.
    self.assertEqual([], self.caught_entries)
2217
def test_add_versions(self):
    """add_versions hands both new records to the callback in one batch."""
    knit_index = self.two_graph_index(catch_adds=True)
    new_records = [
        ('new', 'fulltext,no-eol', (None, 50, 60), []),
        ('new2', 'fulltext', (None, 0, 6), []),
        ]
    knit_index.add_versions(new_records)
    # Nodes are plain (key, value) pairs: there are no reference lists.
    expected_nodes = [(('new', ), 'N50 60'), (('new2', ), ' 0 6')]
    self.assertEqual(expected_nodes, sorted(self.caught_entries[0]))
    self.assertEqual(1, len(self.caught_entries))
2227
def test_add_versions_delta_not_delta_index(self):
    """A line-delta batch is refused by the parentless index."""
    knit_index = self.two_graph_index(catch_adds=True)
    delta_batch = [
        ('new', 'no-eol,line-delta', (None, 0, 100), ['parent']),
        ]
    self.assertRaises(errors.KnitCorrupt, knit_index.add_versions,
                      delta_batch)
    # The refused batch must not have reached the callback.
    self.assertEqual([], self.caught_entries)
2233
def test_add_versions_parents_not_parents_index(self):
    """A record naming parents is refused by the parentless index."""
    knit_index = self.two_graph_index(catch_adds=True)
    batch_with_parents = [
        ('new', 'no-eol,fulltext', (None, 0, 100), ['parent']),
        ]
    self.assertRaises(errors.KnitCorrupt, knit_index.add_versions,
                      batch_with_parents)
    # The refused batch must not have reached the callback.
    self.assertEqual([], self.caught_entries)
2239
def test_add_versions_same_dup(self):
    """Re-adding an identical 'tip' batch is accepted but adds no data."""
    knit_index = self.two_graph_index(catch_adds=True)
    # The same set of options may be spelt in either order.
    for options in ('fulltext,no-eol', 'no-eol,fulltext'):
        knit_index.add_versions([('tip', options, (None, 0, 100), [])])
    # The callback ran once per call, each time with nothing to add.
    self.assertEqual([[], []], self.caught_entries)
2247
def test_add_versions_different_dup(self):
    """Any batch that re-adds 'tip' with differing details is corrupt."""
    knit_index = self.two_graph_index(catch_adds=True)
    conflicting_batches = [
        # different options
        [('tip', 'no-eol,line-delta', (None, 0, 100), [])],
        [('tip', 'line-delta,no-eol', (None, 0, 100), [])],
        [('tip', 'fulltext', (None, 0, 100), [])],
        # different position / length
        [('tip', 'fulltext,no-eol', (None, 50, 100), [])],
        [('tip', 'fulltext,no-eol', (None, 0, 1000), [])],
        # parents supplied although the index stores none
        [('tip', 'fulltext,no-eol', (None, 0, 100), ['parent'])],
        # change options in the second record
        [('tip', 'fulltext,no-eol', (None, 0, 100), []),
         ('tip', 'no-eol,line-delta', (None, 0, 100), [])],
        ]
    for batch in conflicting_batches:
        self.assertRaises(errors.KnitCorrupt, knit_index.add_versions,
                          batch)
    # None of the refused batches may have reached the callback.
    self.assertEqual([], self.caught_entries)
2270
def test_iter_parents(self):
    """iter_parents pairs each stored version with an empty parents tuple."""
    knit_index = self.two_graph_index()
    # (requested versions, expected (version, parents) pairs)
    cases = [
        # the ghost is skipped, every stored version comes back parentless
        (['tip', 'tail', 'ghost', 'parent', 'separate'],
         [('tip', ()), ('tail', ()), ('parent', ()), ('separate', ())]),
        (['tip'], [('tip', ())]),
        ([], []),
        ]
    for requested, expected in cases:
        self.assertEqual(set(expected),
                         set(knit_index.iter_parents(requested)))