~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Robert Collins
  • Date: 2007-07-04 08:08:13 UTC
  • mfrom: (2572 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2587.
  • Revision ID: robertc@robertcollins.net-20070704080813-wzebx0r88fvwj5rq
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for Knit data structure"""
18
18
 
19
 
 
 
19
from cStringIO import StringIO
20
20
import difflib
21
 
 
22
 
 
23
 
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
 
21
import gzip
 
22
import sha
 
23
 
 
24
from bzrlib import (
 
25
    errors,
 
26
    )
 
27
from bzrlib.errors import (
 
28
    RevisionAlreadyPresent,
 
29
    KnitHeaderError,
 
30
    RevisionNotPresent,
 
31
    NoSuchFile,
 
32
    )
24
33
from bzrlib.knit import (
 
34
    KnitContent,
25
35
    KnitVersionedFile,
26
36
    KnitPlainFactory,
27
37
    KnitAnnotateFactory,
28
 
    WeaveToKnit)
 
38
    _KnitData,
 
39
    _KnitIndex,
 
40
    WeaveToKnit,
 
41
    )
29
42
from bzrlib.osutils import split_lines
30
 
from bzrlib.tests import TestCaseWithTransport
 
43
from bzrlib.tests import TestCase, TestCaseWithTransport
31
44
from bzrlib.transport import TransportLogger, get_transport
32
45
from bzrlib.transport.memory import MemoryTransport
33
46
from bzrlib.weave import Weave
34
47
 
35
48
 
 
49
class KnitContentTests(TestCase):
 
50
 
 
51
    def test_constructor(self):
 
52
        content = KnitContent([])
 
53
 
 
54
    def test_text(self):
 
55
        content = KnitContent([])
 
56
        self.assertEqual(content.text(), [])
 
57
 
 
58
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
59
        self.assertEqual(content.text(), ["text1", "text2"])
 
60
 
 
61
    def test_annotate(self):
 
62
        content = KnitContent([])
 
63
        self.assertEqual(content.annotate(), [])
 
64
 
 
65
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
66
        self.assertEqual(content.annotate(),
 
67
            [("origin1", "text1"), ("origin2", "text2")])
 
68
 
 
69
    def test_annotate_iter(self):
 
70
        content = KnitContent([])
 
71
        it = content.annotate_iter()
 
72
        self.assertRaises(StopIteration, it.next)
 
73
 
 
74
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
75
        it = content.annotate_iter()
 
76
        self.assertEqual(it.next(), ("origin1", "text1"))
 
77
        self.assertEqual(it.next(), ("origin2", "text2"))
 
78
        self.assertRaises(StopIteration, it.next)
 
79
 
 
80
    def test_copy(self):
 
81
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
82
        copy = content.copy()
 
83
        self.assertIsInstance(copy, KnitContent)
 
84
        self.assertEqual(copy.annotate(),
 
85
            [("origin1", "text1"), ("origin2", "text2")])
 
86
 
 
87
    def test_line_delta(self):
 
88
        content1 = KnitContent([("", "a"), ("", "b")])
 
89
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
90
        self.assertEqual(content1.line_delta(content2),
 
91
            [(1, 2, 2, [("", "a"), ("", "c")])])
 
92
 
 
93
    def test_line_delta_iter(self):
 
94
        content1 = KnitContent([("", "a"), ("", "b")])
 
95
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
96
        it = content1.line_delta_iter(content2)
 
97
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
 
98
        self.assertRaises(StopIteration, it.next)
 
99
 
 
100
 
 
101
class MockTransport(object):
 
102
 
 
103
    def __init__(self, file_lines=None):
 
104
        self.file_lines = file_lines
 
105
        self.calls = []
 
106
        # We have no base directory for the MockTransport
 
107
        self.base = ''
 
108
 
 
109
    def get(self, filename):
 
110
        if self.file_lines is None:
 
111
            raise NoSuchFile(filename)
 
112
        else:
 
113
            return StringIO("\n".join(self.file_lines))
 
114
 
 
115
    def readv(self, relpath, offsets):
 
116
        fp = self.get(relpath)
 
117
        for offset, size in offsets:
 
118
            fp.seek(offset)
 
119
            yield offset, fp.read(size)
 
120
 
 
121
    def __getattr__(self, name):
 
122
        def queue_call(*args, **kwargs):
 
123
            self.calls.append((name, args, kwargs))
 
124
        return queue_call
 
125
 
 
126
 
 
127
class LowLevelKnitDataTests(TestCase):
 
128
 
 
129
    def create_gz_content(self, text):
 
130
        sio = StringIO()
 
131
        gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
 
132
        gz_file.write(text)
 
133
        gz_file.close()
 
134
        return sio.getvalue()
 
135
 
 
136
    def test_valid_knit_data(self):
 
137
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
138
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
139
                                        'foo\n'
 
140
                                        'bar\n'
 
141
                                        'end rev-id-1\n'
 
142
                                        % (sha1sum,))
 
143
        transport = MockTransport([gz_txt])
 
144
        data = _KnitData(transport, 'filename', mode='r')
 
145
        records = [('rev-id-1', 0, len(gz_txt))]
 
146
 
 
147
        contents = data.read_records(records)
 
148
        self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
 
149
 
 
150
        raw_contents = list(data.read_records_iter_raw(records))
 
151
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
152
 
 
153
    def test_not_enough_lines(self):
 
154
        sha1sum = sha.new('foo\n').hexdigest()
 
155
        # record says 2 lines data says 1
 
156
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
157
                                        'foo\n'
 
158
                                        'end rev-id-1\n'
 
159
                                        % (sha1sum,))
 
160
        transport = MockTransport([gz_txt])
 
161
        data = _KnitData(transport, 'filename', mode='r')
 
162
        records = [('rev-id-1', 0, len(gz_txt))]
 
163
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
164
 
 
165
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
166
        raw_contents = list(data.read_records_iter_raw(records))
 
167
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
168
 
 
169
    def test_too_many_lines(self):
 
170
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
171
        # record says 1 lines data says 2
 
172
        gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
 
173
                                        'foo\n'
 
174
                                        'bar\n'
 
175
                                        'end rev-id-1\n'
 
176
                                        % (sha1sum,))
 
177
        transport = MockTransport([gz_txt])
 
178
        data = _KnitData(transport, 'filename', mode='r')
 
179
        records = [('rev-id-1', 0, len(gz_txt))]
 
180
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
181
 
 
182
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
183
        raw_contents = list(data.read_records_iter_raw(records))
 
184
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
185
 
 
186
    def test_mismatched_version_id(self):
 
187
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
188
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
189
                                        'foo\n'
 
190
                                        'bar\n'
 
191
                                        'end rev-id-1\n'
 
192
                                        % (sha1sum,))
 
193
        transport = MockTransport([gz_txt])
 
194
        data = _KnitData(transport, 'filename', mode='r')
 
195
        # We are asking for rev-id-2, but the data is rev-id-1
 
196
        records = [('rev-id-2', 0, len(gz_txt))]
 
197
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
198
 
 
199
        # read_records_iter_raw will notice if we request the wrong version.
 
200
        self.assertRaises(errors.KnitCorrupt, list,
 
201
                          data.read_records_iter_raw(records))
 
202
 
 
203
    def test_uncompressed_data(self):
 
204
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
205
        txt = ('version rev-id-1 2 %s\n'
 
206
               'foo\n'
 
207
               'bar\n'
 
208
               'end rev-id-1\n'
 
209
               % (sha1sum,))
 
210
        transport = MockTransport([txt])
 
211
        data = _KnitData(transport, 'filename', mode='r')
 
212
        records = [('rev-id-1', 0, len(txt))]
 
213
 
 
214
        # We don't have valid gzip data ==> corrupt
 
215
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
216
 
 
217
        # read_records_iter_raw will notice the bad data
 
218
        self.assertRaises(errors.KnitCorrupt, list,
 
219
                          data.read_records_iter_raw(records))
 
220
 
 
221
    def test_corrupted_data(self):
 
222
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
223
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
224
                                        'foo\n'
 
225
                                        'bar\n'
 
226
                                        'end rev-id-1\n'
 
227
                                        % (sha1sum,))
 
228
        # Change 2 bytes in the middle to \xff
 
229
        gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
 
230
        transport = MockTransport([gz_txt])
 
231
        data = _KnitData(transport, 'filename', mode='r')
 
232
        records = [('rev-id-1', 0, len(gz_txt))]
 
233
 
 
234
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
235
 
 
236
        # read_records_iter_raw will notice if we request the wrong version.
 
237
        self.assertRaises(errors.KnitCorrupt, list,
 
238
                          data.read_records_iter_raw(records))
 
239
 
 
240
 
 
241
class LowLevelKnitIndexTests(TestCase):
 
242
 
 
243
    def test_no_such_file(self):
 
244
        transport = MockTransport()
 
245
 
 
246
        self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
 
247
        self.assertRaises(NoSuchFile, _KnitIndex, transport,
 
248
            "filename", "w", create=False)
 
249
 
 
250
    def test_create_file(self):
 
251
        transport = MockTransport()
 
252
 
 
253
        index = _KnitIndex(transport, "filename", "w",
 
254
            file_mode="wb", create=True)
 
255
        self.assertEqual(
 
256
                ("put_bytes_non_atomic",
 
257
                    ("filename", index.HEADER), {"mode": "wb"}),
 
258
                transport.calls.pop(0))
 
259
 
 
260
    def test_delay_create_file(self):
 
261
        transport = MockTransport()
 
262
 
 
263
        index = _KnitIndex(transport, "filename", "w",
 
264
            create=True, file_mode="wb", create_parent_dir=True,
 
265
            delay_create=True, dir_mode=0777)
 
266
        self.assertEqual([], transport.calls)
 
267
 
 
268
        index.add_versions([])
 
269
        name, (filename, f), kwargs = transport.calls.pop(0)
 
270
        self.assertEqual("put_file_non_atomic", name)
 
271
        self.assertEqual(
 
272
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
273
            kwargs)
 
274
        self.assertEqual("filename", filename)
 
275
        self.assertEqual(index.HEADER, f.read())
 
276
 
 
277
        index.add_versions([])
 
278
        self.assertEqual(("append_bytes", ("filename", ""), {}),
 
279
            transport.calls.pop(0))
 
280
 
 
281
    def test_read_utf8_version_id(self):
 
282
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
283
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
284
        transport = MockTransport([
 
285
            _KnitIndex.HEADER,
 
286
            '%s option 0 1 :' % (utf8_revision_id,)
 
287
            ])
 
288
        index = _KnitIndex(transport, "filename", "r")
 
289
        # _KnitIndex is a private class, and deals in utf8 revision_ids, not
 
290
        # Unicode revision_ids.
 
291
        self.assertTrue(index.has_version(utf8_revision_id))
 
292
        self.assertFalse(index.has_version(unicode_revision_id))
 
293
 
 
294
    def test_read_utf8_parents(self):
 
295
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
296
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
297
        transport = MockTransport([
 
298
            _KnitIndex.HEADER,
 
299
            "version option 0 1 .%s :" % (utf8_revision_id,)
 
300
            ])
 
301
        index = _KnitIndex(transport, "filename", "r")
 
302
        self.assertEqual([utf8_revision_id],
 
303
            index.get_parents_with_ghosts("version"))
 
304
 
 
305
    def test_read_ignore_corrupted_lines(self):
 
306
        transport = MockTransport([
 
307
            _KnitIndex.HEADER,
 
308
            "corrupted",
 
309
            "corrupted options 0 1 .b .c ",
 
310
            "version options 0 1 :"
 
311
            ])
 
312
        index = _KnitIndex(transport, "filename", "r")
 
313
        self.assertEqual(1, index.num_versions())
 
314
        self.assertTrue(index.has_version("version"))
 
315
 
 
316
    def test_read_corrupted_header(self):
 
317
        transport = MockTransport(['not a bzr knit index header\n'])
 
318
        self.assertRaises(KnitHeaderError,
 
319
            _KnitIndex, transport, "filename", "r")
 
320
 
 
321
    def test_read_duplicate_entries(self):
 
322
        transport = MockTransport([
 
323
            _KnitIndex.HEADER,
 
324
            "parent options 0 1 :",
 
325
            "version options1 0 1 0 :",
 
326
            "version options2 1 2 .other :",
 
327
            "version options3 3 4 0 .other :"
 
328
            ])
 
329
        index = _KnitIndex(transport, "filename", "r")
 
330
        self.assertEqual(2, index.num_versions())
 
331
        self.assertEqual(1, index.lookup("version"))
 
332
        self.assertEqual((3, 4), index.get_position("version"))
 
333
        self.assertEqual(["options3"], index.get_options("version"))
 
334
        self.assertEqual(["parent", "other"],
 
335
            index.get_parents_with_ghosts("version"))
 
336
 
 
337
    def test_read_compressed_parents(self):
 
338
        transport = MockTransport([
 
339
            _KnitIndex.HEADER,
 
340
            "a option 0 1 :",
 
341
            "b option 0 1 0 :",
 
342
            "c option 0 1 1 0 :",
 
343
            ])
 
344
        index = _KnitIndex(transport, "filename", "r")
 
345
        self.assertEqual(["a"], index.get_parents("b"))
 
346
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
347
 
 
348
    def test_write_utf8_version_id(self):
 
349
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
350
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
351
        transport = MockTransport([
 
352
            _KnitIndex.HEADER
 
353
            ])
 
354
        index = _KnitIndex(transport, "filename", "r")
 
355
        index.add_version(utf8_revision_id, ["option"], 0, 1, [])
 
356
        self.assertEqual(("append_bytes", ("filename",
 
357
            "\n%s option 0 1  :" % (utf8_revision_id,)),
 
358
            {}),
 
359
            transport.calls.pop(0))
 
360
 
 
361
    def test_write_utf8_parents(self):
 
362
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
363
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
364
        transport = MockTransport([
 
365
            _KnitIndex.HEADER
 
366
            ])
 
367
        index = _KnitIndex(transport, "filename", "r")
 
368
        index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
 
369
        self.assertEqual(("append_bytes", ("filename",
 
370
            "\nversion option 0 1 .%s :" % (utf8_revision_id,)),
 
371
            {}),
 
372
            transport.calls.pop(0))
 
373
 
 
374
    def test_get_graph(self):
 
375
        transport = MockTransport()
 
376
        index = _KnitIndex(transport, "filename", "w", create=True)
 
377
        self.assertEqual([], index.get_graph())
 
378
 
 
379
        index.add_version("a", ["option"], 0, 1, ["b"])
 
380
        self.assertEqual([("a", ["b"])], index.get_graph())
 
381
 
 
382
        index.add_version("c", ["option"], 0, 1, ["d"])
 
383
        self.assertEqual([("a", ["b"]), ("c", ["d"])],
 
384
            sorted(index.get_graph()))
 
385
 
 
386
    def test_get_ancestry(self):
 
387
        transport = MockTransport([
 
388
            _KnitIndex.HEADER,
 
389
            "a option 0 1 :",
 
390
            "b option 0 1 0 .e :",
 
391
            "c option 0 1 1 0 :",
 
392
            "d option 0 1 2 .f :"
 
393
            ])
 
394
        index = _KnitIndex(transport, "filename", "r")
 
395
 
 
396
        self.assertEqual([], index.get_ancestry([]))
 
397
        self.assertEqual(["a"], index.get_ancestry(["a"]))
 
398
        self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
 
399
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
 
400
        self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
 
401
        self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
 
402
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
 
403
 
 
404
        self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
 
405
 
 
406
    def test_get_ancestry_with_ghosts(self):
 
407
        transport = MockTransport([
 
408
            _KnitIndex.HEADER,
 
409
            "a option 0 1 :",
 
410
            "b option 0 1 0 .e :",
 
411
            "c option 0 1 0 .f .g :",
 
412
            "d option 0 1 2 .h .j .k :"
 
413
            ])
 
414
        index = _KnitIndex(transport, "filename", "r")
 
415
 
 
416
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
417
        self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
 
418
        self.assertEqual(["a", "e", "b"],
 
419
            index.get_ancestry_with_ghosts(["b"]))
 
420
        self.assertEqual(["a", "g", "f", "c"],
 
421
            index.get_ancestry_with_ghosts(["c"]))
 
422
        self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
 
423
            index.get_ancestry_with_ghosts(["d"]))
 
424
        self.assertEqual(["a", "e", "b"],
 
425
            index.get_ancestry_with_ghosts(["a", "b"]))
 
426
        self.assertEqual(["a", "g", "f", "c"],
 
427
            index.get_ancestry_with_ghosts(["a", "c"]))
 
428
        self.assertEqual(
 
429
            ["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
 
430
            index.get_ancestry_with_ghosts(["b", "d"]))
 
431
 
 
432
        self.assertRaises(RevisionNotPresent,
 
433
            index.get_ancestry_with_ghosts, ["e"])
 
434
 
 
435
    def test_num_versions(self):
 
436
        transport = MockTransport([
 
437
            _KnitIndex.HEADER
 
438
            ])
 
439
        index = _KnitIndex(transport, "filename", "r")
 
440
 
 
441
        self.assertEqual(0, index.num_versions())
 
442
        self.assertEqual(0, len(index))
 
443
 
 
444
        index.add_version("a", ["option"], 0, 1, [])
 
445
        self.assertEqual(1, index.num_versions())
 
446
        self.assertEqual(1, len(index))
 
447
 
 
448
        index.add_version("a", ["option2"], 1, 2, [])
 
449
        self.assertEqual(1, index.num_versions())
 
450
        self.assertEqual(1, len(index))
 
451
 
 
452
        index.add_version("b", ["option"], 0, 1, [])
 
453
        self.assertEqual(2, index.num_versions())
 
454
        self.assertEqual(2, len(index))
 
455
 
 
456
    def test_get_versions(self):
 
457
        transport = MockTransport([
 
458
            _KnitIndex.HEADER
 
459
            ])
 
460
        index = _KnitIndex(transport, "filename", "r")
 
461
 
 
462
        self.assertEqual([], index.get_versions())
 
463
 
 
464
        index.add_version("a", ["option"], 0, 1, [])
 
465
        self.assertEqual(["a"], index.get_versions())
 
466
 
 
467
        index.add_version("a", ["option"], 0, 1, [])
 
468
        self.assertEqual(["a"], index.get_versions())
 
469
 
 
470
        index.add_version("b", ["option"], 0, 1, [])
 
471
        self.assertEqual(["a", "b"], index.get_versions())
 
472
 
 
473
    def test_idx_to_name(self):
 
474
        transport = MockTransport([
 
475
            _KnitIndex.HEADER,
 
476
            "a option 0 1 :",
 
477
            "b option 0 1 :"
 
478
            ])
 
479
        index = _KnitIndex(transport, "filename", "r")
 
480
 
 
481
        self.assertEqual("a", index.idx_to_name(0))
 
482
        self.assertEqual("b", index.idx_to_name(1))
 
483
        self.assertEqual("b", index.idx_to_name(-1))
 
484
        self.assertEqual("a", index.idx_to_name(-2))
 
485
 
 
486
    def test_lookup(self):
 
487
        transport = MockTransport([
 
488
            _KnitIndex.HEADER,
 
489
            "a option 0 1 :",
 
490
            "b option 0 1 :"
 
491
            ])
 
492
        index = _KnitIndex(transport, "filename", "r")
 
493
 
 
494
        self.assertEqual(0, index.lookup("a"))
 
495
        self.assertEqual(1, index.lookup("b"))
 
496
 
 
497
    def test_add_version(self):
 
498
        transport = MockTransport([
 
499
            _KnitIndex.HEADER
 
500
            ])
 
501
        index = _KnitIndex(transport, "filename", "r")
 
502
 
 
503
        index.add_version("a", ["option"], 0, 1, ["b"])
 
504
        self.assertEqual(("append_bytes",
 
505
            ("filename", "\na option 0 1 .b :"),
 
506
            {}), transport.calls.pop(0))
 
507
        self.assertTrue(index.has_version("a"))
 
508
        self.assertEqual(1, index.num_versions())
 
509
        self.assertEqual((0, 1), index.get_position("a"))
 
510
        self.assertEqual(["option"], index.get_options("a"))
 
511
        self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
 
512
 
 
513
        index.add_version("a", ["opt"], 1, 2, ["c"])
 
514
        self.assertEqual(("append_bytes",
 
515
            ("filename", "\na opt 1 2 .c :"),
 
516
            {}), transport.calls.pop(0))
 
517
        self.assertTrue(index.has_version("a"))
 
518
        self.assertEqual(1, index.num_versions())
 
519
        self.assertEqual((1, 2), index.get_position("a"))
 
520
        self.assertEqual(["opt"], index.get_options("a"))
 
521
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
522
 
 
523
        index.add_version("b", ["option"], 2, 3, ["a"])
 
524
        self.assertEqual(("append_bytes",
 
525
            ("filename", "\nb option 2 3 0 :"),
 
526
            {}), transport.calls.pop(0))
 
527
        self.assertTrue(index.has_version("b"))
 
528
        self.assertEqual(2, index.num_versions())
 
529
        self.assertEqual((2, 3), index.get_position("b"))
 
530
        self.assertEqual(["option"], index.get_options("b"))
 
531
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
532
 
 
533
    def test_add_versions(self):
 
534
        transport = MockTransport([
 
535
            _KnitIndex.HEADER
 
536
            ])
 
537
        index = _KnitIndex(transport, "filename", "r")
 
538
 
 
539
        index.add_versions([
 
540
            ("a", ["option"], 0, 1, ["b"]),
 
541
            ("a", ["opt"], 1, 2, ["c"]),
 
542
            ("b", ["option"], 2, 3, ["a"])
 
543
            ])
 
544
        self.assertEqual(("append_bytes", ("filename",
 
545
            "\na option 0 1 .b :"
 
546
            "\na opt 1 2 .c :"
 
547
            "\nb option 2 3 0 :"
 
548
            ), {}), transport.calls.pop(0))
 
549
        self.assertTrue(index.has_version("a"))
 
550
        self.assertTrue(index.has_version("b"))
 
551
        self.assertEqual(2, index.num_versions())
 
552
        self.assertEqual((1, 2), index.get_position("a"))
 
553
        self.assertEqual((2, 3), index.get_position("b"))
 
554
        self.assertEqual(["opt"], index.get_options("a"))
 
555
        self.assertEqual(["option"], index.get_options("b"))
 
556
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
557
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
558
 
 
559
    def test_delay_create_and_add_versions(self):
 
560
        transport = MockTransport()
 
561
 
 
562
        index = _KnitIndex(transport, "filename", "w",
 
563
            create=True, file_mode="wb", create_parent_dir=True,
 
564
            delay_create=True, dir_mode=0777)
 
565
        self.assertEqual([], transport.calls)
 
566
 
 
567
        index.add_versions([
 
568
            ("a", ["option"], 0, 1, ["b"]),
 
569
            ("a", ["opt"], 1, 2, ["c"]),
 
570
            ("b", ["option"], 2, 3, ["a"])
 
571
            ])
 
572
        name, (filename, f), kwargs = transport.calls.pop(0)
 
573
        self.assertEqual("put_file_non_atomic", name)
 
574
        self.assertEqual(
 
575
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
576
            kwargs)
 
577
        self.assertEqual("filename", filename)
 
578
        self.assertEqual(
 
579
            index.HEADER +
 
580
            "\na option 0 1 .b :"
 
581
            "\na opt 1 2 .c :"
 
582
            "\nb option 2 3 0 :",
 
583
            f.read())
 
584
 
 
585
    def test_has_version(self):
 
586
        transport = MockTransport([
 
587
            _KnitIndex.HEADER,
 
588
            "a option 0 1 :"
 
589
            ])
 
590
        index = _KnitIndex(transport, "filename", "r")
 
591
 
 
592
        self.assertTrue(index.has_version("a"))
 
593
        self.assertFalse(index.has_version("b"))
 
594
 
 
595
    def test_get_position(self):
 
596
        transport = MockTransport([
 
597
            _KnitIndex.HEADER,
 
598
            "a option 0 1 :",
 
599
            "b option 1 2 :"
 
600
            ])
 
601
        index = _KnitIndex(transport, "filename", "r")
 
602
 
 
603
        self.assertEqual((0, 1), index.get_position("a"))
 
604
        self.assertEqual((1, 2), index.get_position("b"))
 
605
 
 
606
    def test_get_method(self):
 
607
        transport = MockTransport([
 
608
            _KnitIndex.HEADER,
 
609
            "a fulltext,unknown 0 1 :",
 
610
            "b unknown,line-delta 1 2 :",
 
611
            "c bad 3 4 :"
 
612
            ])
 
613
        index = _KnitIndex(transport, "filename", "r")
 
614
 
 
615
        self.assertEqual("fulltext", index.get_method("a"))
 
616
        self.assertEqual("line-delta", index.get_method("b"))
 
617
        self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
 
618
 
 
619
    def test_get_options(self):
 
620
        transport = MockTransport([
 
621
            _KnitIndex.HEADER,
 
622
            "a opt1 0 1 :",
 
623
            "b opt2,opt3 1 2 :"
 
624
            ])
 
625
        index = _KnitIndex(transport, "filename", "r")
 
626
 
 
627
        self.assertEqual(["opt1"], index.get_options("a"))
 
628
        self.assertEqual(["opt2", "opt3"], index.get_options("b"))
 
629
 
 
630
    def test_get_parents(self):
 
631
        transport = MockTransport([
 
632
            _KnitIndex.HEADER,
 
633
            "a option 0 1 :",
 
634
            "b option 1 2 0 .c :",
 
635
            "c option 1 2 1 0 .e :"
 
636
            ])
 
637
        index = _KnitIndex(transport, "filename", "r")
 
638
 
 
639
        self.assertEqual([], index.get_parents("a"))
 
640
        self.assertEqual(["a", "c"], index.get_parents("b"))
 
641
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
642
 
 
643
    def test_get_parents_with_ghosts(self):
 
644
        transport = MockTransport([
 
645
            _KnitIndex.HEADER,
 
646
            "a option 0 1 :",
 
647
            "b option 1 2 0 .c :",
 
648
            "c option 1 2 1 0 .e :"
 
649
            ])
 
650
        index = _KnitIndex(transport, "filename", "r")
 
651
 
 
652
        self.assertEqual([], index.get_parents_with_ghosts("a"))
 
653
        self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
 
654
        self.assertEqual(["b", "a", "e"],
 
655
            index.get_parents_with_ghosts("c"))
 
656
 
 
657
    def test_check_versions_present(self):
 
658
        transport = MockTransport([
 
659
            _KnitIndex.HEADER,
 
660
            "a option 0 1 :",
 
661
            "b option 0 1 :"
 
662
            ])
 
663
        index = _KnitIndex(transport, "filename", "r")
 
664
 
 
665
        check = index.check_versions_present
 
666
 
 
667
        check([])
 
668
        check(["a"])
 
669
        check(["b"])
 
670
        check(["a", "b"])
 
671
        self.assertRaises(RevisionNotPresent, check, ["c"])
 
672
        self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
 
673
 
 
674
 
36
675
class KnitTests(TestCaseWithTransport):
37
676
    """Class containing knit test helper routines."""
38
677
 
630
1269
        text = k.get_text('text-1')
631
1270
        self.assertEqual(TEXT_1, text)
632
1271
        self.assertEqual({}, k._data._cache)
 
1272
 
 
1273
 
 
1274
class TestKnitIndex(KnitTests):
 
1275
 
 
1276
    def test_add_versions_dictionary_compresses(self):
 
1277
        """Adding versions to the index should update the lookup dict"""
 
1278
        knit = self.make_test_knit()
 
1279
        idx = knit._index
 
1280
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1281
        self.check_file_contents('test.kndx',
 
1282
            '# bzr knit index 8\n'
 
1283
            '\n'
 
1284
            'a-1 fulltext 0 0  :'
 
1285
            )
 
1286
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
 
1287
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
 
1288
                         ])
 
1289
        self.check_file_contents('test.kndx',
 
1290
            '# bzr knit index 8\n'
 
1291
            '\n'
 
1292
            'a-1 fulltext 0 0  :\n'
 
1293
            'a-2 fulltext 0 0 0 :\n'
 
1294
            'a-3 fulltext 0 0 1 :'
 
1295
            )
 
1296
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
 
1297
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
 
1298
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
 
1299
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
 
1300
                         }, idx._cache)
 
1301
 
 
1302
    def test_add_versions_fails_clean(self):
 
1303
        """If add_versions fails in the middle, it restores a pristine state.
 
1304
 
 
1305
        Any modifications that are made to the index are reset if all versions
 
1306
        cannot be added.
 
1307
        """
 
1308
        # This cheats a little bit by passing in a generator which will
 
1309
        # raise an exception before the processing finishes
 
1310
        # Other possibilities would be to have an version with the wrong number
 
1311
        # of entries, or to make the backing transport unable to write any
 
1312
        # files.
 
1313
 
 
1314
        knit = self.make_test_knit()
 
1315
        idx = knit._index
 
1316
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1317
 
 
1318
        class StopEarly(Exception):
 
1319
            pass
 
1320
 
 
1321
        def generate_failure():
 
1322
            """Add some entries and then raise an exception"""
 
1323
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
 
1324
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
 
1325
            raise StopEarly()
 
1326
 
 
1327
        # Assert the pre-condition
 
1328
        self.assertEqual(['a-1'], idx._history)
 
1329
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1330
 
 
1331
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
 
1332
 
 
1333
        # And it shouldn't be modified
 
1334
        self.assertEqual(['a-1'], idx._history)
 
1335
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1336
 
 
1337
    def test_knit_index_ignores_empty_files(self):
 
1338
        # There was a race condition in older bzr, where a ^C at the right time
 
1339
        # could leave an empty .kndx file, which bzr would later claim was a
 
1340
        # corrupted file since the header was not present. In reality, the file
 
1341
        # just wasn't created, so it should be ignored.
 
1342
        t = get_transport('.')
 
1343
        t.put_bytes('test.kndx', '')
 
1344
 
 
1345
        knit = self.make_test_knit()
 
1346
 
 
1347
    def test_knit_index_checks_header(self):
 
1348
        t = get_transport('.')
 
1349
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
 
1350
 
 
1351
        self.assertRaises(KnitHeaderError, self.make_test_knit)