~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-08-01 17:14:51 UTC
  • mfrom: (2662.1.1 bzr.easy_install)
  • Revision ID: pqm@pqm.ubuntu.com-20070801171451-en3tds1hzlru2j83
allow ``easy_install bzr`` runs without fatal errors. (#125521, bialix,
 r=mbp)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
from cStringIO import StringIO
 
20
import difflib
 
21
import gzip
 
22
import sha
 
23
import sys
 
24
 
 
25
from bzrlib import (
 
26
    errors,
 
27
    generate_ids,
 
28
    knit,
 
29
    )
 
30
from bzrlib.errors import (
 
31
    RevisionAlreadyPresent,
 
32
    KnitHeaderError,
 
33
    RevisionNotPresent,
 
34
    NoSuchFile,
 
35
    )
 
36
from bzrlib.index import *
 
37
from bzrlib.knit import (
 
38
    KnitContent,
 
39
    KnitGraphIndex,
 
40
    KnitVersionedFile,
 
41
    KnitPlainFactory,
 
42
    KnitAnnotateFactory,
 
43
    _KnitData,
 
44
    _KnitIndex,
 
45
    WeaveToKnit,
 
46
    KnitSequenceMatcher,
 
47
    )
 
48
from bzrlib.osutils import split_lines
 
49
from bzrlib.tests import TestCase, TestCaseWithTransport, Feature
 
50
from bzrlib.transport import TransportLogger, get_transport
 
51
from bzrlib.transport.memory import MemoryTransport
 
52
from bzrlib.weave import Weave
 
53
 
 
54
 
 
55
class _CompiledKnitFeature(Feature):
 
56
 
 
57
    def _probe(self):
 
58
        try:
 
59
            import bzrlib._knit_load_data_c
 
60
        except ImportError:
 
61
            return False
 
62
        return True
 
63
 
 
64
    def feature_name(self):
 
65
        return 'bzrlib._knit_load_data_c'
 
66
 
 
67
CompiledKnitFeature = _CompiledKnitFeature()
 
68
 
 
69
 
 
70
class KnitContentTests(TestCase):
 
71
 
 
72
    def test_constructor(self):
 
73
        content = KnitContent([])
 
74
 
 
75
    def test_text(self):
 
76
        content = KnitContent([])
 
77
        self.assertEqual(content.text(), [])
 
78
 
 
79
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
80
        self.assertEqual(content.text(), ["text1", "text2"])
 
81
 
 
82
    def test_annotate(self):
 
83
        content = KnitContent([])
 
84
        self.assertEqual(content.annotate(), [])
 
85
 
 
86
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
87
        self.assertEqual(content.annotate(),
 
88
            [("origin1", "text1"), ("origin2", "text2")])
 
89
 
 
90
    def test_annotate_iter(self):
 
91
        content = KnitContent([])
 
92
        it = content.annotate_iter()
 
93
        self.assertRaises(StopIteration, it.next)
 
94
 
 
95
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
96
        it = content.annotate_iter()
 
97
        self.assertEqual(it.next(), ("origin1", "text1"))
 
98
        self.assertEqual(it.next(), ("origin2", "text2"))
 
99
        self.assertRaises(StopIteration, it.next)
 
100
 
 
101
    def test_copy(self):
 
102
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
103
        copy = content.copy()
 
104
        self.assertIsInstance(copy, KnitContent)
 
105
        self.assertEqual(copy.annotate(),
 
106
            [("origin1", "text1"), ("origin2", "text2")])
 
107
 
 
108
    def test_line_delta(self):
 
109
        content1 = KnitContent([("", "a"), ("", "b")])
 
110
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
111
        self.assertEqual(content1.line_delta(content2),
 
112
            [(1, 2, 2, [("", "a"), ("", "c")])])
 
113
 
 
114
    def test_line_delta_iter(self):
 
115
        content1 = KnitContent([("", "a"), ("", "b")])
 
116
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
117
        it = content1.line_delta_iter(content2)
 
118
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
 
119
        self.assertRaises(StopIteration, it.next)
 
120
 
 
121
 
 
122
class MockTransport(object):
 
123
 
 
124
    def __init__(self, file_lines=None):
 
125
        self.file_lines = file_lines
 
126
        self.calls = []
 
127
        # We have no base directory for the MockTransport
 
128
        self.base = ''
 
129
 
 
130
    def get(self, filename):
 
131
        if self.file_lines is None:
 
132
            raise NoSuchFile(filename)
 
133
        else:
 
134
            return StringIO("\n".join(self.file_lines))
 
135
 
 
136
    def readv(self, relpath, offsets):
 
137
        fp = self.get(relpath)
 
138
        for offset, size in offsets:
 
139
            fp.seek(offset)
 
140
            yield offset, fp.read(size)
 
141
 
 
142
    def __getattr__(self, name):
 
143
        def queue_call(*args, **kwargs):
 
144
            self.calls.append((name, args, kwargs))
 
145
        return queue_call
 
146
 
 
147
 
 
148
class LowLevelKnitDataTests(TestCase):
 
149
 
 
150
    def create_gz_content(self, text):
 
151
        sio = StringIO()
 
152
        gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
 
153
        gz_file.write(text)
 
154
        gz_file.close()
 
155
        return sio.getvalue()
 
156
 
 
157
    def test_valid_knit_data(self):
 
158
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
159
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
160
                                        'foo\n'
 
161
                                        'bar\n'
 
162
                                        'end rev-id-1\n'
 
163
                                        % (sha1sum,))
 
164
        transport = MockTransport([gz_txt])
 
165
        data = _KnitData(transport, 'filename', mode='r')
 
166
        records = [('rev-id-1', 0, len(gz_txt))]
 
167
 
 
168
        contents = data.read_records(records)
 
169
        self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
 
170
 
 
171
        raw_contents = list(data.read_records_iter_raw(records))
 
172
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
173
 
 
174
    def test_not_enough_lines(self):
 
175
        sha1sum = sha.new('foo\n').hexdigest()
 
176
        # record says 2 lines data says 1
 
177
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
178
                                        'foo\n'
 
179
                                        'end rev-id-1\n'
 
180
                                        % (sha1sum,))
 
181
        transport = MockTransport([gz_txt])
 
182
        data = _KnitData(transport, 'filename', mode='r')
 
183
        records = [('rev-id-1', 0, len(gz_txt))]
 
184
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
185
 
 
186
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
187
        raw_contents = list(data.read_records_iter_raw(records))
 
188
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
189
 
 
190
    def test_too_many_lines(self):
 
191
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
192
        # record says 1 lines data says 2
 
193
        gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
 
194
                                        'foo\n'
 
195
                                        'bar\n'
 
196
                                        'end rev-id-1\n'
 
197
                                        % (sha1sum,))
 
198
        transport = MockTransport([gz_txt])
 
199
        data = _KnitData(transport, 'filename', mode='r')
 
200
        records = [('rev-id-1', 0, len(gz_txt))]
 
201
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
202
 
 
203
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
204
        raw_contents = list(data.read_records_iter_raw(records))
 
205
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
206
 
 
207
    def test_mismatched_version_id(self):
 
208
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
209
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
210
                                        'foo\n'
 
211
                                        'bar\n'
 
212
                                        'end rev-id-1\n'
 
213
                                        % (sha1sum,))
 
214
        transport = MockTransport([gz_txt])
 
215
        data = _KnitData(transport, 'filename', mode='r')
 
216
        # We are asking for rev-id-2, but the data is rev-id-1
 
217
        records = [('rev-id-2', 0, len(gz_txt))]
 
218
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
219
 
 
220
        # read_records_iter_raw will notice if we request the wrong version.
 
221
        self.assertRaises(errors.KnitCorrupt, list,
 
222
                          data.read_records_iter_raw(records))
 
223
 
 
224
    def test_uncompressed_data(self):
 
225
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
226
        txt = ('version rev-id-1 2 %s\n'
 
227
               'foo\n'
 
228
               'bar\n'
 
229
               'end rev-id-1\n'
 
230
               % (sha1sum,))
 
231
        transport = MockTransport([txt])
 
232
        data = _KnitData(transport, 'filename', mode='r')
 
233
        records = [('rev-id-1', 0, len(txt))]
 
234
 
 
235
        # We don't have valid gzip data ==> corrupt
 
236
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
237
 
 
238
        # read_records_iter_raw will notice the bad data
 
239
        self.assertRaises(errors.KnitCorrupt, list,
 
240
                          data.read_records_iter_raw(records))
 
241
 
 
242
    def test_corrupted_data(self):
 
243
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
244
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
245
                                        'foo\n'
 
246
                                        'bar\n'
 
247
                                        'end rev-id-1\n'
 
248
                                        % (sha1sum,))
 
249
        # Change 2 bytes in the middle to \xff
 
250
        gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
 
251
        transport = MockTransport([gz_txt])
 
252
        data = _KnitData(transport, 'filename', mode='r')
 
253
        records = [('rev-id-1', 0, len(gz_txt))]
 
254
 
 
255
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
256
 
 
257
        # read_records_iter_raw will notice if we request the wrong version.
 
258
        self.assertRaises(errors.KnitCorrupt, list,
 
259
                          data.read_records_iter_raw(records))
 
260
 
 
261
 
 
262
class LowLevelKnitIndexTests(TestCase):
 
263
 
 
264
    def get_knit_index(self, *args, **kwargs):
 
265
        orig = knit._load_data
 
266
        def reset():
 
267
            knit._load_data = orig
 
268
        self.addCleanup(reset)
 
269
        from bzrlib._knit_load_data_py import _load_data_py
 
270
        knit._load_data = _load_data_py
 
271
        return _KnitIndex(*args, **kwargs)
 
272
 
 
273
    def test_no_such_file(self):
 
274
        transport = MockTransport()
 
275
 
 
276
        self.assertRaises(NoSuchFile, self.get_knit_index,
 
277
                          transport, "filename", "r")
 
278
        self.assertRaises(NoSuchFile, self.get_knit_index,
 
279
                          transport, "filename", "w", create=False)
 
280
 
 
281
    def test_create_file(self):
 
282
        transport = MockTransport()
 
283
 
 
284
        index = self.get_knit_index(transport, "filename", "w",
 
285
            file_mode="wb", create=True)
 
286
        self.assertEqual(
 
287
                ("put_bytes_non_atomic",
 
288
                    ("filename", index.HEADER), {"mode": "wb"}),
 
289
                transport.calls.pop(0))
 
290
 
 
291
    def test_delay_create_file(self):
 
292
        transport = MockTransport()
 
293
 
 
294
        index = self.get_knit_index(transport, "filename", "w",
 
295
            create=True, file_mode="wb", create_parent_dir=True,
 
296
            delay_create=True, dir_mode=0777)
 
297
        self.assertEqual([], transport.calls)
 
298
 
 
299
        index.add_versions([])
 
300
        name, (filename, f), kwargs = transport.calls.pop(0)
 
301
        self.assertEqual("put_file_non_atomic", name)
 
302
        self.assertEqual(
 
303
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
304
            kwargs)
 
305
        self.assertEqual("filename", filename)
 
306
        self.assertEqual(index.HEADER, f.read())
 
307
 
 
308
        index.add_versions([])
 
309
        self.assertEqual(("append_bytes", ("filename", ""), {}),
 
310
            transport.calls.pop(0))
 
311
 
 
312
    def test_read_utf8_version_id(self):
 
313
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
314
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
315
        transport = MockTransport([
 
316
            _KnitIndex.HEADER,
 
317
            '%s option 0 1 :' % (utf8_revision_id,)
 
318
            ])
 
319
        index = self.get_knit_index(transport, "filename", "r")
 
320
        # _KnitIndex is a private class, and deals in utf8 revision_ids, not
 
321
        # Unicode revision_ids.
 
322
        self.assertTrue(index.has_version(utf8_revision_id))
 
323
        self.assertFalse(index.has_version(unicode_revision_id))
 
324
 
 
325
    def test_read_utf8_parents(self):
 
326
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
327
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
328
        transport = MockTransport([
 
329
            _KnitIndex.HEADER,
 
330
            "version option 0 1 .%s :" % (utf8_revision_id,)
 
331
            ])
 
332
        index = self.get_knit_index(transport, "filename", "r")
 
333
        self.assertEqual([utf8_revision_id],
 
334
            index.get_parents_with_ghosts("version"))
 
335
 
 
336
    def test_read_ignore_corrupted_lines(self):
 
337
        transport = MockTransport([
 
338
            _KnitIndex.HEADER,
 
339
            "corrupted",
 
340
            "corrupted options 0 1 .b .c ",
 
341
            "version options 0 1 :"
 
342
            ])
 
343
        index = self.get_knit_index(transport, "filename", "r")
 
344
        self.assertEqual(1, index.num_versions())
 
345
        self.assertTrue(index.has_version("version"))
 
346
 
 
347
    def test_read_corrupted_header(self):
 
348
        transport = MockTransport(['not a bzr knit index header\n'])
 
349
        self.assertRaises(KnitHeaderError,
 
350
            self.get_knit_index, transport, "filename", "r")
 
351
 
 
352
    def test_read_duplicate_entries(self):
 
353
        transport = MockTransport([
 
354
            _KnitIndex.HEADER,
 
355
            "parent options 0 1 :",
 
356
            "version options1 0 1 0 :",
 
357
            "version options2 1 2 .other :",
 
358
            "version options3 3 4 0 .other :"
 
359
            ])
 
360
        index = self.get_knit_index(transport, "filename", "r")
 
361
        self.assertEqual(2, index.num_versions())
 
362
        # check that the index used is the first one written. (Specific
 
363
        # to KnitIndex style indices.
 
364
        self.assertEqual("1", index._version_list_to_index(["version"]))
 
365
        self.assertEqual((3, 4), index.get_position("version"))
 
366
        self.assertEqual(["options3"], index.get_options("version"))
 
367
        self.assertEqual(["parent", "other"],
 
368
            index.get_parents_with_ghosts("version"))
 
369
 
 
370
    def test_read_compressed_parents(self):
 
371
        transport = MockTransport([
 
372
            _KnitIndex.HEADER,
 
373
            "a option 0 1 :",
 
374
            "b option 0 1 0 :",
 
375
            "c option 0 1 1 0 :",
 
376
            ])
 
377
        index = self.get_knit_index(transport, "filename", "r")
 
378
        self.assertEqual(["a"], index.get_parents("b"))
 
379
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
380
 
 
381
    def test_write_utf8_version_id(self):
 
382
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
383
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
384
        transport = MockTransport([
 
385
            _KnitIndex.HEADER
 
386
            ])
 
387
        index = self.get_knit_index(transport, "filename", "r")
 
388
        index.add_version(utf8_revision_id, ["option"], 0, 1, [])
 
389
        self.assertEqual(("append_bytes", ("filename",
 
390
            "\n%s option 0 1  :" % (utf8_revision_id,)),
 
391
            {}),
 
392
            transport.calls.pop(0))
 
393
 
 
394
    def test_write_utf8_parents(self):
 
395
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
396
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
397
        transport = MockTransport([
 
398
            _KnitIndex.HEADER
 
399
            ])
 
400
        index = self.get_knit_index(transport, "filename", "r")
 
401
        index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
 
402
        self.assertEqual(("append_bytes", ("filename",
 
403
            "\nversion option 0 1 .%s :" % (utf8_revision_id,)),
 
404
            {}),
 
405
            transport.calls.pop(0))
 
406
 
 
407
    def test_get_graph(self):
 
408
        transport = MockTransport()
 
409
        index = self.get_knit_index(transport, "filename", "w", create=True)
 
410
        self.assertEqual([], index.get_graph())
 
411
 
 
412
        index.add_version("a", ["option"], 0, 1, ["b"])
 
413
        self.assertEqual([("a", ["b"])], index.get_graph())
 
414
 
 
415
        index.add_version("c", ["option"], 0, 1, ["d"])
 
416
        self.assertEqual([("a", ["b"]), ("c", ["d"])],
 
417
            sorted(index.get_graph()))
 
418
 
 
419
    def test_get_ancestry(self):
 
420
        transport = MockTransport([
 
421
            _KnitIndex.HEADER,
 
422
            "a option 0 1 :",
 
423
            "b option 0 1 0 .e :",
 
424
            "c option 0 1 1 0 :",
 
425
            "d option 0 1 2 .f :"
 
426
            ])
 
427
        index = self.get_knit_index(transport, "filename", "r")
 
428
 
 
429
        self.assertEqual([], index.get_ancestry([]))
 
430
        self.assertEqual(["a"], index.get_ancestry(["a"]))
 
431
        self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
 
432
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
 
433
        self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
 
434
        self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
 
435
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
 
436
 
 
437
        self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
 
438
 
 
439
    def test_get_ancestry_with_ghosts(self):
 
440
        transport = MockTransport([
 
441
            _KnitIndex.HEADER,
 
442
            "a option 0 1 :",
 
443
            "b option 0 1 0 .e :",
 
444
            "c option 0 1 0 .f .g :",
 
445
            "d option 0 1 2 .h .j .k :"
 
446
            ])
 
447
        index = self.get_knit_index(transport, "filename", "r")
 
448
 
 
449
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
450
        self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
 
451
        self.assertEqual(["a", "e", "b"],
 
452
            index.get_ancestry_with_ghosts(["b"]))
 
453
        self.assertEqual(["a", "g", "f", "c"],
 
454
            index.get_ancestry_with_ghosts(["c"]))
 
455
        self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
 
456
            index.get_ancestry_with_ghosts(["d"]))
 
457
        self.assertEqual(["a", "e", "b"],
 
458
            index.get_ancestry_with_ghosts(["a", "b"]))
 
459
        self.assertEqual(["a", "g", "f", "c"],
 
460
            index.get_ancestry_with_ghosts(["a", "c"]))
 
461
        self.assertEqual(
 
462
            ["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
 
463
            index.get_ancestry_with_ghosts(["b", "d"]))
 
464
 
 
465
        self.assertRaises(RevisionNotPresent,
 
466
            index.get_ancestry_with_ghosts, ["e"])
 
467
 
 
468
    def test_iter_parents(self):
 
469
        transport = MockTransport()
 
470
        index = self.get_knit_index(transport, "filename", "w", create=True)
 
471
        # no parents
 
472
        index.add_version('r0', ['option'], 0, 1, [])
 
473
        # 1 parent
 
474
        index.add_version('r1', ['option'], 0, 1, ['r0'])
 
475
        # 2 parents
 
476
        index.add_version('r2', ['option'], 0, 1, ['r1', 'r0'])
 
477
        # XXX TODO a ghost
 
478
        # cases: each sample data individually:
 
479
        self.assertEqual(set([('r0', ())]),
 
480
            set(index.iter_parents(['r0'])))
 
481
        self.assertEqual(set([('r1', ('r0', ))]),
 
482
            set(index.iter_parents(['r1'])))
 
483
        self.assertEqual(set([('r2', ('r1', 'r0'))]),
 
484
            set(index.iter_parents(['r2'])))
 
485
        # no nodes returned for a missing node
 
486
        self.assertEqual(set(),
 
487
            set(index.iter_parents(['missing'])))
 
488
        # 1 node returned with missing nodes skipped
 
489
        self.assertEqual(set([('r1', ('r0', ))]),
 
490
            set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
 
491
        # 2 nodes returned
 
492
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
493
            set(index.iter_parents(['r0', 'r1'])))
 
494
        # 2 nodes returned, missing skipped
 
495
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
496
            set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
 
497
 
 
498
    def test_num_versions(self):
 
499
        transport = MockTransport([
 
500
            _KnitIndex.HEADER
 
501
            ])
 
502
        index = self.get_knit_index(transport, "filename", "r")
 
503
 
 
504
        self.assertEqual(0, index.num_versions())
 
505
        self.assertEqual(0, len(index))
 
506
 
 
507
        index.add_version("a", ["option"], 0, 1, [])
 
508
        self.assertEqual(1, index.num_versions())
 
509
        self.assertEqual(1, len(index))
 
510
 
 
511
        index.add_version("a", ["option2"], 1, 2, [])
 
512
        self.assertEqual(1, index.num_versions())
 
513
        self.assertEqual(1, len(index))
 
514
 
 
515
        index.add_version("b", ["option"], 0, 1, [])
 
516
        self.assertEqual(2, index.num_versions())
 
517
        self.assertEqual(2, len(index))
 
518
 
 
519
    def test_get_versions(self):
 
520
        transport = MockTransport([
 
521
            _KnitIndex.HEADER
 
522
            ])
 
523
        index = self.get_knit_index(transport, "filename", "r")
 
524
 
 
525
        self.assertEqual([], index.get_versions())
 
526
 
 
527
        index.add_version("a", ["option"], 0, 1, [])
 
528
        self.assertEqual(["a"], index.get_versions())
 
529
 
 
530
        index.add_version("a", ["option"], 0, 1, [])
 
531
        self.assertEqual(["a"], index.get_versions())
 
532
 
 
533
        index.add_version("b", ["option"], 0, 1, [])
 
534
        self.assertEqual(["a", "b"], index.get_versions())
 
535
 
 
536
    def test_add_version(self):
 
537
        transport = MockTransport([
 
538
            _KnitIndex.HEADER
 
539
            ])
 
540
        index = self.get_knit_index(transport, "filename", "r")
 
541
 
 
542
        index.add_version("a", ["option"], 0, 1, ["b"])
 
543
        self.assertEqual(("append_bytes",
 
544
            ("filename", "\na option 0 1 .b :"),
 
545
            {}), transport.calls.pop(0))
 
546
        self.assertTrue(index.has_version("a"))
 
547
        self.assertEqual(1, index.num_versions())
 
548
        self.assertEqual((0, 1), index.get_position("a"))
 
549
        self.assertEqual(["option"], index.get_options("a"))
 
550
        self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
 
551
 
 
552
        index.add_version("a", ["opt"], 1, 2, ["c"])
 
553
        self.assertEqual(("append_bytes",
 
554
            ("filename", "\na opt 1 2 .c :"),
 
555
            {}), transport.calls.pop(0))
 
556
        self.assertTrue(index.has_version("a"))
 
557
        self.assertEqual(1, index.num_versions())
 
558
        self.assertEqual((1, 2), index.get_position("a"))
 
559
        self.assertEqual(["opt"], index.get_options("a"))
 
560
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
561
 
 
562
        index.add_version("b", ["option"], 2, 3, ["a"])
 
563
        self.assertEqual(("append_bytes",
 
564
            ("filename", "\nb option 2 3 0 :"),
 
565
            {}), transport.calls.pop(0))
 
566
        self.assertTrue(index.has_version("b"))
 
567
        self.assertEqual(2, index.num_versions())
 
568
        self.assertEqual((2, 3), index.get_position("b"))
 
569
        self.assertEqual(["option"], index.get_options("b"))
 
570
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
571
 
 
572
    def test_add_versions(self):
 
573
        transport = MockTransport([
 
574
            _KnitIndex.HEADER
 
575
            ])
 
576
        index = self.get_knit_index(transport, "filename", "r")
 
577
 
 
578
        index.add_versions([
 
579
            ("a", ["option"], 0, 1, ["b"]),
 
580
            ("a", ["opt"], 1, 2, ["c"]),
 
581
            ("b", ["option"], 2, 3, ["a"])
 
582
            ])
 
583
        self.assertEqual(("append_bytes", ("filename",
 
584
            "\na option 0 1 .b :"
 
585
            "\na opt 1 2 .c :"
 
586
            "\nb option 2 3 0 :"
 
587
            ), {}), transport.calls.pop(0))
 
588
        self.assertTrue(index.has_version("a"))
 
589
        self.assertTrue(index.has_version("b"))
 
590
        self.assertEqual(2, index.num_versions())
 
591
        self.assertEqual((1, 2), index.get_position("a"))
 
592
        self.assertEqual((2, 3), index.get_position("b"))
 
593
        self.assertEqual(["opt"], index.get_options("a"))
 
594
        self.assertEqual(["option"], index.get_options("b"))
 
595
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
596
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
597
 
 
598
    def test_delay_create_and_add_versions(self):
 
599
        transport = MockTransport()
 
600
 
 
601
        index = self.get_knit_index(transport, "filename", "w",
 
602
            create=True, file_mode="wb", create_parent_dir=True,
 
603
            delay_create=True, dir_mode=0777)
 
604
        self.assertEqual([], transport.calls)
 
605
 
 
606
        index.add_versions([
 
607
            ("a", ["option"], 0, 1, ["b"]),
 
608
            ("a", ["opt"], 1, 2, ["c"]),
 
609
            ("b", ["option"], 2, 3, ["a"])
 
610
            ])
 
611
        name, (filename, f), kwargs = transport.calls.pop(0)
 
612
        self.assertEqual("put_file_non_atomic", name)
 
613
        self.assertEqual(
 
614
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
615
            kwargs)
 
616
        self.assertEqual("filename", filename)
 
617
        self.assertEqual(
 
618
            index.HEADER +
 
619
            "\na option 0 1 .b :"
 
620
            "\na opt 1 2 .c :"
 
621
            "\nb option 2 3 0 :",
 
622
            f.read())
 
623
 
 
624
    def test_has_version(self):
 
625
        transport = MockTransport([
 
626
            _KnitIndex.HEADER,
 
627
            "a option 0 1 :"
 
628
            ])
 
629
        index = self.get_knit_index(transport, "filename", "r")
 
630
 
 
631
        self.assertTrue(index.has_version("a"))
 
632
        self.assertFalse(index.has_version("b"))
 
633
 
 
634
    def test_get_position(self):
 
635
        transport = MockTransport([
 
636
            _KnitIndex.HEADER,
 
637
            "a option 0 1 :",
 
638
            "b option 1 2 :"
 
639
            ])
 
640
        index = self.get_knit_index(transport, "filename", "r")
 
641
 
 
642
        self.assertEqual((0, 1), index.get_position("a"))
 
643
        self.assertEqual((1, 2), index.get_position("b"))
 
644
 
 
645
    def test_get_method(self):
 
646
        transport = MockTransport([
 
647
            _KnitIndex.HEADER,
 
648
            "a fulltext,unknown 0 1 :",
 
649
            "b unknown,line-delta 1 2 :",
 
650
            "c bad 3 4 :"
 
651
            ])
 
652
        index = self.get_knit_index(transport, "filename", "r")
 
653
 
 
654
        self.assertEqual("fulltext", index.get_method("a"))
 
655
        self.assertEqual("line-delta", index.get_method("b"))
 
656
        self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
 
657
 
 
658
    def test_get_options(self):
 
659
        transport = MockTransport([
 
660
            _KnitIndex.HEADER,
 
661
            "a opt1 0 1 :",
 
662
            "b opt2,opt3 1 2 :"
 
663
            ])
 
664
        index = self.get_knit_index(transport, "filename", "r")
 
665
 
 
666
        self.assertEqual(["opt1"], index.get_options("a"))
 
667
        self.assertEqual(["opt2", "opt3"], index.get_options("b"))
 
668
 
 
669
    def test_get_parents(self):
 
670
        transport = MockTransport([
 
671
            _KnitIndex.HEADER,
 
672
            "a option 0 1 :",
 
673
            "b option 1 2 0 .c :",
 
674
            "c option 1 2 1 0 .e :"
 
675
            ])
 
676
        index = self.get_knit_index(transport, "filename", "r")
 
677
 
 
678
        self.assertEqual([], index.get_parents("a"))
 
679
        self.assertEqual(["a", "c"], index.get_parents("b"))
 
680
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
681
 
 
682
    def test_get_parents_with_ghosts(self):
 
683
        transport = MockTransport([
 
684
            _KnitIndex.HEADER,
 
685
            "a option 0 1 :",
 
686
            "b option 1 2 0 .c :",
 
687
            "c option 1 2 1 0 .e :"
 
688
            ])
 
689
        index = self.get_knit_index(transport, "filename", "r")
 
690
 
 
691
        self.assertEqual([], index.get_parents_with_ghosts("a"))
 
692
        self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
 
693
        self.assertEqual(["b", "a", "e"],
 
694
            index.get_parents_with_ghosts("c"))
 
695
 
 
696
    def test_check_versions_present(self):
 
697
        transport = MockTransport([
 
698
            _KnitIndex.HEADER,
 
699
            "a option 0 1 :",
 
700
            "b option 0 1 :"
 
701
            ])
 
702
        index = self.get_knit_index(transport, "filename", "r")
 
703
 
 
704
        check = index.check_versions_present
 
705
 
 
706
        check([])
 
707
        check(["a"])
 
708
        check(["b"])
 
709
        check(["a", "b"])
 
710
        self.assertRaises(RevisionNotPresent, check, ["c"])
 
711
        self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
 
712
 
 
713
    def test_impossible_parent(self):
 
714
        """Test we get KnitCorrupt if the parent couldn't possibly exist."""
 
715
        transport = MockTransport([
 
716
            _KnitIndex.HEADER,
 
717
            "a option 0 1 :",
 
718
            "b option 0 1 4 :"  # We don't have a 4th record
 
719
            ])
 
720
        try:
 
721
            self.assertRaises(errors.KnitCorrupt,
 
722
                              self.get_knit_index, transport, 'filename', 'r')
 
723
        except TypeError, e:
 
724
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
725
                           ' not exceptions.IndexError')
 
726
                and sys.version_info[0:2] >= (2,5)):
 
727
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
728
                                  ' raising new style exceptions with python'
 
729
                                  ' >=2.5')
 
730
            else:
 
731
                raise
 
732
 
 
733
    def test_corrupted_parent(self):
 
734
        transport = MockTransport([
 
735
            _KnitIndex.HEADER,
 
736
            "a option 0 1 :",
 
737
            "b option 0 1 :",
 
738
            "c option 0 1 1v :", # Can't have a parent of '1v'
 
739
            ])
 
740
        try:
 
741
            self.assertRaises(errors.KnitCorrupt,
 
742
                              self.get_knit_index, transport, 'filename', 'r')
 
743
        except TypeError, e:
 
744
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
745
                           ' not exceptions.ValueError')
 
746
                and sys.version_info[0:2] >= (2,5)):
 
747
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
748
                                  ' raising new style exceptions with python'
 
749
                                  ' >=2.5')
 
750
            else:
 
751
                raise
 
752
 
 
753
    def test_corrupted_parent_in_list(self):
 
754
        transport = MockTransport([
 
755
            _KnitIndex.HEADER,
 
756
            "a option 0 1 :",
 
757
            "b option 0 1 :",
 
758
            "c option 0 1 1 v :", # Can't have a parent of 'v'
 
759
            ])
 
760
        try:
 
761
            self.assertRaises(errors.KnitCorrupt,
 
762
                              self.get_knit_index, transport, 'filename', 'r')
 
763
        except TypeError, e:
 
764
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
765
                           ' not exceptions.ValueError')
 
766
                and sys.version_info[0:2] >= (2,5)):
 
767
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
768
                                  ' raising new style exceptions with python'
 
769
                                  ' >=2.5')
 
770
            else:
 
771
                raise
 
772
 
 
773
    def test_invalid_position(self):
 
774
        transport = MockTransport([
 
775
            _KnitIndex.HEADER,
 
776
            "a option 1v 1 :",
 
777
            ])
 
778
        try:
 
779
            self.assertRaises(errors.KnitCorrupt,
 
780
                              self.get_knit_index, transport, 'filename', 'r')
 
781
        except TypeError, e:
 
782
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
783
                           ' not exceptions.ValueError')
 
784
                and sys.version_info[0:2] >= (2,5)):
 
785
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
786
                                  ' raising new style exceptions with python'
 
787
                                  ' >=2.5')
 
788
            else:
 
789
                raise
 
790
 
 
791
    def test_invalid_size(self):
 
792
        transport = MockTransport([
 
793
            _KnitIndex.HEADER,
 
794
            "a option 1 1v :",
 
795
            ])
 
796
        try:
 
797
            self.assertRaises(errors.KnitCorrupt,
 
798
                              self.get_knit_index, transport, 'filename', 'r')
 
799
        except TypeError, e:
 
800
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
801
                           ' not exceptions.ValueError')
 
802
                and sys.version_info[0:2] >= (2,5)):
 
803
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
804
                                  ' raising new style exceptions with python'
 
805
                                  ' >=2.5')
 
806
            else:
 
807
                raise
 
808
 
 
809
    def test_short_line(self):
 
810
        transport = MockTransport([
 
811
            _KnitIndex.HEADER,
 
812
            "a option 0 10  :",
 
813
            "b option 10 10 0", # This line isn't terminated, ignored
 
814
            ])
 
815
        index = self.get_knit_index(transport, "filename", "r")
 
816
        self.assertEqual(['a'], index.get_versions())
 
817
 
 
818
    def test_skip_incomplete_record(self):
 
819
        # A line with bogus data should just be skipped
 
820
        transport = MockTransport([
 
821
            _KnitIndex.HEADER,
 
822
            "a option 0 10  :",
 
823
            "b option 10 10 0", # This line isn't terminated, ignored
 
824
            "c option 20 10 0 :", # Properly terminated, and starts with '\n'
 
825
            ])
 
826
        index = self.get_knit_index(transport, "filename", "r")
 
827
        self.assertEqual(['a', 'c'], index.get_versions())
 
828
 
 
829
    def test_trailing_characters(self):
 
830
        # A line with bogus data should just be skipped
 
831
        transport = MockTransport([
 
832
            _KnitIndex.HEADER,
 
833
            "a option 0 10  :",
 
834
            "b option 10 10 0 :a", # This line has extra trailing characters
 
835
            "c option 20 10 0 :", # Properly terminated, and starts with '\n'
 
836
            ])
 
837
        index = self.get_knit_index(transport, "filename", "r")
 
838
        self.assertEqual(['a', 'c'], index.get_versions())
 
839
 
 
840
 
 
841
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
 
842
 
 
843
    _test_needs_features = [CompiledKnitFeature]
 
844
 
 
845
    def get_knit_index(self, *args, **kwargs):
 
846
        orig = knit._load_data
 
847
        def reset():
 
848
            knit._load_data = orig
 
849
        self.addCleanup(reset)
 
850
        from bzrlib._knit_load_data_c import _load_data_c
 
851
        knit._load_data = _load_data_c
 
852
        return _KnitIndex(*args, **kwargs)
 
853
 
 
854
 
 
855
 
 
856
class KnitTests(TestCaseWithTransport):
 
857
    """Class containing knit test helper routines."""
 
858
 
 
859
    def make_test_knit(self, annotate=False, delay_create=False, index=None):
 
860
        if not annotate:
 
861
            factory = KnitPlainFactory()
 
862
        else:
 
863
            factory = None
 
864
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
 
865
                                 factory=factory, create=True,
 
866
                                 delay_create=delay_create, index=index)
 
867
 
 
868
 
 
869
class BasicKnitTests(KnitTests):
 
870
 
 
871
    def add_stock_one_and_one_a(self, k):
 
872
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
873
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
874
 
 
875
    def test_knit_constructor(self):
 
876
        """Construct empty k"""
 
877
        self.make_test_knit()
 
878
 
 
879
    def test_make_explicit_index(self):
 
880
        """We can supply an index to use."""
 
881
        knit = KnitVersionedFile('test', get_transport('.'),
 
882
            index='strangelove')
 
883
        self.assertEqual(knit._index, 'strangelove')
 
884
 
 
885
    def test_knit_add(self):
 
886
        """Store one text in knit and retrieve"""
 
887
        k = self.make_test_knit()
 
888
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
889
        self.assertTrue(k.has_version('text-1'))
 
890
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
891
 
 
892
    def test_knit_reload(self):
 
893
        # test that the content in a reloaded knit is correct
 
894
        k = self.make_test_knit()
 
895
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
896
        del k
 
897
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
898
        self.assertTrue(k2.has_version('text-1'))
 
899
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
900
 
 
901
    def test_knit_several(self):
 
902
        """Store several texts in a knit"""
 
903
        k = self.make_test_knit()
 
904
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
905
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
906
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
907
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
908
        
 
909
    def test_repeated_add(self):
 
910
        """Knit traps attempt to replace existing version"""
 
911
        k = self.make_test_knit()
 
912
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
913
        self.assertRaises(RevisionAlreadyPresent, 
 
914
                k.add_lines,
 
915
                'text-1', [], split_lines(TEXT_1))
 
916
 
 
917
    def test_empty(self):
 
918
        k = self.make_test_knit(True)
 
919
        k.add_lines('text-1', [], [])
 
920
        self.assertEquals(k.get_lines('text-1'), [])
 
921
 
 
922
    def test_incomplete(self):
 
923
        """Test if texts without a ending line-end can be inserted and
 
924
        extracted."""
 
925
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
926
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
927
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
928
        # reopening ensures maximum room for confusion
 
929
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
930
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
931
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
932
 
 
933
    def test_delta(self):
 
934
        """Expression of knit delta as lines"""
 
935
        k = self.make_test_knit()
 
936
        KnitContent
 
937
        td = list(line_delta(TEXT_1.splitlines(True),
 
938
                             TEXT_1A.splitlines(True)))
 
939
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
940
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
941
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
942
 
 
943
    def assertDerivedBlocksEqual(self, source, target, noeol=False):
 
944
        """Assert that the derived matching blocks match real output"""
 
945
        source_lines = source.splitlines(True)
 
946
        target_lines = target.splitlines(True)
 
947
        def nl(line):
 
948
            if noeol and not line.endswith('\n'):
 
949
                return line + '\n'
 
950
            else:
 
951
                return line
 
952
        source_content = KnitContent([(None, nl(l)) for l in source_lines])
 
953
        target_content = KnitContent([(None, nl(l)) for l in target_lines])
 
954
        line_delta = source_content.line_delta(target_content)
 
955
        delta_blocks = list(KnitContent.get_line_delta_blocks(line_delta,
 
956
            source_lines, target_lines))
 
957
        matcher = KnitSequenceMatcher(None, source_lines, target_lines)
 
958
        matcher_blocks = list(list(matcher.get_matching_blocks()))
 
959
        self.assertEqual(matcher_blocks, delta_blocks)
 
960
 
 
961
    def test_get_line_delta_blocks(self):
 
962
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'q\nc\n')
 
963
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1)
 
964
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1A)
 
965
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1B)
 
966
        self.assertDerivedBlocksEqual(TEXT_1B, TEXT_1A)
 
967
        self.assertDerivedBlocksEqual(TEXT_1A, TEXT_1B)
 
968
        self.assertDerivedBlocksEqual(TEXT_1A, '')
 
969
        self.assertDerivedBlocksEqual('', TEXT_1A)
 
970
        self.assertDerivedBlocksEqual('', '')
 
971
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd')
 
972
 
 
973
    def test_get_line_delta_blocks_noeol(self):
 
974
        """Handle historical knit deltas safely
 
975
 
 
976
        Some existing knit deltas don't consider the last line to differ
 
977
        when the only difference whether it has a final newline.
 
978
 
 
979
        New knit deltas appear to always consider the last line to differ
 
980
        in this case.
 
981
        """
 
982
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd\n', noeol=True)
 
983
        self.assertDerivedBlocksEqual('a\nb\nc\nd\n', 'a\nb\nc', noeol=True)
 
984
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'a\nb\nc', noeol=True)
 
985
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\n', noeol=True)
 
986
 
 
987
    def test_add_with_parents(self):
 
988
        """Store in knit with parents"""
 
989
        k = self.make_test_knit()
 
990
        self.add_stock_one_and_one_a(k)
 
991
        self.assertEquals(k.get_parents('text-1'), [])
 
992
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
993
 
 
994
    def test_ancestry(self):
 
995
        """Store in knit with parents"""
 
996
        k = self.make_test_knit()
 
997
        self.add_stock_one_and_one_a(k)
 
998
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
999
 
 
1000
    def test_add_delta(self):
 
1001
        """Store in knit with parents"""
 
1002
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
1003
            delta=True, create=True)
 
1004
        self.add_stock_one_and_one_a(k)
 
1005
        k.clear_cache()
 
1006
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
1007
 
 
1008
    def test_add_delta_knit_graph_index(self):
 
1009
        """Does adding work with a KnitGraphIndex."""
 
1010
        index = InMemoryGraphIndex(2)
 
1011
        knit_index = KnitGraphIndex(index, add_callback=index.add_nodes,
 
1012
            deltas=True)
 
1013
        k = KnitVersionedFile('test', get_transport('.'),
 
1014
            delta=True, create=True, index=knit_index)
 
1015
        self.add_stock_one_and_one_a(k)
 
1016
        k.clear_cache()
 
1017
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
1018
        # check the index had the right data added.
 
1019
        self.assertEqual(set([
 
1020
            (('text-1', ), ' 0 127', ((), ())),
 
1021
            (('text-1a', ), ' 127 140', ((('text-1', ),), (('text-1', ),))),
 
1022
            ]), set(index.iter_all_entries()))
 
1023
        # we should not have a .kndx file
 
1024
        self.assertFalse(get_transport('.').has('test.kndx'))
 
1025
 
 
1026
    def test_annotate(self):
 
1027
        """Annotations"""
 
1028
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
1029
            delta=True, create=True)
 
1030
        self.insert_and_test_small_annotate(k)
 
1031
 
 
1032
    def insert_and_test_small_annotate(self, k):
 
1033
        """test annotation with k works correctly."""
 
1034
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
1035
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
1036
 
 
1037
        origins = k.annotate('text-2')
 
1038
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
1039
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
1040
 
 
1041
    def test_annotate_fulltext(self):
 
1042
        """Annotations"""
 
1043
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
1044
            delta=False, create=True)
 
1045
        self.insert_and_test_small_annotate(k)
 
1046
 
 
1047
    def test_annotate_merge_1(self):
 
1048
        k = self.make_test_knit(True)
 
1049
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
1050
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
1051
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
1052
        origins = k.annotate('text-am')
 
1053
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
1054
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
1055
 
 
1056
    def test_annotate_merge_2(self):
 
1057
        k = self.make_test_knit(True)
 
1058
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1059
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1060
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
1061
        origins = k.annotate('text-am')
 
1062
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1063
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1064
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
1065
 
 
1066
    def test_annotate_merge_9(self):
 
1067
        k = self.make_test_knit(True)
 
1068
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1069
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1070
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
1071
        origins = k.annotate('text-am')
 
1072
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
1073
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1074
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
1075
 
 
1076
    def test_annotate_merge_3(self):
 
1077
        k = self.make_test_knit(True)
 
1078
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1079
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
1080
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
1081
        origins = k.annotate('text-am')
 
1082
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
1083
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1084
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
1085
 
 
1086
    def test_annotate_merge_4(self):
 
1087
        k = self.make_test_knit(True)
 
1088
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1089
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1090
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
1091
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
1092
        origins = k.annotate('text-am')
 
1093
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1094
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
1095
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
1096
 
 
1097
    def test_annotate_merge_5(self):
 
1098
        k = self.make_test_knit(True)
 
1099
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1100
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
1101
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
1102
        k.add_lines('text-am',
 
1103
                    ['text-a1', 'text-a2', 'text-a3'],
 
1104
                    ['a\n', 'e\n', 'z\n'])
 
1105
        origins = k.annotate('text-am')
 
1106
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1107
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
1108
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
1109
 
 
1110
    def test_annotate_file_cherry_pick(self):
 
1111
        k = self.make_test_knit(True)
 
1112
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
1113
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
1114
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
1115
        origins = k.annotate('text-3')
 
1116
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
1117
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
1118
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
1119
 
 
1120
    def test_knit_join(self):
 
1121
        """Store in knit with parents"""
 
1122
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
1123
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
1124
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
1125
 
 
1126
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
1127
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
1128
 
 
1129
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
1130
 
 
1131
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
1132
        count = k2.join(k1, version_ids=['text-m'])
 
1133
        self.assertEquals(count, 5)
 
1134
        self.assertTrue(k2.has_version('text-a'))
 
1135
        self.assertTrue(k2.has_version('text-c'))
 
1136
 
 
1137
    def test_reannotate(self):
 
1138
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
1139
                               factory=KnitAnnotateFactory(), create=True)
 
1140
        # 0
 
1141
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
1142
        # 1
 
1143
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
1144
 
 
1145
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
1146
                               factory=KnitAnnotateFactory(), create=True)
 
1147
        k2.join(k1, version_ids=['text-b'])
 
1148
 
 
1149
        # 2
 
1150
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
1151
        # 2
 
1152
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
1153
        # 3
 
1154
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
1155
 
 
1156
        # test-c will have index 3
 
1157
        k1.join(k2, version_ids=['text-c'])
 
1158
 
 
1159
        lines = k1.get_lines('text-c')
 
1160
        self.assertEquals(lines, ['z\n', 'c\n'])
 
1161
 
 
1162
        origins = k1.annotate('text-c')
 
1163
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
1164
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
1165
 
 
1166
    def test_get_line_delta_texts(self):
 
1167
        """Make sure we can call get_texts on text with reused line deltas"""
 
1168
        k1 = KnitVersionedFile('test1', get_transport('.'), 
 
1169
                               factory=KnitPlainFactory(), create=True)
 
1170
        for t in range(3):
 
1171
            if t == 0:
 
1172
                parents = []
 
1173
            else:
 
1174
                parents = ['%d' % (t-1)]
 
1175
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
 
1176
        k1.get_texts(('%d' % t) for t in range(3))
 
1177
        
 
1178
    def test_iter_lines_reads_in_order(self):
 
1179
        t = MemoryTransport()
 
1180
        instrumented_t = TransportLogger(t)
 
1181
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
1182
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
1183
        # add texts with no required ordering
 
1184
        k1.add_lines('base', [], ['text\n'])
 
1185
        k1.add_lines('base2', [], ['text2\n'])
 
1186
        k1.clear_cache()
 
1187
        instrumented_t._calls = []
 
1188
        # request a last-first iteration
 
1189
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
1190
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
1191
        self.assertEqual(['text\n', 'text2\n'], results)
 
1192
 
 
1193
    def test_create_empty_annotated(self):
 
1194
        k1 = self.make_test_knit(True)
 
1195
        # 0
 
1196
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
1197
        k2 = k1.create_empty('t', MemoryTransport())
 
1198
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
1199
        self.assertEqual(k1.delta, k2.delta)
 
1200
        # the generic test checks for empty content and file class
 
1201
 
 
1202
    def test_knit_format(self):
 
1203
        # this tests that a new knit index file has the expected content
 
1204
        # and that is writes the data we expect as records are added.
 
1205
        knit = self.make_test_knit(True)
 
1206
        # Now knit files are not created until we first add data to them
 
1207
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
 
1208
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
1209
        self.assertFileEqual(
 
1210
            "# bzr knit index 8\n"
 
1211
            "\n"
 
1212
            "revid fulltext 0 84 .a_ghost :",
 
1213
            'test.kndx')
 
1214
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
 
1215
        self.assertFileEqual(
 
1216
            "# bzr knit index 8\n"
 
1217
            "\nrevid fulltext 0 84 .a_ghost :"
 
1218
            "\nrevid2 line-delta 84 82 0 :",
 
1219
            'test.kndx')
 
1220
        # we should be able to load this file again
 
1221
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1222
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
1223
        # write a short write to the file and ensure that its ignored
 
1224
        indexfile = file('test.kndx', 'ab')
 
1225
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
 
1226
        indexfile.close()
 
1227
        # we should be able to load this file again
 
1228
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
1229
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
1230
        # and add a revision with the same id the failed write had
 
1231
        knit.add_lines('revid3', ['revid2'], ['a\n'])
 
1232
        # and when reading it revid3 should now appear.
 
1233
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1234
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
 
1235
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
 
1236
 
 
1237
    def test_delay_create(self):
 
1238
        """Test that passing delay_create=True creates files late"""
 
1239
        knit = self.make_test_knit(annotate=True, delay_create=True)
 
1240
        self.failIfExists('test.knit')
 
1241
        self.failIfExists('test.kndx')
 
1242
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
1243
        self.failUnlessExists('test.knit')
 
1244
        self.assertFileEqual(
 
1245
            "# bzr knit index 8\n"
 
1246
            "\n"
 
1247
            "revid fulltext 0 84 .a_ghost :",
 
1248
            'test.kndx')
 
1249
 
 
1250
    def test_create_parent_dir(self):
 
1251
        """create_parent_dir can create knits in nonexistant dirs"""
 
1252
        # Has no effect if we don't set 'delay_create'
 
1253
        trans = get_transport('.')
 
1254
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
 
1255
                          trans, access_mode='w', factory=None,
 
1256
                          create=True, create_parent_dir=True)
 
1257
        # Nothing should have changed yet
 
1258
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1259
                                 factory=None, create=True,
 
1260
                                 create_parent_dir=True,
 
1261
                                 delay_create=True)
 
1262
        self.failIfExists('dir/test.knit')
 
1263
        self.failIfExists('dir/test.kndx')
 
1264
        self.failIfExists('dir')
 
1265
        knit.add_lines('revid', [], ['a\n'])
 
1266
        self.failUnlessExists('dir')
 
1267
        self.failUnlessExists('dir/test.knit')
 
1268
        self.assertFileEqual(
 
1269
            "# bzr knit index 8\n"
 
1270
            "\n"
 
1271
            "revid fulltext 0 84  :",
 
1272
            'dir/test.kndx')
 
1273
 
 
1274
    def test_create_mode_700(self):
 
1275
        trans = get_transport('.')
 
1276
        if not trans._can_roundtrip_unix_modebits():
 
1277
            # Can't roundtrip, so no need to run this test
 
1278
            return
 
1279
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1280
                                 factory=None, create=True,
 
1281
                                 create_parent_dir=True,
 
1282
                                 delay_create=True,
 
1283
                                 file_mode=0600,
 
1284
                                 dir_mode=0700)
 
1285
        knit.add_lines('revid', [], ['a\n'])
 
1286
        self.assertTransportMode(trans, 'dir', 0700)
 
1287
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
 
1288
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
 
1289
 
 
1290
    def test_create_mode_770(self):
 
1291
        trans = get_transport('.')
 
1292
        if not trans._can_roundtrip_unix_modebits():
 
1293
            # Can't roundtrip, so no need to run this test
 
1294
            return
 
1295
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1296
                                 factory=None, create=True,
 
1297
                                 create_parent_dir=True,
 
1298
                                 delay_create=True,
 
1299
                                 file_mode=0660,
 
1300
                                 dir_mode=0770)
 
1301
        knit.add_lines('revid', [], ['a\n'])
 
1302
        self.assertTransportMode(trans, 'dir', 0770)
 
1303
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
 
1304
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
 
1305
 
 
1306
    def test_create_mode_777(self):
 
1307
        trans = get_transport('.')
 
1308
        if not trans._can_roundtrip_unix_modebits():
 
1309
            # Can't roundtrip, so no need to run this test
 
1310
            return
 
1311
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1312
                                 factory=None, create=True,
 
1313
                                 create_parent_dir=True,
 
1314
                                 delay_create=True,
 
1315
                                 file_mode=0666,
 
1316
                                 dir_mode=0777)
 
1317
        knit.add_lines('revid', [], ['a\n'])
 
1318
        self.assertTransportMode(trans, 'dir', 0777)
 
1319
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
 
1320
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
 
1321
 
 
1322
    def test_plan_merge(self):
 
1323
        my_knit = self.make_test_knit(annotate=True)
 
1324
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
 
1325
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
 
1326
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
 
1327
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
 
1328
        for plan_line, expected_line in zip(plan, AB_MERGE):
 
1329
            self.assertEqual(plan_line, expected_line)
 
1330
 
 
1331
 
 
1332
TEXT_1 = """\
 
1333
Banana cup cakes:
 
1334
 
 
1335
- bananas
 
1336
- eggs
 
1337
- broken tea cups
 
1338
"""
 
1339
 
 
1340
TEXT_1A = """\
 
1341
Banana cup cake recipe
 
1342
(serves 6)
 
1343
 
 
1344
- bananas
 
1345
- eggs
 
1346
- broken tea cups
 
1347
- self-raising flour
 
1348
"""
 
1349
 
 
1350
TEXT_1B = """\
 
1351
Banana cup cake recipe
 
1352
 
 
1353
- bananas (do not use plantains!!!)
 
1354
- broken tea cups
 
1355
- flour
 
1356
"""
 
1357
 
 
1358
delta_1_1a = """\
 
1359
0,1,2
 
1360
Banana cup cake recipe
 
1361
(serves 6)
 
1362
5,5,1
 
1363
- self-raising flour
 
1364
"""
 
1365
 
 
1366
TEXT_2 = """\
 
1367
Boeuf bourguignon
 
1368
 
 
1369
- beef
 
1370
- red wine
 
1371
- small onions
 
1372
- carrot
 
1373
- mushrooms
 
1374
"""
 
1375
 
 
1376
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
 
1377
new-a|(serves 6)
 
1378
unchanged|
 
1379
killed-b|- bananas
 
1380
killed-b|- eggs
 
1381
new-b|- bananas (do not use plantains!!!)
 
1382
unchanged|- broken tea cups
 
1383
new-a|- self-raising flour
 
1384
new-b|- flour
 
1385
"""
 
1386
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
 
1387
 
 
1388
 
 
1389
def line_delta(from_lines, to_lines):
 
1390
    """Generate line-based delta from one text to another"""
 
1391
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
1392
    for op in s.get_opcodes():
 
1393
        if op[0] == 'equal':
 
1394
            continue
 
1395
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
1396
        for i in range(op[3], op[4]):
 
1397
            yield to_lines[i]
 
1398
 
 
1399
 
 
1400
def apply_line_delta(basis_lines, delta_lines):
 
1401
    """Apply a line-based perfect diff
 
1402
    
 
1403
    basis_lines -- text to apply the patch to
 
1404
    delta_lines -- diff instructions and content
 
1405
    """
 
1406
    out = basis_lines[:]
 
1407
    i = 0
 
1408
    offset = 0
 
1409
    while i < len(delta_lines):
 
1410
        l = delta_lines[i]
 
1411
        a, b, c = map(long, l.split(','))
 
1412
        i = i + 1
 
1413
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
1414
        i = i + c
 
1415
        offset = offset + (b - a) + c
 
1416
    return out
 
1417
 
 
1418
 
 
1419
class TestWeaveToKnit(KnitTests):
 
1420
 
 
1421
    def test_weave_to_knit_matches(self):
 
1422
        # check that the WeaveToKnit is_compatible function
 
1423
        # registers True for a Weave to a Knit.
 
1424
        w = Weave()
 
1425
        k = self.make_test_knit()
 
1426
        self.failUnless(WeaveToKnit.is_compatible(w, k))
 
1427
        self.failIf(WeaveToKnit.is_compatible(k, w))
 
1428
        self.failIf(WeaveToKnit.is_compatible(w, w))
 
1429
        self.failIf(WeaveToKnit.is_compatible(k, k))
 
1430
 
 
1431
 
 
1432
class TestKnitCaching(KnitTests):
 
1433
    
 
1434
    def create_knit(self, cache_add=False):
 
1435
        k = self.make_test_knit(True)
 
1436
        if cache_add:
 
1437
            k.enable_cache()
 
1438
 
 
1439
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
1440
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
1441
        return k
 
1442
 
 
1443
    def test_no_caching(self):
 
1444
        k = self.create_knit()
 
1445
        # Nothing should be cached without setting 'enable_cache'
 
1446
        self.assertEqual({}, k._data._cache)
 
1447
 
 
1448
    def test_cache_add_and_clear(self):
 
1449
        k = self.create_knit(True)
 
1450
 
 
1451
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
 
1452
 
 
1453
        k.clear_cache()
 
1454
        self.assertEqual({}, k._data._cache)
 
1455
 
 
1456
    def test_cache_data_read_raw(self):
 
1457
        k = self.create_knit()
 
1458
 
 
1459
        # Now cache and read
 
1460
        k.enable_cache()
 
1461
 
 
1462
        def read_one_raw(version):
 
1463
            pos_map = k._get_components_positions([version])
 
1464
            method, pos, size, next = pos_map[version]
 
1465
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
 
1466
            self.assertEqual(1, len(lst))
 
1467
            return lst[0]
 
1468
 
 
1469
        val = read_one_raw('text-1')
 
1470
        self.assertEqual({'text-1':val[1]}, k._data._cache)
 
1471
 
 
1472
        k.clear_cache()
 
1473
        # After clear, new reads are not cached
 
1474
        self.assertEqual({}, k._data._cache)
 
1475
 
 
1476
        val2 = read_one_raw('text-1')
 
1477
        self.assertEqual(val, val2)
 
1478
        self.assertEqual({}, k._data._cache)
 
1479
 
 
1480
    def test_cache_data_read(self):
 
1481
        k = self.create_knit()
 
1482
 
 
1483
        def read_one(version):
 
1484
            pos_map = k._get_components_positions([version])
 
1485
            method, pos, size, next = pos_map[version]
 
1486
            lst = list(k._data.read_records_iter([(version, pos, size)]))
 
1487
            self.assertEqual(1, len(lst))
 
1488
            return lst[0]
 
1489
 
 
1490
        # Now cache and read
 
1491
        k.enable_cache()
 
1492
 
 
1493
        val = read_one('text-2')
 
1494
        self.assertEqual(['text-2'], k._data._cache.keys())
 
1495
        self.assertEqual('text-2', val[0])
 
1496
        content, digest = k._data._parse_record('text-2',
 
1497
                                                k._data._cache['text-2'])
 
1498
        self.assertEqual(content, val[1])
 
1499
        self.assertEqual(digest, val[2])
 
1500
 
 
1501
        k.clear_cache()
 
1502
        self.assertEqual({}, k._data._cache)
 
1503
 
 
1504
        val2 = read_one('text-2')
 
1505
        self.assertEqual(val, val2)
 
1506
        self.assertEqual({}, k._data._cache)
 
1507
 
 
1508
    def test_cache_read(self):
 
1509
        k = self.create_knit()
 
1510
        k.enable_cache()
 
1511
 
 
1512
        text = k.get_text('text-1')
 
1513
        self.assertEqual(TEXT_1, text)
 
1514
        self.assertEqual(['text-1'], k._data._cache.keys())
 
1515
 
 
1516
        k.clear_cache()
 
1517
        self.assertEqual({}, k._data._cache)
 
1518
 
 
1519
        text = k.get_text('text-1')
 
1520
        self.assertEqual(TEXT_1, text)
 
1521
        self.assertEqual({}, k._data._cache)
 
1522
 
 
1523
 
 
1524
class TestKnitIndex(KnitTests):
 
1525
 
 
1526
    def test_add_versions_dictionary_compresses(self):
 
1527
        """Adding versions to the index should update the lookup dict"""
 
1528
        knit = self.make_test_knit()
 
1529
        idx = knit._index
 
1530
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1531
        self.check_file_contents('test.kndx',
 
1532
            '# bzr knit index 8\n'
 
1533
            '\n'
 
1534
            'a-1 fulltext 0 0  :'
 
1535
            )
 
1536
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
 
1537
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
 
1538
                         ])
 
1539
        self.check_file_contents('test.kndx',
 
1540
            '# bzr knit index 8\n'
 
1541
            '\n'
 
1542
            'a-1 fulltext 0 0  :\n'
 
1543
            'a-2 fulltext 0 0 0 :\n'
 
1544
            'a-3 fulltext 0 0 1 :'
 
1545
            )
 
1546
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
 
1547
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
 
1548
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
 
1549
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
 
1550
                         }, idx._cache)
 
1551
 
 
1552
    def test_add_versions_fails_clean(self):
 
1553
        """If add_versions fails in the middle, it restores a pristine state.
 
1554
 
 
1555
        Any modifications that are made to the index are reset if all versions
 
1556
        cannot be added.
 
1557
        """
 
1558
        # This cheats a little bit by passing in a generator which will
 
1559
        # raise an exception before the processing finishes
 
1560
        # Other possibilities would be to have an version with the wrong number
 
1561
        # of entries, or to make the backing transport unable to write any
 
1562
        # files.
 
1563
 
 
1564
        knit = self.make_test_knit()
 
1565
        idx = knit._index
 
1566
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1567
 
 
1568
        class StopEarly(Exception):
 
1569
            pass
 
1570
 
 
1571
        def generate_failure():
 
1572
            """Add some entries and then raise an exception"""
 
1573
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
 
1574
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
 
1575
            raise StopEarly()
 
1576
 
 
1577
        # Assert the pre-condition
 
1578
        self.assertEqual(['a-1'], idx._history)
 
1579
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1580
 
 
1581
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
 
1582
 
 
1583
        # And it shouldn't be modified
 
1584
        self.assertEqual(['a-1'], idx._history)
 
1585
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1586
 
 
1587
    def test_knit_index_ignores_empty_files(self):
 
1588
        # There was a race condition in older bzr, where a ^C at the right time
 
1589
        # could leave an empty .kndx file, which bzr would later claim was a
 
1590
        # corrupted file since the header was not present. In reality, the file
 
1591
        # just wasn't created, so it should be ignored.
 
1592
        t = get_transport('.')
 
1593
        t.put_bytes('test.kndx', '')
 
1594
 
 
1595
        knit = self.make_test_knit()
 
1596
 
 
1597
    def test_knit_index_checks_header(self):
 
1598
        t = get_transport('.')
 
1599
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
 
1600
 
 
1601
        self.assertRaises(KnitHeaderError, self.make_test_knit)
 
1602
 
 
1603
 
 
1604
class TestGraphIndexKnit(KnitTests):
 
1605
    """Tests for knits using a GraphIndex rather than a KnitIndex."""
 
1606
 
 
1607
    def make_g_index(self, name, ref_lists=0, nodes=[]):
 
1608
        builder = GraphIndexBuilder(ref_lists)
 
1609
        for node, references, value in nodes:
 
1610
            builder.add_node(node, references, value)
 
1611
        stream = builder.finish()
 
1612
        trans = self.get_transport()
 
1613
        trans.put_file(name, stream)
 
1614
        return GraphIndex(trans, name)
 
1615
 
 
1616
    def two_graph_index(self, deltas=False, catch_adds=False):
 
1617
        """Build a two-graph index.
 
1618
 
 
1619
        :param deltas: If true, use underlying indices with two node-ref
 
1620
            lists and 'parent' set to a delta-compressed against tail.
 
1621
        """
 
1622
        # build a complex graph across several indices.
 
1623
        if deltas:
 
1624
            # delta compression inn the index
 
1625
            index1 = self.make_g_index('1', 2, [
 
1626
                (('tip', ), 'N0 100', ([('parent', )], [], )),
 
1627
                (('tail', ), '', ([], []))])
 
1628
            index2 = self.make_g_index('2', 2, [
 
1629
                (('parent', ), ' 100 78', ([('tail', ), ('ghost', )], [('tail', )])),
 
1630
                (('separate', ), '', ([], []))])
 
1631
        else:
 
1632
            # just blob location and graph in the index.
 
1633
            index1 = self.make_g_index('1', 1, [
 
1634
                (('tip', ), 'N0 100', ([('parent', )], )),
 
1635
                (('tail', ), '', ([], ))])
 
1636
            index2 = self.make_g_index('2', 1, [
 
1637
                (('parent', ), ' 100 78', ([('tail', ), ('ghost', )], )),
 
1638
                (('separate', ), '', ([], ))])
 
1639
        combined_index = CombinedGraphIndex([index1, index2])
 
1640
        if catch_adds:
 
1641
            self.combined_index = combined_index
 
1642
            self.caught_entries = []
 
1643
            add_callback = self.catch_add
 
1644
        else:
 
1645
            add_callback = None
 
1646
        return KnitGraphIndex(combined_index, deltas=deltas,
 
1647
            add_callback=add_callback)
 
1648
 
 
1649
    def test_get_graph(self):
 
1650
        index = self.two_graph_index()
 
1651
        self.assertEqual(set([
 
1652
            ('tip', ('parent', )),
 
1653
            ('tail', ()),
 
1654
            ('parent', ('tail', 'ghost')),
 
1655
            ('separate', ()),
 
1656
            ]), set(index.get_graph()))
 
1657
 
 
1658
    def test_get_ancestry(self):
 
1659
        # get_ancestry is defined as eliding ghosts, not erroring.
 
1660
        index = self.two_graph_index()
 
1661
        self.assertEqual([], index.get_ancestry([]))
 
1662
        self.assertEqual(['separate'], index.get_ancestry(['separate']))
 
1663
        self.assertEqual(['tail'], index.get_ancestry(['tail']))
 
1664
        self.assertEqual(['tail', 'parent'], index.get_ancestry(['parent']))
 
1665
        self.assertEqual(['tail', 'parent', 'tip'], index.get_ancestry(['tip']))
 
1666
        self.assertTrue(index.get_ancestry(['tip', 'separate']) in
 
1667
            (['tail', 'parent', 'tip', 'separate'],
 
1668
             ['separate', 'tail', 'parent', 'tip'],
 
1669
            ))
 
1670
        # and without topo_sort
 
1671
        self.assertEqual(set(['separate']),
 
1672
            set(index.get_ancestry(['separate'], topo_sorted=False)))
 
1673
        self.assertEqual(set(['tail']),
 
1674
            set(index.get_ancestry(['tail'], topo_sorted=False)))
 
1675
        self.assertEqual(set(['tail', 'parent']),
 
1676
            set(index.get_ancestry(['parent'], topo_sorted=False)))
 
1677
        self.assertEqual(set(['tail', 'parent', 'tip']),
 
1678
            set(index.get_ancestry(['tip'], topo_sorted=False)))
 
1679
        self.assertEqual(set(['separate', 'tail', 'parent', 'tip']),
 
1680
            set(index.get_ancestry(['tip', 'separate'])))
 
1681
        # asking for a ghost makes it go boom.
 
1682
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
 
1683
 
 
1684
    def test_get_ancestry_with_ghosts(self):
 
1685
        index = self.two_graph_index()
 
1686
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
1687
        self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
 
1688
        self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
 
1689
        self.assertTrue(index.get_ancestry_with_ghosts(['parent']) in
 
1690
            (['tail', 'ghost', 'parent'],
 
1691
             ['ghost', 'tail', 'parent'],
 
1692
            ))
 
1693
        self.assertTrue(index.get_ancestry_with_ghosts(['tip']) in
 
1694
            (['tail', 'ghost', 'parent', 'tip'],
 
1695
             ['ghost', 'tail', 'parent', 'tip'],
 
1696
            ))
 
1697
        self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
 
1698
            (['tail', 'ghost', 'parent', 'tip', 'separate'],
 
1699
             ['ghost', 'tail', 'parent', 'tip', 'separate'],
 
1700
             ['separate', 'tail', 'ghost', 'parent', 'tip'],
 
1701
             ['separate', 'ghost', 'tail', 'parent', 'tip'],
 
1702
            ))
 
1703
        # asking for a ghost makes it go boom.
 
1704
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])
 
1705
 
 
1706
    def test_num_versions(self):
 
1707
        index = self.two_graph_index()
 
1708
        self.assertEqual(4, index.num_versions())
 
1709
 
 
1710
    def test_get_versions(self):
 
1711
        index = self.two_graph_index()
 
1712
        self.assertEqual(set(['tail', 'tip', 'parent', 'separate']),
 
1713
            set(index.get_versions()))
 
1714
 
 
1715
    def test_has_version(self):
 
1716
        index = self.two_graph_index()
 
1717
        self.assertTrue(index.has_version('tail'))
 
1718
        self.assertFalse(index.has_version('ghost'))
 
1719
 
 
1720
    def test_get_position(self):
 
1721
        index = self.two_graph_index()
 
1722
        self.assertEqual((0, 100), index.get_position('tip'))
 
1723
        self.assertEqual((100, 78), index.get_position('parent'))
 
1724
 
 
1725
    def test_get_method_deltas(self):
 
1726
        index = self.two_graph_index(deltas=True)
 
1727
        self.assertEqual('fulltext', index.get_method('tip'))
 
1728
        self.assertEqual('line-delta', index.get_method('parent'))
 
1729
 
 
1730
    def test_get_method_no_deltas(self):
 
1731
        # check that the parent-history lookup is ignored with deltas=False.
 
1732
        index = self.two_graph_index(deltas=False)
 
1733
        self.assertEqual('fulltext', index.get_method('tip'))
 
1734
        self.assertEqual('fulltext', index.get_method('parent'))
 
1735
 
 
1736
    def test_get_options_deltas(self):
 
1737
        index = self.two_graph_index(deltas=True)
 
1738
        self.assertEqual(['fulltext', 'no-eol'], index.get_options('tip'))
 
1739
        self.assertEqual(['line-delta'], index.get_options('parent'))
 
1740
 
 
1741
    def test_get_options_no_deltas(self):
 
1742
        # check that the parent-history lookup is ignored with deltas=False.
 
1743
        index = self.two_graph_index(deltas=False)
 
1744
        self.assertEqual(['fulltext', 'no-eol'], index.get_options('tip'))
 
1745
        self.assertEqual(['fulltext'], index.get_options('parent'))
 
1746
 
 
1747
    def test_get_parents(self):
 
1748
        # get_parents ignores ghosts
 
1749
        index = self.two_graph_index()
 
1750
        self.assertEqual(('tail', ), index.get_parents('parent'))
 
1751
        # and errors on ghosts.
 
1752
        self.assertRaises(errors.RevisionNotPresent,
 
1753
            index.get_parents, 'ghost')
 
1754
 
 
1755
    def test_get_parents_with_ghosts(self):
 
1756
        index = self.two_graph_index()
 
1757
        self.assertEqual(('tail', 'ghost'), index.get_parents_with_ghosts('parent'))
 
1758
        # and errors on ghosts.
 
1759
        self.assertRaises(errors.RevisionNotPresent,
 
1760
            index.get_parents_with_ghosts, 'ghost')
 
1761
 
 
1762
    def test_check_versions_present(self):
 
1763
        # ghosts should not be considered present
 
1764
        index = self.two_graph_index()
 
1765
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
1766
            ['ghost'])
 
1767
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
1768
            ['tail', 'ghost'])
 
1769
        index.check_versions_present(['tail', 'separate'])
 
1770
 
 
1771
    def catch_add(self, entries):
 
1772
        self.caught_entries.append(entries)
 
1773
 
 
1774
    def test_add_no_callback_errors(self):
 
1775
        index = self.two_graph_index()
 
1776
        self.assertRaises(errors.ReadOnlyError, index.add_version,
 
1777
            'new', 'fulltext,no-eol', 50, 60, ['separate'])
 
1778
 
 
1779
    def test_add_version_smoke(self):
 
1780
        index = self.two_graph_index(catch_adds=True)
 
1781
        index.add_version('new', 'fulltext,no-eol', 50, 60, ['separate'])
 
1782
        self.assertEqual([[(('new', ), 'N50 60', ((('separate',),),))]],
 
1783
            self.caught_entries)
 
1784
 
 
1785
    def test_add_version_delta_not_delta_index(self):
 
1786
        index = self.two_graph_index(catch_adds=True)
 
1787
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1788
            'new', 'no-eol,line-delta', 0, 100, ['parent'])
 
1789
        self.assertEqual([], self.caught_entries)
 
1790
 
 
1791
    def test_add_version_same_dup(self):
 
1792
        index = self.two_graph_index(catch_adds=True)
 
1793
        # options can be spelt two different ways
 
1794
        index.add_version('tip', 'fulltext,no-eol', 0, 100, ['parent'])
 
1795
        index.add_version('tip', 'no-eol,fulltext', 0, 100, ['parent'])
 
1796
        # but neither should have added data.
 
1797
        self.assertEqual([[], []], self.caught_entries)
 
1798
        
 
1799
    def test_add_version_different_dup(self):
 
1800
        index = self.two_graph_index(deltas=True, catch_adds=True)
 
1801
        # change options
 
1802
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1803
            'tip', 'no-eol,line-delta', 0, 100, ['parent'])
 
1804
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1805
            'tip', 'line-delta,no-eol', 0, 100, ['parent'])
 
1806
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1807
            'tip', 'fulltext', 0, 100, ['parent'])
 
1808
        # position/length
 
1809
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1810
            'tip', 'fulltext,no-eol', 50, 100, ['parent'])
 
1811
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1812
            'tip', 'fulltext,no-eol', 0, 1000, ['parent'])
 
1813
        # parents
 
1814
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
1815
            'tip', 'fulltext,no-eol', 0, 100, [])
 
1816
        self.assertEqual([], self.caught_entries)
 
1817
        
 
1818
    def test_add_versions_nodeltas(self):
 
1819
        index = self.two_graph_index(catch_adds=True)
 
1820
        index.add_versions([
 
1821
                ('new', 'fulltext,no-eol', 50, 60, ['separate']),
 
1822
                ('new2', 'fulltext', 0, 6, ['new']),
 
1823
                ])
 
1824
        self.assertEqual([(('new', ), 'N50 60', ((('separate',),),)),
 
1825
            (('new2', ), ' 0 6', ((('new',),),))],
 
1826
            sorted(self.caught_entries[0]))
 
1827
        self.assertEqual(1, len(self.caught_entries))
 
1828
 
 
1829
    def test_add_versions_deltas(self):
 
1830
        index = self.two_graph_index(deltas=True, catch_adds=True)
 
1831
        index.add_versions([
 
1832
                ('new', 'fulltext,no-eol', 50, 60, ['separate']),
 
1833
                ('new2', 'line-delta', 0, 6, ['new']),
 
1834
                ])
 
1835
        self.assertEqual([(('new', ), 'N50 60', ((('separate',),), ())),
 
1836
            (('new2', ), ' 0 6', ((('new',),), (('new',),), ))],
 
1837
            sorted(self.caught_entries[0]))
 
1838
        self.assertEqual(1, len(self.caught_entries))
 
1839
 
 
1840
    def test_add_versions_delta_not_delta_index(self):
 
1841
        index = self.two_graph_index(catch_adds=True)
 
1842
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1843
            [('new', 'no-eol,line-delta', 0, 100, ['parent'])])
 
1844
        self.assertEqual([], self.caught_entries)
 
1845
 
 
1846
    def test_add_versions_same_dup(self):
 
1847
        index = self.two_graph_index(catch_adds=True)
 
1848
        # options can be spelt two different ways
 
1849
        index.add_versions([('tip', 'fulltext,no-eol', 0, 100, ['parent'])])
 
1850
        index.add_versions([('tip', 'no-eol,fulltext', 0, 100, ['parent'])])
 
1851
        # but neither should have added data.
 
1852
        self.assertEqual([[], []], self.caught_entries)
 
1853
        
 
1854
    def test_add_versions_different_dup(self):
 
1855
        index = self.two_graph_index(deltas=True, catch_adds=True)
 
1856
        # change options
 
1857
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1858
            [('tip', 'no-eol,line-delta', 0, 100, ['parent'])])
 
1859
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1860
            [('tip', 'line-delta,no-eol', 0, 100, ['parent'])])
 
1861
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1862
            [('tip', 'fulltext', 0, 100, ['parent'])])
 
1863
        # position/length
 
1864
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1865
            [('tip', 'fulltext,no-eol', 50, 100, ['parent'])])
 
1866
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1867
            [('tip', 'fulltext,no-eol', 0, 1000, ['parent'])])
 
1868
        # parents
 
1869
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1870
            [('tip', 'fulltext,no-eol', 0, 100, [])])
 
1871
        # change options in the second record
 
1872
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
1873
            [('tip', 'fulltext,no-eol', 0, 100, ['parent']),
 
1874
             ('tip', 'no-eol,line-delta', 0, 100, ['parent'])])
 
1875
        self.assertEqual([], self.caught_entries)
 
1876
 
 
1877
    def test_iter_parents(self):
 
1878
        index1 = self.make_g_index('1', 1, [
 
1879
        # no parents
 
1880
            (('r0', ), 'N0 100', ([], )),
 
1881
        # 1 parent
 
1882
            (('r1', ), '', ([('r0', )], ))])
 
1883
        index2 = self.make_g_index('2', 1, [
 
1884
        # 2 parents
 
1885
            (('r2', ), 'N0 100', ([('r1', ), ('r0', )], )),
 
1886
            ])
 
1887
        combined_index = CombinedGraphIndex([index1, index2])
 
1888
        index = KnitGraphIndex(combined_index)
 
1889
        # XXX TODO a ghost
 
1890
        # cases: each sample data individually:
 
1891
        self.assertEqual(set([('r0', ())]),
 
1892
            set(index.iter_parents(['r0'])))
 
1893
        self.assertEqual(set([('r1', ('r0', ))]),
 
1894
            set(index.iter_parents(['r1'])))
 
1895
        self.assertEqual(set([('r2', ('r1', 'r0'))]),
 
1896
            set(index.iter_parents(['r2'])))
 
1897
        # no nodes returned for a missing node
 
1898
        self.assertEqual(set(),
 
1899
            set(index.iter_parents(['missing'])))
 
1900
        # 1 node returned with missing nodes skipped
 
1901
        self.assertEqual(set([('r1', ('r0', ))]),
 
1902
            set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
 
1903
        # 2 nodes returned
 
1904
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
1905
            set(index.iter_parents(['r0', 'r1'])))
 
1906
        # 2 nodes returned, missing skipped
 
1907
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
1908
            set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
 
1909
 
 
1910
 
 
1911
class TestNoParentsGraphIndexKnit(KnitTests):
 
1912
    """Tests for knits using KnitGraphIndex with no parents."""
 
1913
 
 
1914
    def make_g_index(self, name, ref_lists=0, nodes=[]):
 
1915
        builder = GraphIndexBuilder(ref_lists)
 
1916
        for node, references in nodes:
 
1917
            builder.add_node(node, references)
 
1918
        stream = builder.finish()
 
1919
        trans = self.get_transport()
 
1920
        trans.put_file(name, stream)
 
1921
        return GraphIndex(trans, name)
 
1922
 
 
1923
    def test_parents_deltas_incompatible(self):
 
1924
        index = CombinedGraphIndex([])
 
1925
        self.assertRaises(errors.KnitError, KnitGraphIndex, index,
 
1926
            deltas=True, parents=False)
 
1927
 
 
1928
    def two_graph_index(self, catch_adds=False):
 
1929
        """Build a two-graph index.
 
1930
 
 
1931
        :param deltas: If true, use underlying indices with two node-ref
 
1932
            lists and 'parent' set to a delta-compressed against tail.
 
1933
        """
 
1934
        # put several versions in the index.
 
1935
        index1 = self.make_g_index('1', 0, [
 
1936
            (('tip', ), 'N0 100'),
 
1937
            (('tail', ), '')])
 
1938
        index2 = self.make_g_index('2', 0, [
 
1939
            (('parent', ), ' 100 78'),
 
1940
            (('separate', ), '')])
 
1941
        combined_index = CombinedGraphIndex([index1, index2])
 
1942
        if catch_adds:
 
1943
            self.combined_index = combined_index
 
1944
            self.caught_entries = []
 
1945
            add_callback = self.catch_add
 
1946
        else:
 
1947
            add_callback = None
 
1948
        return KnitGraphIndex(combined_index, parents=False,
 
1949
            add_callback=add_callback)
 
1950
 
 
1951
    def test_get_graph(self):
 
1952
        index = self.two_graph_index()
 
1953
        self.assertEqual(set([
 
1954
            ('tip', ()),
 
1955
            ('tail', ()),
 
1956
            ('parent', ()),
 
1957
            ('separate', ()),
 
1958
            ]), set(index.get_graph()))
 
1959
 
 
1960
    def test_get_ancestry(self):
 
1961
        # with no parents, ancestry is always just the key.
 
1962
        index = self.two_graph_index()
 
1963
        self.assertEqual([], index.get_ancestry([]))
 
1964
        self.assertEqual(['separate'], index.get_ancestry(['separate']))
 
1965
        self.assertEqual(['tail'], index.get_ancestry(['tail']))
 
1966
        self.assertEqual(['parent'], index.get_ancestry(['parent']))
 
1967
        self.assertEqual(['tip'], index.get_ancestry(['tip']))
 
1968
        self.assertTrue(index.get_ancestry(['tip', 'separate']) in
 
1969
            (['tip', 'separate'],
 
1970
             ['separate', 'tip'],
 
1971
            ))
 
1972
        # asking for a ghost makes it go boom.
 
1973
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
 
1974
 
 
1975
    def test_get_ancestry_with_ghosts(self):
 
1976
        index = self.two_graph_index()
 
1977
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
1978
        self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
 
1979
        self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
 
1980
        self.assertEqual(['parent'], index.get_ancestry_with_ghosts(['parent']))
 
1981
        self.assertEqual(['tip'], index.get_ancestry_with_ghosts(['tip']))
 
1982
        self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
 
1983
            (['tip', 'separate'],
 
1984
             ['separate', 'tip'],
 
1985
            ))
 
1986
        # asking for a ghost makes it go boom.
 
1987
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])
 
1988
 
 
1989
    def test_num_versions(self):
 
1990
        index = self.two_graph_index()
 
1991
        self.assertEqual(4, index.num_versions())
 
1992
 
 
1993
    def test_get_versions(self):
 
1994
        index = self.two_graph_index()
 
1995
        self.assertEqual(set(['tail', 'tip', 'parent', 'separate']),
 
1996
            set(index.get_versions()))
 
1997
 
 
1998
    def test_has_version(self):
 
1999
        index = self.two_graph_index()
 
2000
        self.assertTrue(index.has_version('tail'))
 
2001
        self.assertFalse(index.has_version('ghost'))
 
2002
 
 
2003
    def test_get_position(self):
 
2004
        index = self.two_graph_index()
 
2005
        self.assertEqual((0, 100), index.get_position('tip'))
 
2006
        self.assertEqual((100, 78), index.get_position('parent'))
 
2007
 
 
2008
    def test_get_method(self):
 
2009
        index = self.two_graph_index()
 
2010
        self.assertEqual('fulltext', index.get_method('tip'))
 
2011
        self.assertEqual(['fulltext'], index.get_options('parent'))
 
2012
 
 
2013
    def test_get_options(self):
 
2014
        index = self.two_graph_index()
 
2015
        self.assertEqual(['fulltext', 'no-eol'], index.get_options('tip'))
 
2016
        self.assertEqual(['fulltext'], index.get_options('parent'))
 
2017
 
 
2018
    def test_get_parents(self):
 
2019
        index = self.two_graph_index()
 
2020
        self.assertEqual((), index.get_parents('parent'))
 
2021
        # and errors on ghosts.
 
2022
        self.assertRaises(errors.RevisionNotPresent,
 
2023
            index.get_parents, 'ghost')
 
2024
 
 
2025
    def test_get_parents_with_ghosts(self):
 
2026
        index = self.two_graph_index()
 
2027
        self.assertEqual((), index.get_parents_with_ghosts('parent'))
 
2028
        # and errors on ghosts.
 
2029
        self.assertRaises(errors.RevisionNotPresent,
 
2030
            index.get_parents_with_ghosts, 'ghost')
 
2031
 
 
2032
    def test_check_versions_present(self):
 
2033
        index = self.two_graph_index()
 
2034
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
2035
            ['missing'])
 
2036
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
2037
            ['tail', 'missing'])
 
2038
        index.check_versions_present(['tail', 'separate'])
 
2039
 
 
2040
    def catch_add(self, entries):
 
2041
        self.caught_entries.append(entries)
 
2042
 
 
2043
    def test_add_no_callback_errors(self):
 
2044
        index = self.two_graph_index()
 
2045
        self.assertRaises(errors.ReadOnlyError, index.add_version,
 
2046
            'new', 'fulltext,no-eol', 50, 60, ['separate'])
 
2047
 
 
2048
    def test_add_version_smoke(self):
 
2049
        index = self.two_graph_index(catch_adds=True)
 
2050
        index.add_version('new', 'fulltext,no-eol', 50, 60, [])
 
2051
        self.assertEqual([[(('new', ), 'N50 60')]],
 
2052
            self.caught_entries)
 
2053
 
 
2054
    def test_add_version_delta_not_delta_index(self):
 
2055
        index = self.two_graph_index(catch_adds=True)
 
2056
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2057
            'new', 'no-eol,line-delta', 0, 100, [])
 
2058
        self.assertEqual([], self.caught_entries)
 
2059
 
 
2060
    def test_add_version_same_dup(self):
 
2061
        index = self.two_graph_index(catch_adds=True)
 
2062
        # options can be spelt two different ways
 
2063
        index.add_version('tip', 'fulltext,no-eol', 0, 100, [])
 
2064
        index.add_version('tip', 'no-eol,fulltext', 0, 100, [])
 
2065
        # but neither should have added data.
 
2066
        self.assertEqual([[], []], self.caught_entries)
 
2067
        
 
2068
    def test_add_version_different_dup(self):
 
2069
        index = self.two_graph_index(catch_adds=True)
 
2070
        # change options
 
2071
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2072
            'tip', 'no-eol,line-delta', 0, 100, [])
 
2073
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2074
            'tip', 'line-delta,no-eol', 0, 100, [])
 
2075
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2076
            'tip', 'fulltext', 0, 100, [])
 
2077
        # position/length
 
2078
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2079
            'tip', 'fulltext,no-eol', 50, 100, [])
 
2080
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2081
            'tip', 'fulltext,no-eol', 0, 1000, [])
 
2082
        # parents
 
2083
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2084
            'tip', 'fulltext,no-eol', 0, 100, ['parent'])
 
2085
        self.assertEqual([], self.caught_entries)
 
2086
        
 
2087
    def test_add_versions(self):
 
2088
        index = self.two_graph_index(catch_adds=True)
 
2089
        index.add_versions([
 
2090
                ('new', 'fulltext,no-eol', 50, 60, []),
 
2091
                ('new2', 'fulltext', 0, 6, []),
 
2092
                ])
 
2093
        self.assertEqual([(('new', ), 'N50 60'), (('new2', ), ' 0 6')],
 
2094
            sorted(self.caught_entries[0]))
 
2095
        self.assertEqual(1, len(self.caught_entries))
 
2096
 
 
2097
    def test_add_versions_delta_not_delta_index(self):
 
2098
        index = self.two_graph_index(catch_adds=True)
 
2099
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2100
            [('new', 'no-eol,line-delta', 0, 100, ['parent'])])
 
2101
        self.assertEqual([], self.caught_entries)
 
2102
 
 
2103
    def test_add_versions_parents_not_parents_index(self):
 
2104
        index = self.two_graph_index(catch_adds=True)
 
2105
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2106
            [('new', 'no-eol,fulltext', 0, 100, ['parent'])])
 
2107
        self.assertEqual([], self.caught_entries)
 
2108
 
 
2109
    def test_add_versions_same_dup(self):
 
2110
        index = self.two_graph_index(catch_adds=True)
 
2111
        # options can be spelt two different ways
 
2112
        index.add_versions([('tip', 'fulltext,no-eol', 0, 100, [])])
 
2113
        index.add_versions([('tip', 'no-eol,fulltext', 0, 100, [])])
 
2114
        # but neither should have added data.
 
2115
        self.assertEqual([[], []], self.caught_entries)
 
2116
        
 
2117
    def test_add_versions_different_dup(self):
 
2118
        index = self.two_graph_index(catch_adds=True)
 
2119
        # change options
 
2120
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2121
            [('tip', 'no-eol,line-delta', 0, 100, [])])
 
2122
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2123
            [('tip', 'line-delta,no-eol', 0, 100, [])])
 
2124
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2125
            [('tip', 'fulltext', 0, 100, [])])
 
2126
        # position/length
 
2127
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2128
            [('tip', 'fulltext,no-eol', 50, 100, [])])
 
2129
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2130
            [('tip', 'fulltext,no-eol', 0, 1000, [])])
 
2131
        # parents
 
2132
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2133
            [('tip', 'fulltext,no-eol', 0, 100, ['parent'])])
 
2134
        # change options in the second record
 
2135
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2136
            [('tip', 'fulltext,no-eol', 0, 100, []),
 
2137
             ('tip', 'no-eol,line-delta', 0, 100, [])])
 
2138
        self.assertEqual([], self.caught_entries)
 
2139
 
 
2140
    def test_iter_parents(self):
 
2141
        index = self.two_graph_index()
 
2142
        self.assertEqual(set([
 
2143
            ('tip', ()), ('tail', ()), ('parent', ()), ('separate', ())
 
2144
            ]),
 
2145
            set(index.iter_parents(['tip', 'tail', 'ghost', 'parent', 'separate'])))
 
2146
        self.assertEqual(set([('tip', ())]),
 
2147
            set(index.iter_parents(['tip'])))
 
2148
        self.assertEqual(set(),
 
2149
            set(index.iter_parents([])))