~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

MergeĀ knitsĀ branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
from cStringIO import StringIO
 
20
import difflib
 
21
import gzip
 
22
import sha
 
23
import sys
 
24
 
 
25
from bzrlib import (
 
26
    errors,
 
27
    generate_ids,
 
28
    knit,
 
29
    )
 
30
from bzrlib.errors import (
 
31
    RevisionAlreadyPresent,
 
32
    KnitHeaderError,
 
33
    RevisionNotPresent,
 
34
    NoSuchFile,
 
35
    )
 
36
from bzrlib.index import *
 
37
from bzrlib.knit import (
 
38
    KnitContent,
 
39
    KnitGraphIndex,
 
40
    KnitVersionedFile,
 
41
    KnitPlainFactory,
 
42
    KnitAnnotateFactory,
 
43
    _KnitData,
 
44
    _KnitIndex,
 
45
    WeaveToKnit,
 
46
    )
 
47
from bzrlib.osutils import split_lines
 
48
from bzrlib.tests import TestCase, TestCaseWithTransport, Feature
 
49
from bzrlib.transport import TransportLogger, get_transport
 
50
from bzrlib.transport.memory import MemoryTransport
 
51
from bzrlib.weave import Weave
 
52
 
 
53
 
 
54
class _CompiledKnitFeature(Feature):
 
55
 
 
56
    def _probe(self):
 
57
        try:
 
58
            import bzrlib._knit_load_data_c
 
59
        except ImportError:
 
60
            return False
 
61
        return True
 
62
 
 
63
    def feature_name(self):
 
64
        return 'bzrlib._knit_load_data_c'
 
65
 
 
66
CompiledKnitFeature = _CompiledKnitFeature()
 
67
 
 
68
 
 
69
class KnitContentTests(TestCase):
 
70
 
 
71
    def test_constructor(self):
 
72
        content = KnitContent([])
 
73
 
 
74
    def test_text(self):
 
75
        content = KnitContent([])
 
76
        self.assertEqual(content.text(), [])
 
77
 
 
78
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
79
        self.assertEqual(content.text(), ["text1", "text2"])
 
80
 
 
81
    def test_annotate(self):
 
82
        content = KnitContent([])
 
83
        self.assertEqual(content.annotate(), [])
 
84
 
 
85
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
86
        self.assertEqual(content.annotate(),
 
87
            [("origin1", "text1"), ("origin2", "text2")])
 
88
 
 
89
    def test_annotate_iter(self):
 
90
        content = KnitContent([])
 
91
        it = content.annotate_iter()
 
92
        self.assertRaises(StopIteration, it.next)
 
93
 
 
94
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
95
        it = content.annotate_iter()
 
96
        self.assertEqual(it.next(), ("origin1", "text1"))
 
97
        self.assertEqual(it.next(), ("origin2", "text2"))
 
98
        self.assertRaises(StopIteration, it.next)
 
99
 
 
100
    def test_copy(self):
 
101
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
102
        copy = content.copy()
 
103
        self.assertIsInstance(copy, KnitContent)
 
104
        self.assertEqual(copy.annotate(),
 
105
            [("origin1", "text1"), ("origin2", "text2")])
 
106
 
 
107
    def test_line_delta(self):
 
108
        content1 = KnitContent([("", "a"), ("", "b")])
 
109
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
110
        self.assertEqual(content1.line_delta(content2),
 
111
            [(1, 2, 2, [("", "a"), ("", "c")])])
 
112
 
 
113
    def test_line_delta_iter(self):
 
114
        content1 = KnitContent([("", "a"), ("", "b")])
 
115
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
116
        it = content1.line_delta_iter(content2)
 
117
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
 
118
        self.assertRaises(StopIteration, it.next)
 
119
 
 
120
 
 
121
class MockTransport(object):
 
122
 
 
123
    def __init__(self, file_lines=None):
 
124
        self.file_lines = file_lines
 
125
        self.calls = []
 
126
        # We have no base directory for the MockTransport
 
127
        self.base = ''
 
128
 
 
129
    def get(self, filename):
 
130
        if self.file_lines is None:
 
131
            raise NoSuchFile(filename)
 
132
        else:
 
133
            return StringIO("\n".join(self.file_lines))
 
134
 
 
135
    def readv(self, relpath, offsets):
 
136
        fp = self.get(relpath)
 
137
        for offset, size in offsets:
 
138
            fp.seek(offset)
 
139
            yield offset, fp.read(size)
 
140
 
 
141
    def __getattr__(self, name):
 
142
        def queue_call(*args, **kwargs):
 
143
            self.calls.append((name, args, kwargs))
 
144
        return queue_call
 
145
 
 
146
 
 
147
class LowLevelKnitDataTests(TestCase):
 
148
 
 
149
    def create_gz_content(self, text):
 
150
        sio = StringIO()
 
151
        gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
 
152
        gz_file.write(text)
 
153
        gz_file.close()
 
154
        return sio.getvalue()
 
155
 
 
156
    def test_valid_knit_data(self):
 
157
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
158
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
159
                                        'foo\n'
 
160
                                        'bar\n'
 
161
                                        'end rev-id-1\n'
 
162
                                        % (sha1sum,))
 
163
        transport = MockTransport([gz_txt])
 
164
        data = _KnitData(transport, 'filename', mode='r')
 
165
        records = [('rev-id-1', 0, len(gz_txt))]
 
166
 
 
167
        contents = data.read_records(records)
 
168
        self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
 
169
 
 
170
        raw_contents = list(data.read_records_iter_raw(records))
 
171
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
172
 
 
173
    def test_not_enough_lines(self):
 
174
        sha1sum = sha.new('foo\n').hexdigest()
 
175
        # record says 2 lines data says 1
 
176
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
177
                                        'foo\n'
 
178
                                        'end rev-id-1\n'
 
179
                                        % (sha1sum,))
 
180
        transport = MockTransport([gz_txt])
 
181
        data = _KnitData(transport, 'filename', mode='r')
 
182
        records = [('rev-id-1', 0, len(gz_txt))]
 
183
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
184
 
 
185
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
186
        raw_contents = list(data.read_records_iter_raw(records))
 
187
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
188
 
 
189
    def test_too_many_lines(self):
 
190
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
191
        # record says 1 lines data says 2
 
192
        gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
 
193
                                        'foo\n'
 
194
                                        'bar\n'
 
195
                                        'end rev-id-1\n'
 
196
                                        % (sha1sum,))
 
197
        transport = MockTransport([gz_txt])
 
198
        data = _KnitData(transport, 'filename', mode='r')
 
199
        records = [('rev-id-1', 0, len(gz_txt))]
 
200
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
201
 
 
202
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
203
        raw_contents = list(data.read_records_iter_raw(records))
 
204
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
205
 
 
206
    def test_mismatched_version_id(self):
 
207
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
208
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
209
                                        'foo\n'
 
210
                                        'bar\n'
 
211
                                        'end rev-id-1\n'
 
212
                                        % (sha1sum,))
 
213
        transport = MockTransport([gz_txt])
 
214
        data = _KnitData(transport, 'filename', mode='r')
 
215
        # We are asking for rev-id-2, but the data is rev-id-1
 
216
        records = [('rev-id-2', 0, len(gz_txt))]
 
217
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
218
 
 
219
        # read_records_iter_raw will notice if we request the wrong version.
 
220
        self.assertRaises(errors.KnitCorrupt, list,
 
221
                          data.read_records_iter_raw(records))
 
222
 
 
223
    def test_uncompressed_data(self):
 
224
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
225
        txt = ('version rev-id-1 2 %s\n'
 
226
               'foo\n'
 
227
               'bar\n'
 
228
               'end rev-id-1\n'
 
229
               % (sha1sum,))
 
230
        transport = MockTransport([txt])
 
231
        data = _KnitData(transport, 'filename', mode='r')
 
232
        records = [('rev-id-1', 0, len(txt))]
 
233
 
 
234
        # We don't have valid gzip data ==> corrupt
 
235
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
236
 
 
237
        # read_records_iter_raw will notice the bad data
 
238
        self.assertRaises(errors.KnitCorrupt, list,
 
239
                          data.read_records_iter_raw(records))
 
240
 
 
241
    def test_corrupted_data(self):
 
242
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
243
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
244
                                        'foo\n'
 
245
                                        'bar\n'
 
246
                                        'end rev-id-1\n'
 
247
                                        % (sha1sum,))
 
248
        # Change 2 bytes in the middle to \xff
 
249
        gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
 
250
        transport = MockTransport([gz_txt])
 
251
        data = _KnitData(transport, 'filename', mode='r')
 
252
        records = [('rev-id-1', 0, len(gz_txt))]
 
253
 
 
254
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
255
 
 
256
        # read_records_iter_raw will notice if we request the wrong version.
 
257
        self.assertRaises(errors.KnitCorrupt, list,
 
258
                          data.read_records_iter_raw(records))
 
259
 
 
260
 
 
261
class LowLevelKnitIndexTests(TestCase):
 
262
 
 
263
    def get_knit_index(self, *args, **kwargs):
 
264
        orig = knit._load_data
 
265
        def reset():
 
266
            knit._load_data = orig
 
267
        self.addCleanup(reset)
 
268
        from bzrlib._knit_load_data_py import _load_data_py
 
269
        knit._load_data = _load_data_py
 
270
        return _KnitIndex(*args, **kwargs)
 
271
 
 
272
    def test_no_such_file(self):
 
273
        transport = MockTransport()
 
274
 
 
275
        self.assertRaises(NoSuchFile, self.get_knit_index,
 
276
                          transport, "filename", "r")
 
277
        self.assertRaises(NoSuchFile, self.get_knit_index,
 
278
                          transport, "filename", "w", create=False)
 
279
 
 
280
    def test_create_file(self):
 
281
        transport = MockTransport()
 
282
 
 
283
        index = self.get_knit_index(transport, "filename", "w",
 
284
            file_mode="wb", create=True)
 
285
        self.assertEqual(
 
286
                ("put_bytes_non_atomic",
 
287
                    ("filename", index.HEADER), {"mode": "wb"}),
 
288
                transport.calls.pop(0))
 
289
 
 
290
    def test_delay_create_file(self):
 
291
        transport = MockTransport()
 
292
 
 
293
        index = self.get_knit_index(transport, "filename", "w",
 
294
            create=True, file_mode="wb", create_parent_dir=True,
 
295
            delay_create=True, dir_mode=0777)
 
296
        self.assertEqual([], transport.calls)
 
297
 
 
298
        index.add_versions([])
 
299
        name, (filename, f), kwargs = transport.calls.pop(0)
 
300
        self.assertEqual("put_file_non_atomic", name)
 
301
        self.assertEqual(
 
302
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
303
            kwargs)
 
304
        self.assertEqual("filename", filename)
 
305
        self.assertEqual(index.HEADER, f.read())
 
306
 
 
307
        index.add_versions([])
 
308
        self.assertEqual(("append_bytes", ("filename", ""), {}),
 
309
            transport.calls.pop(0))
 
310
 
 
311
    def test_read_utf8_version_id(self):
 
312
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
313
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
314
        transport = MockTransport([
 
315
            _KnitIndex.HEADER,
 
316
            '%s option 0 1 :' % (utf8_revision_id,)
 
317
            ])
 
318
        index = self.get_knit_index(transport, "filename", "r")
 
319
        # _KnitIndex is a private class, and deals in utf8 revision_ids, not
 
320
        # Unicode revision_ids.
 
321
        self.assertTrue(index.has_version(utf8_revision_id))
 
322
        self.assertFalse(index.has_version(unicode_revision_id))
 
323
 
 
324
    def test_read_utf8_parents(self):
 
325
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
326
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
327
        transport = MockTransport([
 
328
            _KnitIndex.HEADER,
 
329
            "version option 0 1 .%s :" % (utf8_revision_id,)
 
330
            ])
 
331
        index = self.get_knit_index(transport, "filename", "r")
 
332
        self.assertEqual([utf8_revision_id],
 
333
            index.get_parents_with_ghosts("version"))
 
334
 
 
335
    def test_read_ignore_corrupted_lines(self):
 
336
        transport = MockTransport([
 
337
            _KnitIndex.HEADER,
 
338
            "corrupted",
 
339
            "corrupted options 0 1 .b .c ",
 
340
            "version options 0 1 :"
 
341
            ])
 
342
        index = self.get_knit_index(transport, "filename", "r")
 
343
        self.assertEqual(1, index.num_versions())
 
344
        self.assertTrue(index.has_version("version"))
 
345
 
 
346
    def test_read_corrupted_header(self):
 
347
        transport = MockTransport(['not a bzr knit index header\n'])
 
348
        self.assertRaises(KnitHeaderError,
 
349
            self.get_knit_index, transport, "filename", "r")
 
350
 
 
351
    def test_read_duplicate_entries(self):
 
352
        transport = MockTransport([
 
353
            _KnitIndex.HEADER,
 
354
            "parent options 0 1 :",
 
355
            "version options1 0 1 0 :",
 
356
            "version options2 1 2 .other :",
 
357
            "version options3 3 4 0 .other :"
 
358
            ])
 
359
        index = self.get_knit_index(transport, "filename", "r")
 
360
        self.assertEqual(2, index.num_versions())
 
361
        # check that the index used is the first one written. (Specific
 
362
        # to KnitIndex style indices.
 
363
        self.assertEqual("1", index._version_list_to_index(["version"]))
 
364
        self.assertEqual((3, 4), index.get_position("version"))
 
365
        self.assertEqual(["options3"], index.get_options("version"))
 
366
        self.assertEqual(["parent", "other"],
 
367
            index.get_parents_with_ghosts("version"))
 
368
 
 
369
    def test_read_compressed_parents(self):
 
370
        transport = MockTransport([
 
371
            _KnitIndex.HEADER,
 
372
            "a option 0 1 :",
 
373
            "b option 0 1 0 :",
 
374
            "c option 0 1 1 0 :",
 
375
            ])
 
376
        index = self.get_knit_index(transport, "filename", "r")
 
377
        self.assertEqual(["a"], index.get_parents("b"))
 
378
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
379
 
 
380
    def test_write_utf8_version_id(self):
 
381
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
382
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
383
        transport = MockTransport([
 
384
            _KnitIndex.HEADER
 
385
            ])
 
386
        index = self.get_knit_index(transport, "filename", "r")
 
387
        index.add_version(utf8_revision_id, ["option"], 0, 1, [])
 
388
        self.assertEqual(("append_bytes", ("filename",
 
389
            "\n%s option 0 1  :" % (utf8_revision_id,)),
 
390
            {}),
 
391
            transport.calls.pop(0))
 
392
 
 
393
    def test_write_utf8_parents(self):
 
394
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
395
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
396
        transport = MockTransport([
 
397
            _KnitIndex.HEADER
 
398
            ])
 
399
        index = self.get_knit_index(transport, "filename", "r")
 
400
        index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
 
401
        self.assertEqual(("append_bytes", ("filename",
 
402
            "\nversion option 0 1 .%s :" % (utf8_revision_id,)),
 
403
            {}),
 
404
            transport.calls.pop(0))
 
405
 
 
406
    def test_get_graph(self):
 
407
        transport = MockTransport()
 
408
        index = self.get_knit_index(transport, "filename", "w", create=True)
 
409
        self.assertEqual([], index.get_graph())
 
410
 
 
411
        index.add_version("a", ["option"], 0, 1, ["b"])
 
412
        self.assertEqual([("a", ["b"])], index.get_graph())
 
413
 
 
414
        index.add_version("c", ["option"], 0, 1, ["d"])
 
415
        self.assertEqual([("a", ["b"]), ("c", ["d"])],
 
416
            sorted(index.get_graph()))
 
417
 
 
418
    def test_get_ancestry(self):
 
419
        transport = MockTransport([
 
420
            _KnitIndex.HEADER,
 
421
            "a option 0 1 :",
 
422
            "b option 0 1 0 .e :",
 
423
            "c option 0 1 1 0 :",
 
424
            "d option 0 1 2 .f :"
 
425
            ])
 
426
        index = self.get_knit_index(transport, "filename", "r")
 
427
 
 
428
        self.assertEqual([], index.get_ancestry([]))
 
429
        self.assertEqual(["a"], index.get_ancestry(["a"]))
 
430
        self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
 
431
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
 
432
        self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
 
433
        self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
 
434
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
 
435
 
 
436
        self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
 
437
 
 
438
    def test_get_ancestry_with_ghosts(self):
 
439
        transport = MockTransport([
 
440
            _KnitIndex.HEADER,
 
441
            "a option 0 1 :",
 
442
            "b option 0 1 0 .e :",
 
443
            "c option 0 1 0 .f .g :",
 
444
            "d option 0 1 2 .h .j .k :"
 
445
            ])
 
446
        index = self.get_knit_index(transport, "filename", "r")
 
447
 
 
448
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
449
        self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
 
450
        self.assertEqual(["a", "e", "b"],
 
451
            index.get_ancestry_with_ghosts(["b"]))
 
452
        self.assertEqual(["a", "g", "f", "c"],
 
453
            index.get_ancestry_with_ghosts(["c"]))
 
454
        self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
 
455
            index.get_ancestry_with_ghosts(["d"]))
 
456
        self.assertEqual(["a", "e", "b"],
 
457
            index.get_ancestry_with_ghosts(["a", "b"]))
 
458
        self.assertEqual(["a", "g", "f", "c"],
 
459
            index.get_ancestry_with_ghosts(["a", "c"]))
 
460
        self.assertEqual(
 
461
            ["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
 
462
            index.get_ancestry_with_ghosts(["b", "d"]))
 
463
 
 
464
        self.assertRaises(RevisionNotPresent,
 
465
            index.get_ancestry_with_ghosts, ["e"])
 
466
 
 
467
    def test_iter_parents(self):
 
468
        transport = MockTransport()
 
469
        index = self.get_knit_index(transport, "filename", "w", create=True)
 
470
        # no parents
 
471
        index.add_version('r0', ['option'], 0, 1, [])
 
472
        # 1 parent
 
473
        index.add_version('r1', ['option'], 0, 1, ['r0'])
 
474
        # 2 parents
 
475
        index.add_version('r2', ['option'], 0, 1, ['r1', 'r0'])
 
476
        # XXX TODO a ghost
 
477
        # cases: each sample data individually:
 
478
        self.assertEqual(set([('r0', ())]),
 
479
            set(index.iter_parents(['r0'])))
 
480
        self.assertEqual(set([('r1', ('r0', ))]),
 
481
            set(index.iter_parents(['r1'])))
 
482
        self.assertEqual(set([('r2', ('r1', 'r0'))]),
 
483
            set(index.iter_parents(['r2'])))
 
484
        # no nodes returned for a missing node
 
485
        self.assertEqual(set(),
 
486
            set(index.iter_parents(['missing'])))
 
487
        # 1 node returned with missing nodes skipped
 
488
        self.assertEqual(set([('r1', ('r0', ))]),
 
489
            set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
 
490
        # 2 nodes returned
 
491
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
492
            set(index.iter_parents(['r0', 'r1'])))
 
493
        # 2 nodes returned, missing skipped
 
494
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
495
            set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
 
496
 
 
497
    def test_num_versions(self):
 
498
        transport = MockTransport([
 
499
            _KnitIndex.HEADER
 
500
            ])
 
501
        index = self.get_knit_index(transport, "filename", "r")
 
502
 
 
503
        self.assertEqual(0, index.num_versions())
 
504
        self.assertEqual(0, len(index))
 
505
 
 
506
        index.add_version("a", ["option"], 0, 1, [])
 
507
        self.assertEqual(1, index.num_versions())
 
508
        self.assertEqual(1, len(index))
 
509
 
 
510
        index.add_version("a", ["option2"], 1, 2, [])
 
511
        self.assertEqual(1, index.num_versions())
 
512
        self.assertEqual(1, len(index))
 
513
 
 
514
        index.add_version("b", ["option"], 0, 1, [])
 
515
        self.assertEqual(2, index.num_versions())
 
516
        self.assertEqual(2, len(index))
 
517
 
 
518
    def test_get_versions(self):
 
519
        transport = MockTransport([
 
520
            _KnitIndex.HEADER
 
521
            ])
 
522
        index = self.get_knit_index(transport, "filename", "r")
 
523
 
 
524
        self.assertEqual([], index.get_versions())
 
525
 
 
526
        index.add_version("a", ["option"], 0, 1, [])
 
527
        self.assertEqual(["a"], index.get_versions())
 
528
 
 
529
        index.add_version("a", ["option"], 0, 1, [])
 
530
        self.assertEqual(["a"], index.get_versions())
 
531
 
 
532
        index.add_version("b", ["option"], 0, 1, [])
 
533
        self.assertEqual(["a", "b"], index.get_versions())
 
534
 
 
535
    def test_add_version(self):
 
536
        transport = MockTransport([
 
537
            _KnitIndex.HEADER
 
538
            ])
 
539
        index = self.get_knit_index(transport, "filename", "r")
 
540
 
 
541
        index.add_version("a", ["option"], 0, 1, ["b"])
 
542
        self.assertEqual(("append_bytes",
 
543
            ("filename", "\na option 0 1 .b :"),
 
544
            {}), transport.calls.pop(0))
 
545
        self.assertTrue(index.has_version("a"))
 
546
        self.assertEqual(1, index.num_versions())
 
547
        self.assertEqual((0, 1), index.get_position("a"))
 
548
        self.assertEqual(["option"], index.get_options("a"))
 
549
        self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
 
550
 
 
551
        index.add_version("a", ["opt"], 1, 2, ["c"])
 
552
        self.assertEqual(("append_bytes",
 
553
            ("filename", "\na opt 1 2 .c :"),
 
554
            {}), transport.calls.pop(0))
 
555
        self.assertTrue(index.has_version("a"))
 
556
        self.assertEqual(1, index.num_versions())
 
557
        self.assertEqual((1, 2), index.get_position("a"))
 
558
        self.assertEqual(["opt"], index.get_options("a"))
 
559
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
560
 
 
561
        index.add_version("b", ["option"], 2, 3, ["a"])
 
562
        self.assertEqual(("append_bytes",
 
563
            ("filename", "\nb option 2 3 0 :"),
 
564
            {}), transport.calls.pop(0))
 
565
        self.assertTrue(index.has_version("b"))
 
566
        self.assertEqual(2, index.num_versions())
 
567
        self.assertEqual((2, 3), index.get_position("b"))
 
568
        self.assertEqual(["option"], index.get_options("b"))
 
569
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
570
 
 
571
    def test_add_versions(self):
 
572
        transport = MockTransport([
 
573
            _KnitIndex.HEADER
 
574
            ])
 
575
        index = self.get_knit_index(transport, "filename", "r")
 
576
 
 
577
        index.add_versions([
 
578
            ("a", ["option"], 0, 1, ["b"]),
 
579
            ("a", ["opt"], 1, 2, ["c"]),
 
580
            ("b", ["option"], 2, 3, ["a"])
 
581
            ])
 
582
        self.assertEqual(("append_bytes", ("filename",
 
583
            "\na option 0 1 .b :"
 
584
            "\na opt 1 2 .c :"
 
585
            "\nb option 2 3 0 :"
 
586
            ), {}), transport.calls.pop(0))
 
587
        self.assertTrue(index.has_version("a"))
 
588
        self.assertTrue(index.has_version("b"))
 
589
        self.assertEqual(2, index.num_versions())
 
590
        self.assertEqual((1, 2), index.get_position("a"))
 
591
        self.assertEqual((2, 3), index.get_position("b"))
 
592
        self.assertEqual(["opt"], index.get_options("a"))
 
593
        self.assertEqual(["option"], index.get_options("b"))
 
594
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
595
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
596
 
 
597
    def test_delay_create_and_add_versions(self):
 
598
        transport = MockTransport()
 
599
 
 
600
        index = self.get_knit_index(transport, "filename", "w",
 
601
            create=True, file_mode="wb", create_parent_dir=True,
 
602
            delay_create=True, dir_mode=0777)
 
603
        self.assertEqual([], transport.calls)
 
604
 
 
605
        index.add_versions([
 
606
            ("a", ["option"], 0, 1, ["b"]),
 
607
            ("a", ["opt"], 1, 2, ["c"]),
 
608
            ("b", ["option"], 2, 3, ["a"])
 
609
            ])
 
610
        name, (filename, f), kwargs = transport.calls.pop(0)
 
611
        self.assertEqual("put_file_non_atomic", name)
 
612
        self.assertEqual(
 
613
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
614
            kwargs)
 
615
        self.assertEqual("filename", filename)
 
616
        self.assertEqual(
 
617
            index.HEADER +
 
618
            "\na option 0 1 .b :"
 
619
            "\na opt 1 2 .c :"
 
620
            "\nb option 2 3 0 :",
 
621
            f.read())
 
622
 
 
623
    def test_has_version(self):
 
624
        transport = MockTransport([
 
625
            _KnitIndex.HEADER,
 
626
            "a option 0 1 :"
 
627
            ])
 
628
        index = self.get_knit_index(transport, "filename", "r")
 
629
 
 
630
        self.assertTrue(index.has_version("a"))
 
631
        self.assertFalse(index.has_version("b"))
 
632
 
 
633
    def test_get_position(self):
 
634
        transport = MockTransport([
 
635
            _KnitIndex.HEADER,
 
636
            "a option 0 1 :",
 
637
            "b option 1 2 :"
 
638
            ])
 
639
        index = self.get_knit_index(transport, "filename", "r")
 
640
 
 
641
        self.assertEqual((0, 1), index.get_position("a"))
 
642
        self.assertEqual((1, 2), index.get_position("b"))
 
643
 
 
644
    def test_get_method(self):
 
645
        transport = MockTransport([
 
646
            _KnitIndex.HEADER,
 
647
            "a fulltext,unknown 0 1 :",
 
648
            "b unknown,line-delta 1 2 :",
 
649
            "c bad 3 4 :"
 
650
            ])
 
651
        index = self.get_knit_index(transport, "filename", "r")
 
652
 
 
653
        self.assertEqual("fulltext", index.get_method("a"))
 
654
        self.assertEqual("line-delta", index.get_method("b"))
 
655
        self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
 
656
 
 
657
    def test_get_options(self):
 
658
        transport = MockTransport([
 
659
            _KnitIndex.HEADER,
 
660
            "a opt1 0 1 :",
 
661
            "b opt2,opt3 1 2 :"
 
662
            ])
 
663
        index = self.get_knit_index(transport, "filename", "r")
 
664
 
 
665
        self.assertEqual(["opt1"], index.get_options("a"))
 
666
        self.assertEqual(["opt2", "opt3"], index.get_options("b"))
 
667
 
 
668
    def test_get_parents(self):
 
669
        transport = MockTransport([
 
670
            _KnitIndex.HEADER,
 
671
            "a option 0 1 :",
 
672
            "b option 1 2 0 .c :",
 
673
            "c option 1 2 1 0 .e :"
 
674
            ])
 
675
        index = self.get_knit_index(transport, "filename", "r")
 
676
 
 
677
        self.assertEqual([], index.get_parents("a"))
 
678
        self.assertEqual(["a", "c"], index.get_parents("b"))
 
679
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
680
 
 
681
    def test_get_parents_with_ghosts(self):
 
682
        transport = MockTransport([
 
683
            _KnitIndex.HEADER,
 
684
            "a option 0 1 :",
 
685
            "b option 1 2 0 .c :",
 
686
            "c option 1 2 1 0 .e :"
 
687
            ])
 
688
        index = self.get_knit_index(transport, "filename", "r")
 
689
 
 
690
        self.assertEqual([], index.get_parents_with_ghosts("a"))
 
691
        self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
 
692
        self.assertEqual(["b", "a", "e"],
 
693
            index.get_parents_with_ghosts("c"))
 
694
 
 
695
    def test_check_versions_present(self):
 
696
        transport = MockTransport([
 
697
            _KnitIndex.HEADER,
 
698
            "a option 0 1 :",
 
699
            "b option 0 1 :"
 
700
            ])
 
701
        index = self.get_knit_index(transport, "filename", "r")
 
702
 
 
703
        check = index.check_versions_present
 
704
 
 
705
        check([])
 
706
        check(["a"])
 
707
        check(["b"])
 
708
        check(["a", "b"])
 
709
        self.assertRaises(RevisionNotPresent, check, ["c"])
 
710
        self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
 
711
 
 
712
    def test_impossible_parent(self):
 
713
        """Test we get KnitCorrupt if the parent couldn't possibly exist."""
 
714
        transport = MockTransport([
 
715
            _KnitIndex.HEADER,
 
716
            "a option 0 1 :",
 
717
            "b option 0 1 4 :"  # We don't have a 4th record
 
718
            ])
 
719
        try:
 
720
            self.assertRaises(errors.KnitCorrupt,
 
721
                              self.get_knit_index, transport, 'filename', 'r')
 
722
        except TypeError, e:
 
723
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
724
                           ' not exceptions.IndexError')
 
725
                and sys.version_info[0:2] >= (2,5)):
 
726
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
727
                                  ' raising new style exceptions with python'
 
728
                                  ' >=2.5')
 
729
            else:
 
730
                raise
 
731
 
 
732
    def test_corrupted_parent(self):
 
733
        transport = MockTransport([
 
734
            _KnitIndex.HEADER,
 
735
            "a option 0 1 :",
 
736
            "b option 0 1 :",
 
737
            "c option 0 1 1v :", # Can't have a parent of '1v'
 
738
            ])
 
739
        try:
 
740
            self.assertRaises(errors.KnitCorrupt,
 
741
                              self.get_knit_index, transport, 'filename', 'r')
 
742
        except TypeError, e:
 
743
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
744
                           ' not exceptions.ValueError')
 
745
                and sys.version_info[0:2] >= (2,5)):
 
746
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
747
                                  ' raising new style exceptions with python'
 
748
                                  ' >=2.5')
 
749
            else:
 
750
                raise
 
751
 
 
752
    def test_corrupted_parent_in_list(self):
 
753
        transport = MockTransport([
 
754
            _KnitIndex.HEADER,
 
755
            "a option 0 1 :",
 
756
            "b option 0 1 :",
 
757
            "c option 0 1 1 v :", # Can't have a parent of 'v'
 
758
            ])
 
759
        try:
 
760
            self.assertRaises(errors.KnitCorrupt,
 
761
                              self.get_knit_index, transport, 'filename', 'r')
 
762
        except TypeError, e:
 
763
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
764
                           ' not exceptions.ValueError')
 
765
                and sys.version_info[0:2] >= (2,5)):
 
766
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
767
                                  ' raising new style exceptions with python'
 
768
                                  ' >=2.5')
 
769
            else:
 
770
                raise
 
771
 
 
772
    def test_invalid_position(self):
 
773
        transport = MockTransport([
 
774
            _KnitIndex.HEADER,
 
775
            "a option 1v 1 :",
 
776
            ])
 
777
        try:
 
778
            self.assertRaises(errors.KnitCorrupt,
 
779
                              self.get_knit_index, transport, 'filename', 'r')
 
780
        except TypeError, e:
 
781
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
782
                           ' not exceptions.ValueError')
 
783
                and sys.version_info[0:2] >= (2,5)):
 
784
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
785
                                  ' raising new style exceptions with python'
 
786
                                  ' >=2.5')
 
787
            else:
 
788
                raise
 
789
 
 
790
    def test_invalid_size(self):
 
791
        transport = MockTransport([
 
792
            _KnitIndex.HEADER,
 
793
            "a option 1 1v :",
 
794
            ])
 
795
        try:
 
796
            self.assertRaises(errors.KnitCorrupt,
 
797
                              self.get_knit_index, transport, 'filename', 'r')
 
798
        except TypeError, e:
 
799
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
800
                           ' not exceptions.ValueError')
 
801
                and sys.version_info[0:2] >= (2,5)):
 
802
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
803
                                  ' raising new style exceptions with python'
 
804
                                  ' >=2.5')
 
805
            else:
 
806
                raise
 
807
 
 
808
    def test_short_line(self):
 
809
        transport = MockTransport([
 
810
            _KnitIndex.HEADER,
 
811
            "a option 0 10  :",
 
812
            "b option 10 10 0", # This line isn't terminated, ignored
 
813
            ])
 
814
        index = self.get_knit_index(transport, "filename", "r")
 
815
        self.assertEqual(['a'], index.get_versions())
 
816
 
 
817
    def test_skip_incomplete_record(self):
 
818
        # A line with bogus data should just be skipped
 
819
        transport = MockTransport([
 
820
            _KnitIndex.HEADER,
 
821
            "a option 0 10  :",
 
822
            "b option 10 10 0", # This line isn't terminated, ignored
 
823
            "c option 20 10 0 :", # Properly terminated, and starts with '\n'
 
824
            ])
 
825
        index = self.get_knit_index(transport, "filename", "r")
 
826
        self.assertEqual(['a', 'c'], index.get_versions())
 
827
 
 
828
    def test_trailing_characters(self):
 
829
        # A line with bogus data should just be skipped
 
830
        transport = MockTransport([
 
831
            _KnitIndex.HEADER,
 
832
            "a option 0 10  :",
 
833
            "b option 10 10 0 :a", # This line has extra trailing characters
 
834
            "c option 20 10 0 :", # Properly terminated, and starts with '\n'
 
835
            ])
 
836
        index = self.get_knit_index(transport, "filename", "r")
 
837
        self.assertEqual(['a', 'c'], index.get_versions())
 
838
 
 
839
 
 
840
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
 
841
 
 
842
    _test_needs_features = [CompiledKnitFeature]
 
843
 
 
844
    def get_knit_index(self, *args, **kwargs):
 
845
        orig = knit._load_data
 
846
        def reset():
 
847
            knit._load_data = orig
 
848
        self.addCleanup(reset)
 
849
        from bzrlib._knit_load_data_c import _load_data_c
 
850
        knit._load_data = _load_data_c
 
851
        return _KnitIndex(*args, **kwargs)
 
852
 
 
853
 
 
854
 
 
855
class KnitTests(TestCaseWithTransport):
 
856
    """Class containing knit test helper routines."""
 
857
 
 
858
    def make_test_knit(self, annotate=False, delay_create=False, index=None):
 
859
        if not annotate:
 
860
            factory = KnitPlainFactory()
 
861
        else:
 
862
            factory = None
 
863
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
 
864
                                 factory=factory, create=True,
 
865
                                 delay_create=delay_create, index=index)
 
866
 
 
867
 
 
868
class BasicKnitTests(KnitTests):
 
869
 
 
870
    def add_stock_one_and_one_a(self, k):
 
871
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
872
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
873
 
 
874
    def test_knit_constructor(self):
 
875
        """Construct empty k"""
 
876
        self.make_test_knit()
 
877
 
 
878
    def test_make_explicit_index(self):
 
879
        """We can supply an index to use."""
 
880
        knit = KnitVersionedFile('test', get_transport('.'),
 
881
            index='strangelove')
 
882
        self.assertEqual(knit._index, 'strangelove')
 
883
 
 
884
    def test_knit_add(self):
 
885
        """Store one text in knit and retrieve"""
 
886
        k = self.make_test_knit()
 
887
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
888
        self.assertTrue(k.has_version('text-1'))
 
889
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
890
 
 
891
    def test_knit_reload(self):
 
892
        # test that the content in a reloaded knit is correct
 
893
        k = self.make_test_knit()
 
894
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
895
        del k
 
896
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
897
        self.assertTrue(k2.has_version('text-1'))
 
898
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
899
 
 
900
    def test_knit_several(self):
 
901
        """Store several texts in a knit"""
 
902
        k = self.make_test_knit()
 
903
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
904
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
905
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
906
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
907
        
 
908
    def test_repeated_add(self):
 
909
        """Knit traps attempt to replace existing version"""
 
910
        k = self.make_test_knit()
 
911
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
912
        self.assertRaises(RevisionAlreadyPresent, 
 
913
                k.add_lines,
 
914
                'text-1', [], split_lines(TEXT_1))
 
915
 
 
916
    def test_empty(self):
 
917
        k = self.make_test_knit(True)
 
918
        k.add_lines('text-1', [], [])
 
919
        self.assertEquals(k.get_lines('text-1'), [])
 
920
 
 
921
    def test_incomplete(self):
 
922
        """Test if texts without a ending line-end can be inserted and
 
923
        extracted."""
 
924
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
925
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
926
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
927
        # reopening ensures maximum room for confusion
 
928
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
929
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
930
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
931
 
 
932
    def test_delta(self):
 
933
        """Expression of knit delta as lines"""
 
934
        k = self.make_test_knit()
 
935
        td = list(line_delta(TEXT_1.splitlines(True),
 
936
                             TEXT_1A.splitlines(True)))
 
937
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
938
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
939
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
940
 
 
941
    def test_add_with_parents(self):
 
942
        """Store in knit with parents"""
 
943
        k = self.make_test_knit()
 
944
        self.add_stock_one_and_one_a(k)
 
945
        self.assertEquals(k.get_parents('text-1'), [])
 
946
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
947
 
 
948
    def test_ancestry(self):
 
949
        """Store in knit with parents"""
 
950
        k = self.make_test_knit()
 
951
        self.add_stock_one_and_one_a(k)
 
952
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
953
 
 
954
    def test_add_delta(self):
 
955
        """Store in knit with parents"""
 
956
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
957
            delta=True, create=True)
 
958
        self.add_stock_one_and_one_a(k)
 
959
        k.clear_cache()
 
960
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
961
 
 
962
    def test_add_delta_knit_graph_index(self):
 
963
        """Does adding work with a KnitGraphIndex."""
 
964
        index = InMemoryGraphIndex(2)
 
965
        knit_index = KnitGraphIndex(index, add_callback=index.add_nodes,
 
966
            deltas=True)
 
967
        k = KnitVersionedFile('test', get_transport('.'),
 
968
            delta=True, create=True, index=knit_index)
 
969
        self.add_stock_one_and_one_a(k)
 
970
        k.clear_cache()
 
971
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
972
        # check the index had the right data added.
 
973
        self.assertEqual(set([
 
974
            ('text-1', ' 0 127', ((), ())),
 
975
            ('text-1a', ' 127 140', (('text-1',), ('text-1',))),
 
976
            ]), set(index.iter_all_entries()))
 
977
        # we should not have a .kndx file
 
978
        self.assertFalse(get_transport('.').has('test.kndx'))
 
979
 
 
980
    def test_annotate(self):
 
981
        """Annotations"""
 
982
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
983
            delta=True, create=True)
 
984
        self.insert_and_test_small_annotate(k)
 
985
 
 
986
    def insert_and_test_small_annotate(self, k):
 
987
        """test annotation with k works correctly."""
 
988
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
989
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
990
 
 
991
        origins = k.annotate('text-2')
 
992
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
993
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
994
 
 
995
    def test_annotate_fulltext(self):
 
996
        """Annotations"""
 
997
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
998
            delta=False, create=True)
 
999
        self.insert_and_test_small_annotate(k)
 
1000
 
 
1001
    def test_annotate_merge_1(self):
 
1002
        k = self.make_test_knit(True)
 
1003
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
1004
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
1005
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
1006
        origins = k.annotate('text-am')
 
1007
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
1008
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
1009
 
 
1010
    def test_annotate_merge_2(self):
 
1011
        k = self.make_test_knit(True)
 
1012
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1013
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1014
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
1015
        origins = k.annotate('text-am')
 
1016
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1017
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1018
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
1019
 
 
1020
    def test_annotate_merge_9(self):
 
1021
        k = self.make_test_knit(True)
 
1022
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1023
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1024
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
1025
        origins = k.annotate('text-am')
 
1026
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
1027
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1028
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
1029
 
 
1030
    def test_annotate_merge_3(self):
 
1031
        k = self.make_test_knit(True)
 
1032
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1033
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
1034
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
1035
        origins = k.annotate('text-am')
 
1036
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
1037
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1038
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
1039
 
 
1040
    def test_annotate_merge_4(self):
 
1041
        k = self.make_test_knit(True)
 
1042
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1043
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1044
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
1045
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
1046
        origins = k.annotate('text-am')
 
1047
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1048
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
1049
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
1050
 
 
1051
    def test_annotate_merge_5(self):
 
1052
        k = self.make_test_knit(True)
 
1053
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1054
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
1055
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
1056
        k.add_lines('text-am',
 
1057
                    ['text-a1', 'text-a2', 'text-a3'],
 
1058
                    ['a\n', 'e\n', 'z\n'])
 
1059
        origins = k.annotate('text-am')
 
1060
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1061
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
1062
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
1063
 
 
1064
    def test_annotate_file_cherry_pick(self):
 
1065
        k = self.make_test_knit(True)
 
1066
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
1067
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
1068
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
1069
        origins = k.annotate('text-3')
 
1070
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
1071
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
1072
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
1073
 
 
1074
    def test_knit_join(self):
 
1075
        """Store in knit with parents"""
 
1076
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
1077
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
1078
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
1079
 
 
1080
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
1081
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
1082
 
 
1083
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
1084
 
 
1085
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
1086
        count = k2.join(k1, version_ids=['text-m'])
 
1087
        self.assertEquals(count, 5)
 
1088
        self.assertTrue(k2.has_version('text-a'))
 
1089
        self.assertTrue(k2.has_version('text-c'))
 
1090
 
 
1091
    def test_reannotate(self):
 
1092
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
1093
                               factory=KnitAnnotateFactory(), create=True)
 
1094
        # 0
 
1095
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
1096
        # 1
 
1097
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
1098
 
 
1099
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
1100
                               factory=KnitAnnotateFactory(), create=True)
 
1101
        k2.join(k1, version_ids=['text-b'])
 
1102
 
 
1103
        # 2
 
1104
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
1105
        # 2
 
1106
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
1107
        # 3
 
1108
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
1109
 
 
1110
        # test-c will have index 3
 
1111
        k1.join(k2, version_ids=['text-c'])
 
1112
 
 
1113
        lines = k1.get_lines('text-c')
 
1114
        self.assertEquals(lines, ['z\n', 'c\n'])
 
1115
 
 
1116
        origins = k1.annotate('text-c')
 
1117
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
1118
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
1119
 
 
1120
    def test_get_line_delta_texts(self):
 
1121
        """Make sure we can call get_texts on text with reused line deltas"""
 
1122
        k1 = KnitVersionedFile('test1', get_transport('.'), 
 
1123
                               factory=KnitPlainFactory(), create=True)
 
1124
        for t in range(3):
 
1125
            if t == 0:
 
1126
                parents = []
 
1127
            else:
 
1128
                parents = ['%d' % (t-1)]
 
1129
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
 
1130
        k1.get_texts(('%d' % t) for t in range(3))
 
1131
        
 
1132
    def test_iter_lines_reads_in_order(self):
 
1133
        t = MemoryTransport()
 
1134
        instrumented_t = TransportLogger(t)
 
1135
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
1136
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
1137
        # add texts with no required ordering
 
1138
        k1.add_lines('base', [], ['text\n'])
 
1139
        k1.add_lines('base2', [], ['text2\n'])
 
1140
        k1.clear_cache()
 
1141
        instrumented_t._calls = []
 
1142
        # request a last-first iteration
 
1143
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
1144
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
1145
        self.assertEqual(['text\n', 'text2\n'], results)
 
1146
 
 
1147
    def test_create_empty_annotated(self):
 
1148
        k1 = self.make_test_knit(True)
 
1149
        # 0
 
1150
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
1151
        k2 = k1.create_empty('t', MemoryTransport())
 
1152
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
1153
        self.assertEqual(k1.delta, k2.delta)
 
1154
        # the generic test checks for empty content and file class
 
1155
 
 
1156
    def test_knit_format(self):
 
1157
        # this tests that a new knit index file has the expected content
 
1158
        # and that is writes the data we expect as records are added.
 
1159
        knit = self.make_test_knit(True)
 
1160
        # Now knit files are not created until we first add data to them
 
1161
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
 
1162
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
1163
        self.assertFileEqual(
 
1164
            "# bzr knit index 8\n"
 
1165
            "\n"
 
1166
            "revid fulltext 0 84 .a_ghost :",
 
1167
            'test.kndx')
 
1168
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
 
1169
        self.assertFileEqual(
 
1170
            "# bzr knit index 8\n"
 
1171
            "\nrevid fulltext 0 84 .a_ghost :"
 
1172
            "\nrevid2 line-delta 84 82 0 :",
 
1173
            'test.kndx')
 
1174
        # we should be able to load this file again
 
1175
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1176
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
1177
        # write a short write to the file and ensure that its ignored
 
1178
        indexfile = file('test.kndx', 'ab')
 
1179
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
 
1180
        indexfile.close()
 
1181
        # we should be able to load this file again
 
1182
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
1183
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
1184
        # and add a revision with the same id the failed write had
 
1185
        knit.add_lines('revid3', ['revid2'], ['a\n'])
 
1186
        # and when reading it revid3 should now appear.
 
1187
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1188
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
 
1189
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
 
1190
 
 
1191
    def test_delay_create(self):
 
1192
        """Test that passing delay_create=True creates files late"""
 
1193
        knit = self.make_test_knit(annotate=True, delay_create=True)
 
1194
        self.failIfExists('test.knit')
 
1195
        self.failIfExists('test.kndx')
 
1196
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
1197
        self.failUnlessExists('test.knit')
 
1198
        self.assertFileEqual(
 
1199
            "# bzr knit index 8\n"
 
1200
            "\n"
 
1201
            "revid fulltext 0 84 .a_ghost :",
 
1202
            'test.kndx')
 
1203
 
 
1204
    def test_create_parent_dir(self):
 
1205
        """create_parent_dir can create knits in nonexistant dirs"""
 
1206
        # Has no effect if we don't set 'delay_create'
 
1207
        trans = get_transport('.')
 
1208
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
 
1209
                          trans, access_mode='w', factory=None,
 
1210
                          create=True, create_parent_dir=True)
 
1211
        # Nothing should have changed yet
 
1212
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1213
                                 factory=None, create=True,
 
1214
                                 create_parent_dir=True,
 
1215
                                 delay_create=True)
 
1216
        self.failIfExists('dir/test.knit')
 
1217
        self.failIfExists('dir/test.kndx')
 
1218
        self.failIfExists('dir')
 
1219
        knit.add_lines('revid', [], ['a\n'])
 
1220
        self.failUnlessExists('dir')
 
1221
        self.failUnlessExists('dir/test.knit')
 
1222
        self.assertFileEqual(
 
1223
            "# bzr knit index 8\n"
 
1224
            "\n"
 
1225
            "revid fulltext 0 84  :",
 
1226
            'dir/test.kndx')
 
1227
 
 
1228
    def test_create_mode_700(self):
 
1229
        trans = get_transport('.')
 
1230
        if not trans._can_roundtrip_unix_modebits():
 
1231
            # Can't roundtrip, so no need to run this test
 
1232
            return
 
1233
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1234
                                 factory=None, create=True,
 
1235
                                 create_parent_dir=True,
 
1236
                                 delay_create=True,
 
1237
                                 file_mode=0600,
 
1238
                                 dir_mode=0700)
 
1239
        knit.add_lines('revid', [], ['a\n'])
 
1240
        self.assertTransportMode(trans, 'dir', 0700)
 
1241
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
 
1242
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
 
1243
 
 
1244
    def test_create_mode_770(self):
 
1245
        trans = get_transport('.')
 
1246
        if not trans._can_roundtrip_unix_modebits():
 
1247
            # Can't roundtrip, so no need to run this test
 
1248
            return
 
1249
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1250
                                 factory=None, create=True,
 
1251
                                 create_parent_dir=True,
 
1252
                                 delay_create=True,
 
1253
                                 file_mode=0660,
 
1254
                                 dir_mode=0770)
 
1255
        knit.add_lines('revid', [], ['a\n'])
 
1256
        self.assertTransportMode(trans, 'dir', 0770)
 
1257
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
 
1258
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
 
1259
 
 
1260
    def test_create_mode_777(self):
 
1261
        trans = get_transport('.')
 
1262
        if not trans._can_roundtrip_unix_modebits():
 
1263
            # Can't roundtrip, so no need to run this test
 
1264
            return
 
1265
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1266
                                 factory=None, create=True,
 
1267
                                 create_parent_dir=True,
 
1268
                                 delay_create=True,
 
1269
                                 file_mode=0666,
 
1270
                                 dir_mode=0777)
 
1271
        knit.add_lines('revid', [], ['a\n'])
 
1272
        self.assertTransportMode(trans, 'dir', 0777)
 
1273
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
 
1274
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
 
1275
 
 
1276
    def test_plan_merge(self):
 
1277
        my_knit = self.make_test_knit(annotate=True)
 
1278
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
 
1279
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
 
1280
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
 
1281
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
 
1282
        for plan_line, expected_line in zip(plan, AB_MERGE):
 
1283
            self.assertEqual(plan_line, expected_line)
 
1284
 
 
1285
 
 
1286
TEXT_1 = """\
 
1287
Banana cup cakes:
 
1288
 
 
1289
- bananas
 
1290
- eggs
 
1291
- broken tea cups
 
1292
"""
 
1293
 
 
1294
TEXT_1A = """\
 
1295
Banana cup cake recipe
 
1296
(serves 6)
 
1297
 
 
1298
- bananas
 
1299
- eggs
 
1300
- broken tea cups
 
1301
- self-raising flour
 
1302
"""
 
1303
 
 
1304
TEXT_1B = """\
 
1305
Banana cup cake recipe
 
1306
 
 
1307
- bananas (do not use plantains!!!)
 
1308
- broken tea cups
 
1309
- flour
 
1310
"""
 
1311
 
 
1312
delta_1_1a = """\
 
1313
0,1,2
 
1314
Banana cup cake recipe
 
1315
(serves 6)
 
1316
5,5,1
 
1317
- self-raising flour
 
1318
"""
 
1319
 
 
1320
TEXT_2 = """\
 
1321
Boeuf bourguignon
 
1322
 
 
1323
- beef
 
1324
- red wine
 
1325
- small onions
 
1326
- carrot
 
1327
- mushrooms
 
1328
"""
 
1329
 
 
1330
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
 
1331
new-a|(serves 6)
 
1332
unchanged|
 
1333
killed-b|- bananas
 
1334
killed-b|- eggs
 
1335
new-b|- bananas (do not use plantains!!!)
 
1336
unchanged|- broken tea cups
 
1337
new-a|- self-raising flour
 
1338
new-b|- flour
 
1339
"""
 
1340
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
 
1341
 
 
1342
 
 
1343
def line_delta(from_lines, to_lines):
 
1344
    """Generate line-based delta from one text to another"""
 
1345
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
1346
    for op in s.get_opcodes():
 
1347
        if op[0] == 'equal':
 
1348
            continue
 
1349
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
1350
        for i in range(op[3], op[4]):
 
1351
            yield to_lines[i]
 
1352
 
 
1353
 
 
1354
def apply_line_delta(basis_lines, delta_lines):
 
1355
    """Apply a line-based perfect diff
 
1356
    
 
1357
    basis_lines -- text to apply the patch to
 
1358
    delta_lines -- diff instructions and content
 
1359
    """
 
1360
    out = basis_lines[:]
 
1361
    i = 0
 
1362
    offset = 0
 
1363
    while i < len(delta_lines):
 
1364
        l = delta_lines[i]
 
1365
        a, b, c = map(long, l.split(','))
 
1366
        i = i + 1
 
1367
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
1368
        i = i + c
 
1369
        offset = offset + (b - a) + c
 
1370
    return out
 
1371
 
 
1372
 
 
1373
class TestWeaveToKnit(KnitTests):
 
1374
 
 
1375
    def test_weave_to_knit_matches(self):
 
1376
        # check that the WeaveToKnit is_compatible function
 
1377
        # registers True for a Weave to a Knit.
 
1378
        w = Weave()
 
1379
        k = self.make_test_knit()
 
1380
        self.failUnless(WeaveToKnit.is_compatible(w, k))
 
1381
        self.failIf(WeaveToKnit.is_compatible(k, w))
 
1382
        self.failIf(WeaveToKnit.is_compatible(w, w))
 
1383
        self.failIf(WeaveToKnit.is_compatible(k, k))
 
1384
 
 
1385
 
 
1386
class TestKnitCaching(KnitTests):
 
1387
    
 
1388
    def create_knit(self, cache_add=False):
 
1389
        k = self.make_test_knit(True)
 
1390
        if cache_add:
 
1391
            k.enable_cache()
 
1392
 
 
1393
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
1394
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
1395
        return k
 
1396
 
 
1397
    def test_no_caching(self):
 
1398
        k = self.create_knit()
 
1399
        # Nothing should be cached without setting 'enable_cache'
 
1400
        self.assertEqual({}, k._data._cache)
 
1401
 
 
1402
    def test_cache_add_and_clear(self):
 
1403
        k = self.create_knit(True)
 
1404
 
 
1405
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
 
1406
 
 
1407
        k.clear_cache()
 
1408
        self.assertEqual({}, k._data._cache)
 
1409
 
 
1410
    def test_cache_data_read_raw(self):
 
1411
        k = self.create_knit()
 
1412
 
 
1413
        # Now cache and read
 
1414
        k.enable_cache()
 
1415
 
 
1416
        def read_one_raw(version):
 
1417
            pos_map = k._get_components_positions([version])
 
1418
            method, pos, size, next = pos_map[version]
 
1419
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
 
1420
            self.assertEqual(1, len(lst))
 
1421
            return lst[0]
 
1422
 
 
1423
        val = read_one_raw('text-1')
 
1424
        self.assertEqual({'text-1':val[1]}, k._data._cache)
 
1425
 
 
1426
        k.clear_cache()
 
1427
        # After clear, new reads are not cached
 
1428
        self.assertEqual({}, k._data._cache)
 
1429
 
 
1430
        val2 = read_one_raw('text-1')
 
1431
        self.assertEqual(val, val2)
 
1432
        self.assertEqual({}, k._data._cache)
 
1433
 
 
1434
    def test_cache_data_read(self):
 
1435
        k = self.create_knit()
 
1436
 
 
1437
        def read_one(version):
 
1438
            pos_map = k._get_components_positions([version])
 
1439
            method, pos, size, next = pos_map[version]
 
1440
            lst = list(k._data.read_records_iter([(version, pos, size)]))
 
1441
            self.assertEqual(1, len(lst))
 
1442
            return lst[0]
 
1443
 
 
1444
        # Now cache and read
 
1445
        k.enable_cache()
 
1446
 
 
1447
        val = read_one('text-2')
 
1448
        self.assertEqual(['text-2'], k._data._cache.keys())
 
1449
        self.assertEqual('text-2', val[0])
 
1450
        content, digest = k._data._parse_record('text-2',
 
1451
                                                k._data._cache['text-2'])
 
1452
        self.assertEqual(content, val[1])
 
1453
        self.assertEqual(digest, val[2])
 
1454
 
 
1455
        k.clear_cache()
 
1456
        self.assertEqual({}, k._data._cache)
 
1457
 
 
1458
        val2 = read_one('text-2')
 
1459
        self.assertEqual(val, val2)
 
1460
        self.assertEqual({}, k._data._cache)
 
1461
 
 
1462
    def test_cache_read(self):
 
1463
        k = self.create_knit()
 
1464
        k.enable_cache()
 
1465
 
 
1466
        text = k.get_text('text-1')
 
1467
        self.assertEqual(TEXT_1, text)
 
1468
        self.assertEqual(['text-1'], k._data._cache.keys())
 
1469
 
 
1470
        k.clear_cache()
 
1471
        self.assertEqual({}, k._data._cache)
 
1472
 
 
1473
        text = k.get_text('text-1')
 
1474
        self.assertEqual(TEXT_1, text)
 
1475
        self.assertEqual({}, k._data._cache)
 
1476
 
 
1477
 
 
1478
class TestKnitIndex(KnitTests):
 
1479
 
 
1480
    def test_add_versions_dictionary_compresses(self):
 
1481
        """Adding versions to the index should update the lookup dict"""
 
1482
        knit = self.make_test_knit()
 
1483
        idx = knit._index
 
1484
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1485
        self.check_file_contents('test.kndx',
 
1486
            '# bzr knit index 8\n'
 
1487
            '\n'
 
1488
            'a-1 fulltext 0 0  :'
 
1489
            )
 
1490
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
 
1491
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
 
1492
                         ])
 
1493
        self.check_file_contents('test.kndx',
 
1494
            '# bzr knit index 8\n'
 
1495
            '\n'
 
1496
            'a-1 fulltext 0 0  :\n'
 
1497
            'a-2 fulltext 0 0 0 :\n'
 
1498
            'a-3 fulltext 0 0 1 :'
 
1499
            )
 
1500
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
 
1501
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
 
1502
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
 
1503
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
 
1504
                         }, idx._cache)
 
1505
 
 
1506
    def test_add_versions_fails_clean(self):
 
1507
        """If add_versions fails in the middle, it restores a pristine state.
 
1508
 
 
1509
        Any modifications that are made to the index are reset if all versions
 
1510
        cannot be added.
 
1511
        """
 
1512
        # This cheats a little bit by passing in a generator which will
 
1513
        # raise an exception before the processing finishes
 
1514
        # Other possibilities would be to have an version with the wrong number
 
1515
        # of entries, or to make the backing transport unable to write any
 
1516
        # files.
 
1517
 
 
1518
        knit = self.make_test_knit()
 
1519
        idx = knit._index
 
1520
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1521
 
 
1522
        class StopEarly(Exception):
 
1523
            pass
 
1524
 
 
1525
        def generate_failure():
 
1526
            """Add some entries and then raise an exception"""
 
1527
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
 
1528
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
 
1529
            raise StopEarly()
 
1530
 
 
1531
        # Assert the pre-condition
 
1532
        self.assertEqual(['a-1'], idx._history)
 
1533
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1534
 
 
1535
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
 
1536
 
 
1537
        # And it shouldn't be modified
 
1538
        self.assertEqual(['a-1'], idx._history)
 
1539
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1540
 
 
1541
    def test_knit_index_ignores_empty_files(self):
 
1542
        # There was a race condition in older bzr, where a ^C at the right time
 
1543
        # could leave an empty .kndx file, which bzr would later claim was a
 
1544
        # corrupted file since the header was not present. In reality, the file
 
1545
        # just wasn't created, so it should be ignored.
 
1546
        t = get_transport('.')
 
1547
        t.put_bytes('test.kndx', '')
 
1548
 
 
1549
        knit = self.make_test_knit()
 
1550
 
 
1551
    def test_knit_index_checks_header(self):
 
1552
        t = get_transport('.')
 
1553
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
 
1554
 
 
1555
        self.assertRaises(KnitHeaderError, self.make_test_knit)
 
1556
 
 
1557
 
 
1558
class TestGraphIndexKnit(KnitTests):
 
1559
    """Tests for knits using a GraphIndex rather than a KnitIndex."""
 
1560
 
 
1561
    def make_g_index(self, name, ref_lists=0, nodes=[]):
 
1562
        builder = GraphIndexBuilder(ref_lists)
 
1563
        for node, references, value in nodes:
 
1564
            builder.add_node(node, references, value)
 
1565
        stream = builder.finish()
 
1566
        trans = self.get_transport()
 
1567
        trans.put_file(name, stream)
 
1568
        return GraphIndex(trans, name)
 
1569
 
 
1570
    def two_graph_index(self, deltas=False, catch_adds=False):
 
1571
        """Build a two-graph index.
 
1572
 
 
1573
        :param deltas: If true, use underlying indices with two node-ref
 
1574
            lists and 'parent' set to a delta-compressed against tail.
 
1575
        """
 
1576
        # build a complex graph across several indices.
 
1577
        if deltas:
 
1578
            index1 = self.make_g_index('1', 2, [
 
1579
                ('tip', 'N0 100', (['parent'], [], )),
 
1580
                ('tail', '', ([], []))])
 
1581
            index2 = self.make_g_index('2', 2, [
 
1582
                ('parent', ' 100 78', (['tail', 'ghost'], ['tail'])),
 
1583
                ('separate', '', ([], []))])
 
1584
        else:
 
1585
            index1 = self.make_g_index('1', 1, [
 
1586
                ('tip', 'N0 100', (['parent'], )),
 
1587
                ('tail', '', ([], ))])
 
1588
            index2 = self.make_g_index('2', 1, [
 
1589
                ('parent', ' 100 78', (['tail', 'ghost'], )),
 
1590
                ('separate', '', ([], ))])
 
1591
        combined_index = CombinedGraphIndex([index1, index2])
 
1592
        if catch_adds:
 
1593
            self.combined_index = combined_index
 
1594
            self.caught_entries = []
 
1595
            add_callback = self.catch_add
 
1596
        else:
 
1597
            add_callback = None
 
1598
        return KnitGraphIndex(combined_index, deltas=deltas,
 
1599
            add_callback=add_callback)
 
1600
 
 
1601
    def test_get_graph(self):
 
1602
        index = self.two_graph_index()
 
1603
        self.assertEqual(set([
 
1604
            ('tip', ('parent', )),
 
1605
            ('tail', ()),
 
1606
            ('parent', ('tail', 'ghost')),
 
1607
            ('separate', ()),
 
1608
            ]), set(index.get_graph()))
 
1609
 
 
1610
    def test_get_ancestry(self):
 
1611
        # get_ancestry is defined as eliding ghosts, not erroring.
 
1612
        index = self.two_graph_index()
 
1613
        self.assertEqual([], index.get_ancestry([]))
 
1614
        self.assertEqual(['separate'], index.get_ancestry(['separate']))
 
1615
        self.assertEqual(['tail'], index.get_ancestry(['tail']))
 
1616
        self.assertEqual(['tail', 'parent'], index.get_ancestry(['parent']))
 
1617
        self.assertEqual(['tail', 'parent', 'tip'], index.get_ancestry(['tip']))
 
1618
        self.assertTrue(index.get_ancestry(['tip', 'separate']) in
 
1619
            (['tail', 'parent', 'tip', 'separate'],
 
1620
             ['separate', 'tail', 'parent', 'tip'],
 
1621
            ))
 
1622
        # and without topo_sort
 
1623
        self.assertEqual(set(['separate']),
 
1624
            set(index.get_ancestry(['separate'], topo_sorted=False)))
 
1625
        self.assertEqual(set(['tail']),
 
1626
            set(index.get_ancestry(['tail'], topo_sorted=False)))
 
1627
        self.assertEqual(set(['tail', 'parent']),
 
1628
            set(index.get_ancestry(['parent'], topo_sorted=False)))
 
1629
        self.assertEqual(set(['tail', 'parent', 'tip']),
 
1630
            set(index.get_ancestry(['tip'], topo_sorted=False)))
 
1631
        self.assertEqual(set(['separate', 'tail', 'parent', 'tip']),
 
1632
            set(index.get_ancestry(['tip', 'separate'])))
 
1633
        # asking for a ghost makes it go boom.
 
1634
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
 
1635
 
 
1636
    def test_get_ancestry_with_ghosts(self):
 
1637
        index = self.two_graph_index()
 
1638
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
1639
        self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
 
1640
        self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
 
1641
        self.assertTrue(index.get_ancestry_with_ghosts(['parent']) in
 
1642
            (['tail', 'ghost', 'parent'],
 
1643
             ['ghost', 'tail', 'parent'],
 
1644
            ))
 
1645
        self.assertTrue(index.get_ancestry_with_ghosts(['tip']) in
 
1646
            (['tail', 'ghost', 'parent', 'tip'],
 
1647
             ['ghost', 'tail', 'parent', 'tip'],
 
1648
            ))
 
1649
        self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
 
1650
            (['tail', 'ghost', 'parent', 'tip', 'separate'],
 
1651
             ['ghost', 'tail', 'parent', 'tip', 'separate'],
 
1652
             ['separate', 'tail', 'ghost', 'parent', 'tip'],
 
1653
             ['separate', 'ghost', 'tail', 'parent', 'tip'],
 
1654
            ))
 
1655
        # asking for a ghost makes it go boom.
 
1656
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])
 
1657
 
 
1658
    def test_num_versions(self):
 
1659
        index = self.two_graph_index()
 
1660
        self.assertEqual(4, index.num_versions())
 
1661
 
 
1662
    def test_get_versions(self):
 
1663
        index = self.two_graph_index()
 
1664
        self.assertEqual(set(['tail', 'tip', 'parent', 'separate']),
 
1665
            set(index.get_versions()))
 
1666
 
 
1667
    def test_has_version(self):
 
1668
        index = self.two_graph_index()
 
1669
        self.assertTrue(index.has_version('tail'))
 
1670
        self.assertFalse(index.has_version('ghost'))
 
1671
 
 
1672
    def test_get_position(self):
 
1673
        index = self.two_graph_index()
 
1674
        self.assertEqual((0, 100), index.get_position('tip'))
 
1675
        self.assertEqual((100, 78), index.get_position('parent'))
 
1676
 
 
1677
    def test_get_method_deltas(self):
 
1678
        index = self.two_graph_index(deltas=True)
 
1679
        self.assertEqual('fulltext', index.get_method('tip'))
 
1680
        self.assertEqual('line-delta', index.get_method('parent'))
 
1681
 
 
1682
    def test_get_method_no_deltas(self):
 
1683
        # check that the parent-history lookup is ignored with deltas=False.
 
1684
        index = self.two_graph_index(deltas=False)
 
1685
        self.assertEqual('fulltext', index.get_method('tip'))
 
1686
        self.assertEqual('fulltext', index.get_method('parent'))
 
1687
 
 
1688
    def test_get_options_deltas(self):
 
1689
        index = self.two_graph_index(deltas=True)
 
1690
        self.assertEqual('fulltext,no-eol', index.get_options('tip'))
 
1691
        self.assertEqual('line-delta', index.get_options('parent'))
 
1692
 
 
1693
    def test_get_options_no_deltas(self):
 
1694
        # check that the parent-history lookup is ignored with deltas=False.
 
1695
        index = self.two_graph_index(deltas=False)
 
1696
        self.assertEqual('fulltext,no-eol', index.get_options('tip'))
 
1697
        self.assertEqual('fulltext', index.get_options('parent'))
 
1698
 
 
1699
    def test_get_parents(self):
 
1700
        # get_parents ignores ghosts
 
1701
        index = self.two_graph_index()
 
1702
        self.assertEqual(('tail', ), index.get_parents('parent'))
 
1703
        # and errors on ghosts.
 
1704
        self.assertRaises(errors.RevisionNotPresent,
 
1705
            index.get_parents, 'ghost')
 
1706
 
 
1707
    def test_get_parents_with_ghosts(self):
 
1708
        index = self.two_graph_index()
 
1709
        self.assertEqual(('tail', 'ghost'), index.get_parents_with_ghosts('parent'))
 
1710
        # and errors on ghosts.
 
1711
        self.assertRaises(errors.RevisionNotPresent,
 
1712
            index.get_parents_with_ghosts, 'ghost')
 
1713
 
 
1714
    def test_check_versions_present(self):
 
1715
        # ghosts should not be considered present
 
1716
        index = self.two_graph_index()
 
1717
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
1718
            ['ghost'])
 
1719
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
1720
            ['tail', 'ghost'])
 
1721
        index.check_versions_present(['tail', 'separate'])
 
1722
 
 
1723
    def catch_add(self, entries):
 
1724
        self.caught_entries.append(entries)
 
1725
 
 
1726
    def test_add_no_callback_errors(self):
 
1727
        index = self.two_graph_index()
 
1728
        self.assertRaises(errors.ReadOnlyError, index.add_version,
 
1729
            'new', 'fulltext,no-eol', 50, 60, ['separate'])
 
1730
 
 
1731
    def test_add_version_smoke(self):
 
1732
        index = self.two_graph_index(catch_adds=True)
 
1733
        index.add_version('new', 'fulltext,no-eol', 50, 60, ['separate'])
 
1734
        self.assertEqual([[('new', 'N50 60', (('separate',),))]],
 
1735
            self.caught_entries)
 
1736
 
 
1737
    def test_add_version_delta_not_delta_index(self):
 
1738
        index = self.two_graph_index(catch_adds=True)
 
1739
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1740
            'new', 'no-eol,line-delta', 0, 100, ['parent'])
 
1741
        self.assertEqual([], self.caught_entries)
 
1742
 
 
1743
    def test_add_version_same_dup(self):
 
1744
        index = self.two_graph_index(catch_adds=True)
 
1745
        # options can be spelt two different ways
 
1746
        index.add_version('tip', 'fulltext,no-eol', 0, 100, ['parent'])
 
1747
        index.add_version('tip', 'no-eol,fulltext', 0, 100, ['parent'])
 
1748
        # but neither should have added data.
 
1749
        self.assertEqual([[], []], self.caught_entries)
 
1750
        
 
1751
    def test_add_version_different_dup(self):
 
1752
        index = self.two_graph_index(deltas=True, catch_adds=True)
 
1753
        # change options
 
1754
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1755
            'tip', 'no-eol,line-delta', 0, 100, ['parent'])
 
1756
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1757
            'tip', 'line-delta,no-eol', 0, 100, ['parent'])
 
1758
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1759
            'tip', 'fulltext', 0, 100, ['parent'])
 
1760
        # position/length
 
1761
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1762
            'tip', 'fulltext,no-eol', 50, 100, ['parent'])
 
1763
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1764
            'tip', 'fulltext,no-eol', 0, 1000, ['parent'])
 
1765
        # parents
 
1766
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1767
            'tip', 'fulltext,no-eol', 0, 100, [])
 
1768
        self.assertEqual([], self.caught_entries)
 
1769
        
 
1770
    def test_add_versions_nodeltas(self):
 
1771
        index = self.two_graph_index(catch_adds=True)
 
1772
        index.add_versions([
 
1773
                ('new', 'fulltext,no-eol', 50, 60, ['separate']),
 
1774
                ('new2', 'fulltext', 0, 6, ['new']),
 
1775
                ])
 
1776
        self.assertEqual([('new', 'N50 60', (('separate',),)),
 
1777
            ('new2', ' 0 6', (('new',),))],
 
1778
            sorted(self.caught_entries[0]))
 
1779
        self.assertEqual(1, len(self.caught_entries))
 
1780
 
 
1781
    def test_add_versions_deltas(self):
 
1782
        index = self.two_graph_index(deltas=True, catch_adds=True)
 
1783
        index.add_versions([
 
1784
                ('new', 'fulltext,no-eol', 50, 60, ['separate']),
 
1785
                ('new2', 'line-delta', 0, 6, ['new']),
 
1786
                ])
 
1787
        self.assertEqual([('new', 'N50 60', (('separate',), ())),
 
1788
            ('new2', ' 0 6', (('new',), ('new',), ))],
 
1789
            sorted(self.caught_entries[0]))
 
1790
        self.assertEqual(1, len(self.caught_entries))
 
1791
 
 
1792
    def test_add_versions_delta_not_delta_index(self):
 
1793
        index = self.two_graph_index(catch_adds=True)
 
1794
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1795
            [('new', 'no-eol,line-delta', 0, 100, ['parent'])])
 
1796
        self.assertEqual([], self.caught_entries)
 
1797
 
 
1798
    def test_add_versions_same_dup(self):
 
1799
        index = self.two_graph_index(catch_adds=True)
 
1800
        # options can be spelt two different ways
 
1801
        index.add_versions([('tip', 'fulltext,no-eol', 0, 100, ['parent'])])
 
1802
        index.add_versions([('tip', 'no-eol,fulltext', 0, 100, ['parent'])])
 
1803
        # but neither should have added data.
 
1804
        self.assertEqual([[], []], self.caught_entries)
 
1805
        
 
1806
    def test_add_versions_different_dup(self):
 
1807
        index = self.two_graph_index(deltas=True, catch_adds=True)
 
1808
        # change options
 
1809
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1810
            [('tip', 'no-eol,line-delta', 0, 100, ['parent'])])
 
1811
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1812
            [('tip', 'line-delta,no-eol', 0, 100, ['parent'])])
 
1813
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1814
            [('tip', 'fulltext', 0, 100, ['parent'])])
 
1815
        # position/length
 
1816
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1817
            [('tip', 'fulltext,no-eol', 50, 100, ['parent'])])
 
1818
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1819
            [('tip', 'fulltext,no-eol', 0, 1000, ['parent'])])
 
1820
        # parents
 
1821
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1822
            [('tip', 'fulltext,no-eol', 0, 100, [])])
 
1823
        # change options in the second record
 
1824
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1825
            [('tip', 'fulltext,no-eol', 0, 100, ['parent']),
 
1826
             ('tip', 'no-eol,line-delta', 0, 100, ['parent'])])
 
1827
        self.assertEqual([], self.caught_entries)
 
1828
 
 
1829
    def test_iter_parents(self):
 
1830
        index1 = self.make_g_index('1', 1, [
 
1831
        # no parents
 
1832
            ('r0', 'N0 100', ([], )),
 
1833
        # 1 parent
 
1834
            ('r1', '', (['r0'], ))])
 
1835
        index2 = self.make_g_index('2', 1, [
 
1836
        # 2 parents
 
1837
            ('r2', 'N0 100', (['r1', 'r0'], )),
 
1838
            ])
 
1839
        combined_index = CombinedGraphIndex([index1, index2])
 
1840
        index = KnitGraphIndex(combined_index)
 
1841
        # XXX TODO a ghost
 
1842
        # cases: each sample data individually:
 
1843
        self.assertEqual(set([('r0', ())]),
 
1844
            set(index.iter_parents(['r0'])))
 
1845
        self.assertEqual(set([('r1', ('r0', ))]),
 
1846
            set(index.iter_parents(['r1'])))
 
1847
        self.assertEqual(set([('r2', ('r1', 'r0'))]),
 
1848
            set(index.iter_parents(['r2'])))
 
1849
        # no nodes returned for a missing node
 
1850
        self.assertEqual(set(),
 
1851
            set(index.iter_parents(['missing'])))
 
1852
        # 1 node returned with missing nodes skipped
 
1853
        self.assertEqual(set([('r1', ('r0', ))]),
 
1854
            set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
 
1855
        # 2 nodes returned
 
1856
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
1857
            set(index.iter_parents(['r0', 'r1'])))
 
1858
        # 2 nodes returned, missing skipped
 
1859
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
1860
            set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
 
1861
 
 
1862
 
 
1863
class TestNoParentsGraphIndexKnit(KnitTests):
 
1864
    """Tests for knits using KnitGraphIndex with no parents."""
 
1865
 
 
1866
    def make_g_index(self, name, ref_lists=0, nodes=[]):
 
1867
        builder = GraphIndexBuilder(ref_lists)
 
1868
        for node, references in nodes:
 
1869
            builder.add_node(node, references)
 
1870
        stream = builder.finish()
 
1871
        trans = self.get_transport()
 
1872
        trans.put_file(name, stream)
 
1873
        return GraphIndex(trans, name)
 
1874
 
 
1875
    def test_parents_deltas_incompatible(self):
 
1876
        index = CombinedGraphIndex([])
 
1877
        self.assertRaises(errors.KnitError, KnitGraphIndex, index,
 
1878
            deltas=True, parents=False)
 
1879
 
 
1880
    def two_graph_index(self, catch_adds=False):
 
1881
        """Build a two-graph index.
 
1882
 
 
1883
        :param deltas: If true, use underlying indices with two node-ref
 
1884
            lists and 'parent' set to a delta-compressed against tail.
 
1885
        """
 
1886
        # put several versions in the index.
 
1887
        index1 = self.make_g_index('1', 0, [
 
1888
            ('tip', 'N0 100'),
 
1889
            ('tail', '')])
 
1890
        index2 = self.make_g_index('2', 0, [
 
1891
            ('parent', ' 100 78'),
 
1892
            ('separate', '')])
 
1893
        combined_index = CombinedGraphIndex([index1, index2])
 
1894
        if catch_adds:
 
1895
            self.combined_index = combined_index
 
1896
            self.caught_entries = []
 
1897
            add_callback = self.catch_add
 
1898
        else:
 
1899
            add_callback = None
 
1900
        return KnitGraphIndex(combined_index, parents=False,
 
1901
            add_callback=add_callback)
 
1902
 
 
1903
    def test_get_graph(self):
 
1904
        index = self.two_graph_index()
 
1905
        self.assertEqual(set([
 
1906
            ('tip', ()),
 
1907
            ('tail', ()),
 
1908
            ('parent', ()),
 
1909
            ('separate', ()),
 
1910
            ]), set(index.get_graph()))
 
1911
 
 
1912
    def test_get_ancestry(self):
 
1913
        # with no parents, ancestry is always just the key.
 
1914
        index = self.two_graph_index()
 
1915
        self.assertEqual([], index.get_ancestry([]))
 
1916
        self.assertEqual(['separate'], index.get_ancestry(['separate']))
 
1917
        self.assertEqual(['tail'], index.get_ancestry(['tail']))
 
1918
        self.assertEqual(['parent'], index.get_ancestry(['parent']))
 
1919
        self.assertEqual(['tip'], index.get_ancestry(['tip']))
 
1920
        self.assertTrue(index.get_ancestry(['tip', 'separate']) in
 
1921
            (['tip', 'separate'],
 
1922
             ['separate', 'tip'],
 
1923
            ))
 
1924
        # asking for a ghost makes it go boom.
 
1925
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
 
1926
 
 
1927
    def test_get_ancestry_with_ghosts(self):
 
1928
        index = self.two_graph_index()
 
1929
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
1930
        self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
 
1931
        self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
 
1932
        self.assertEqual(['parent'], index.get_ancestry_with_ghosts(['parent']))
 
1933
        self.assertEqual(['tip'], index.get_ancestry_with_ghosts(['tip']))
 
1934
        self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
 
1935
            (['tip', 'separate'],
 
1936
             ['separate', 'tip'],
 
1937
            ))
 
1938
        # asking for a ghost makes it go boom.
 
1939
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])
 
1940
 
 
1941
    def test_num_versions(self):
 
1942
        index = self.two_graph_index()
 
1943
        self.assertEqual(4, index.num_versions())
 
1944
 
 
1945
    def test_get_versions(self):
 
1946
        index = self.two_graph_index()
 
1947
        self.assertEqual(set(['tail', 'tip', 'parent', 'separate']),
 
1948
            set(index.get_versions()))
 
1949
 
 
1950
    def test_has_version(self):
 
1951
        index = self.two_graph_index()
 
1952
        self.assertTrue(index.has_version('tail'))
 
1953
        self.assertFalse(index.has_version('ghost'))
 
1954
 
 
1955
    def test_get_position(self):
 
1956
        index = self.two_graph_index()
 
1957
        self.assertEqual((0, 100), index.get_position('tip'))
 
1958
        self.assertEqual((100, 78), index.get_position('parent'))
 
1959
 
 
1960
    def test_get_method(self):
 
1961
        index = self.two_graph_index()
 
1962
        self.assertEqual('fulltext', index.get_method('tip'))
 
1963
        self.assertEqual('fulltext', index.get_options('parent'))
 
1964
 
 
1965
    def test_get_options(self):
 
1966
        index = self.two_graph_index()
 
1967
        self.assertEqual('fulltext,no-eol', index.get_options('tip'))
 
1968
        self.assertEqual('fulltext', index.get_options('parent'))
 
1969
 
 
1970
    def test_get_parents(self):
 
1971
        index = self.two_graph_index()
 
1972
        self.assertEqual((), index.get_parents('parent'))
 
1973
        # and errors on ghosts.
 
1974
        self.assertRaises(errors.RevisionNotPresent,
 
1975
            index.get_parents, 'ghost')
 
1976
 
 
1977
    def test_get_parents_with_ghosts(self):
 
1978
        index = self.two_graph_index()
 
1979
        self.assertEqual((), index.get_parents_with_ghosts('parent'))
 
1980
        # and errors on ghosts.
 
1981
        self.assertRaises(errors.RevisionNotPresent,
 
1982
            index.get_parents_with_ghosts, 'ghost')
 
1983
 
 
1984
    def test_check_versions_present(self):
 
1985
        index = self.two_graph_index()
 
1986
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
1987
            ['missing'])
 
1988
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
1989
            ['tail', 'missing'])
 
1990
        index.check_versions_present(['tail', 'separate'])
 
1991
 
 
1992
    def catch_add(self, entries):
 
1993
        self.caught_entries.append(entries)
 
1994
 
 
1995
    def test_add_no_callback_errors(self):
 
1996
        index = self.two_graph_index()
 
1997
        self.assertRaises(errors.ReadOnlyError, index.add_version,
 
1998
            'new', 'fulltext,no-eol', 50, 60, ['separate'])
 
1999
 
 
2000
    def test_add_version_smoke(self):
 
2001
        index = self.two_graph_index(catch_adds=True)
 
2002
        index.add_version('new', 'fulltext,no-eol', 50, 60, [])
 
2003
        self.assertEqual([[('new', 'N50 60')]],
 
2004
            self.caught_entries)
 
2005
 
 
2006
    def test_add_version_delta_not_delta_index(self):
 
2007
        index = self.two_graph_index(catch_adds=True)
 
2008
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2009
            'new', 'no-eol,line-delta', 0, 100, [])
 
2010
        self.assertEqual([], self.caught_entries)
 
2011
 
 
2012
    def test_add_version_same_dup(self):
 
2013
        index = self.two_graph_index(catch_adds=True)
 
2014
        # options can be spelt two different ways
 
2015
        index.add_version('tip', 'fulltext,no-eol', 0, 100, [])
 
2016
        index.add_version('tip', 'no-eol,fulltext', 0, 100, [])
 
2017
        # but neither should have added data.
 
2018
        self.assertEqual([[], []], self.caught_entries)
 
2019
        
 
2020
    def test_add_version_different_dup(self):
 
2021
        index = self.two_graph_index(catch_adds=True)
 
2022
        # change options
 
2023
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2024
            'tip', 'no-eol,line-delta', 0, 100, [])
 
2025
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2026
            'tip', 'line-delta,no-eol', 0, 100, [])
 
2027
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2028
            'tip', 'fulltext', 0, 100, [])
 
2029
        # position/length
 
2030
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2031
            'tip', 'fulltext,no-eol', 50, 100, [])
 
2032
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2033
            'tip', 'fulltext,no-eol', 0, 1000, [])
 
2034
        # parents
 
2035
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2036
            'tip', 'fulltext,no-eol', 0, 100, ['parent'])
 
2037
        self.assertEqual([], self.caught_entries)
 
2038
        
 
2039
    def test_add_versions(self):
 
2040
        index = self.two_graph_index(catch_adds=True)
 
2041
        index.add_versions([
 
2042
                ('new', 'fulltext,no-eol', 50, 60, []),
 
2043
                ('new2', 'fulltext', 0, 6, []),
 
2044
                ])
 
2045
        self.assertEqual([('new', 'N50 60'), ('new2', ' 0 6')],
 
2046
            sorted(self.caught_entries[0]))
 
2047
        self.assertEqual(1, len(self.caught_entries))
 
2048
 
 
2049
    def test_add_versions_delta_not_delta_index(self):
 
2050
        index = self.two_graph_index(catch_adds=True)
 
2051
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2052
            [('new', 'no-eol,line-delta', 0, 100, ['parent'])])
 
2053
        self.assertEqual([], self.caught_entries)
 
2054
 
 
2055
    def test_add_versions_parents_not_parents_index(self):
 
2056
        index = self.two_graph_index(catch_adds=True)
 
2057
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2058
            [('new', 'no-eol,fulltext', 0, 100, ['parent'])])
 
2059
        self.assertEqual([], self.caught_entries)
 
2060
 
 
2061
    def test_add_versions_same_dup(self):
 
2062
        index = self.two_graph_index(catch_adds=True)
 
2063
        # options can be spelt two different ways
 
2064
        index.add_versions([('tip', 'fulltext,no-eol', 0, 100, [])])
 
2065
        index.add_versions([('tip', 'no-eol,fulltext', 0, 100, [])])
 
2066
        # but neither should have added data.
 
2067
        self.assertEqual([[], []], self.caught_entries)
 
2068
        
 
2069
    def test_add_versions_different_dup(self):
 
2070
        index = self.two_graph_index(catch_adds=True)
 
2071
        # change options
 
2072
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2073
            [('tip', 'no-eol,line-delta', 0, 100, [])])
 
2074
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2075
            [('tip', 'line-delta,no-eol', 0, 100, [])])
 
2076
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2077
            [('tip', 'fulltext', 0, 100, [])])
 
2078
        # position/length
 
2079
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2080
            [('tip', 'fulltext,no-eol', 50, 100, [])])
 
2081
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2082
            [('tip', 'fulltext,no-eol', 0, 1000, [])])
 
2083
        # parents
 
2084
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2085
            [('tip', 'fulltext,no-eol', 0, 100, ['parent'])])
 
2086
        # change options in the second record
 
2087
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2088
            [('tip', 'fulltext,no-eol', 0, 100, []),
 
2089
             ('tip', 'no-eol,line-delta', 0, 100, [])])
 
2090
        self.assertEqual([], self.caught_entries)
 
2091
 
 
2092
    def test_iter_parents(self):
 
2093
        index = self.two_graph_index()
 
2094
        self.assertEqual(set([
 
2095
            ('tip', ()), ('tail', ()), ('parent', ()), ('separate', ())
 
2096
            ]),
 
2097
            set(index.iter_parents(['tip', 'tail', 'ghost', 'parent', 'separate'])))
 
2098
        self.assertEqual(set([('tip', ())]),
 
2099
            set(index.iter_parents(['tip'])))
 
2100
        self.assertEqual(set(),
 
2101
            set(index.iter_parents([])))