~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: John Arbash Meinel
  • Date: 2006-11-10 15:38:16 UTC
  • mto: This revision was merged to the branch mainline in revision 2129.
  • Revision ID: john@arbash-meinel.com-20061110153816-46acf76fc86a512b
use try/finally to clean up a nested progress bar during weave fetching

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Tests for Knit data structure"""
18
18
 
19
 
from cStringIO import StringIO
 
19
 
20
20
import difflib
21
 
import gzip
22
 
import sha
23
 
 
24
 
from bzrlib import (
25
 
    errors,
26
 
    )
27
 
from bzrlib.errors import (
28
 
    RevisionAlreadyPresent,
29
 
    KnitHeaderError,
30
 
    RevisionNotPresent,
31
 
    NoSuchFile,
32
 
    )
 
21
 
 
22
 
 
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
33
24
from bzrlib.knit import (
34
 
    KnitContent,
35
25
    KnitVersionedFile,
36
26
    KnitPlainFactory,
37
27
    KnitAnnotateFactory,
38
 
    _KnitData,
39
 
    _KnitIndex,
40
 
    WeaveToKnit,
41
 
    )
 
28
    WeaveToKnit)
42
29
from bzrlib.osutils import split_lines
43
 
from bzrlib.tests import TestCase, TestCaseWithTransport
 
30
from bzrlib.tests import TestCaseWithTransport
44
31
from bzrlib.transport import TransportLogger, get_transport
45
32
from bzrlib.transport.memory import MemoryTransport
46
33
from bzrlib.weave import Weave
47
34
 
48
35
 
49
 
class KnitContentTests(TestCase):
    """Exercise the basic accessors of KnitContent."""

    def _annotated_pair(self):
        """Return a fresh KnitContent holding two (origin, text) lines."""
        return KnitContent([("origin1", "text1"), ("origin2", "text2")])

    def _delta_operands(self):
        """Return the two fresh KnitContent objects used by the delta tests."""
        first = KnitContent([("", "a"), ("", "b")])
        second = KnitContent([("", "a"), ("", "a"), ("", "c")])
        return first, second

    def test_constructor(self):
        """Constructing a KnitContent from an empty list must not raise."""
        KnitContent([])

    def test_text(self):
        """text() returns only the text half of each annotated line."""
        blank = KnitContent([])
        self.assertEqual(blank.text(), [])

        populated = self._annotated_pair()
        self.assertEqual(populated.text(), ["text1", "text2"])

    def test_annotate(self):
        """annotate() returns the (origin, text) pairs as given."""
        blank = KnitContent([])
        self.assertEqual(blank.annotate(), [])

        populated = self._annotated_pair()
        expected = [("origin1", "text1"), ("origin2", "text2")]
        self.assertEqual(populated.annotate(), expected)

    def test_annotate_iter(self):
        """annotate_iter() yields each pair in order and then stops."""
        blank = KnitContent([])
        no_pairs = blank.annotate_iter()
        self.assertRaises(StopIteration, no_pairs.next)

        populated = self._annotated_pair()
        pairs = populated.annotate_iter()
        for expected in [("origin1", "text1"), ("origin2", "text2")]:
            self.assertEqual(pairs.next(), expected)
        self.assertRaises(StopIteration, pairs.next)

    def test_copy(self):
        """copy() gives back a KnitContent with the same annotations."""
        original = self._annotated_pair()
        duplicate = original.copy()
        self.assertIsInstance(duplicate, KnitContent)
        expected = [("origin1", "text1"), ("origin2", "text2")]
        self.assertEqual(duplicate.annotate(), expected)

    def test_line_delta(self):
        """line_delta() reports the single differing hunk as a list."""
        first, second = self._delta_operands()
        expected = [(1, 2, 2, [("", "a"), ("", "c")])]
        self.assertEqual(first.line_delta(second), expected)

    def test_line_delta_iter(self):
        """line_delta_iter() yields the single differing hunk, then stops."""
        first, second = self._delta_operands()
        hunks = first.line_delta_iter(second)
        expected = (1, 2, 2, [("", "a"), ("", "c")])
        self.assertEqual(hunks.next(), expected)
        self.assertRaises(StopIteration, hunks.next)
99
 
 
100
 
 
101
 
class MockTransport(object):
    """Transport stand-in that serves canned lines and records other calls.

    ``get`` and ``readv`` are answered from ``file_lines``.  Any other
    attribute that is looked up and not found resolves to a callable which
    appends ``(name, args, kwargs)`` to ``calls`` and returns None.
    """

    def __init__(self, file_lines=None):
        """Remember the canned content; None means the file does not exist."""
        # We have no base directory for the MockTransport
        self.base = ''
        self.calls = []
        self.file_lines = file_lines

    def get(self, filename):
        """Return a StringIO of the canned lines joined with newlines.

        Raises NoSuchFile(filename) when no lines were supplied.
        """
        canned = self.file_lines
        if canned is not None:
            return StringIO("\n".join(canned))
        raise NoSuchFile(filename)

    def readv(self, relpath, offsets):
        """Yield ``(offset, data)`` for each ``(offset, size)`` requested."""
        handle = self.get(relpath)
        for start, length in offsets:
            handle.seek(start)
            yield start, handle.read(length)

    def __getattr__(self, name):
        """Return a recorder for any attribute not otherwise defined."""
        def record_call(*args, **kwargs):
            self.calls.append((name, args, kwargs))
        return record_call
125
 
 
126
 
 
127
 
class LowLevelKnitDataTests(TestCase):
    """Read gzip-wrapped records through _KnitData over a MockTransport."""

    def create_gz_content(self, text):
        """Return *text* compressed as a complete gzip stream."""
        buf = StringIO()
        compressor = gzip.GzipFile(mode='wb', fileobj=buf)
        compressor.write(text)
        compressor.close()
        return buf.getvalue()

    def _knit_data_for(self, payload):
        """Return a read-mode _KnitData whose mock file holds *payload*."""
        return _KnitData(MockTransport([payload]), 'filename', mode='r')

    def test_valid_knit_data(self):
        """A well-formed record is returned parsed and, raw, unchanged."""
        sha1sum = sha.new('foo\nbar\n').hexdigest()
        record_text = ('version rev-id-1 2 %s\n'
                       'foo\n'
                       'bar\n'
                       'end rev-id-1\n'
                       % (sha1sum,))
        gz_txt = self.create_gz_content(record_text)
        data = self._knit_data_for(gz_txt)
        records = [('rev-id-1', 0, len(gz_txt))]

        parsed = data.read_records(records)
        self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, parsed)

        raw = list(data.read_records_iter_raw(records))
        self.assertEqual([('rev-id-1', gz_txt)], raw)

    def test_not_enough_lines(self):
        """A header claiming 2 lines over 1 line of data is KnitCorrupt."""
        sha1sum = sha.new('foo\n').hexdigest()
        # The header says 2 lines, the body only has 1.
        record_text = ('version rev-id-1 2 %s\n'
                       'foo\n'
                       'end rev-id-1\n'
                       % (sha1sum,))
        gz_txt = self.create_gz_content(record_text)
        data = self._knit_data_for(gz_txt)
        records = [('rev-id-1', 0, len(gz_txt))]
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)

        # The raw reader does not detect this kind of mismatch.
        raw = list(data.read_records_iter_raw(records))
        self.assertEqual([('rev-id-1', gz_txt)], raw)

    def test_too_many_lines(self):
        """A header claiming 1 line over 2 lines of data is KnitCorrupt."""
        sha1sum = sha.new('foo\nbar\n').hexdigest()
        # The header says 1 line, the body has 2.
        record_text = ('version rev-id-1 1 %s\n'
                       'foo\n'
                       'bar\n'
                       'end rev-id-1\n'
                       % (sha1sum,))
        gz_txt = self.create_gz_content(record_text)
        data = self._knit_data_for(gz_txt)
        records = [('rev-id-1', 0, len(gz_txt))]
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)

        # The raw reader does not detect this kind of mismatch.
        raw = list(data.read_records_iter_raw(records))
        self.assertEqual([('rev-id-1', gz_txt)], raw)

    def test_mismatched_version_id(self):
        """Requesting a version id other than the stored one is KnitCorrupt."""
        sha1sum = sha.new('foo\nbar\n').hexdigest()
        record_text = ('version rev-id-1 2 %s\n'
                       'foo\n'
                       'bar\n'
                       'end rev-id-1\n'
                       % (sha1sum,))
        gz_txt = self.create_gz_content(record_text)
        data = self._knit_data_for(gz_txt)
        # The stored record is rev-id-1, but rev-id-2 is requested.
        records = [('rev-id-2', 0, len(gz_txt))]
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)

        # The raw reader also notices a request for the wrong version.
        self.assertRaises(errors.KnitCorrupt, list,
                          data.read_records_iter_raw(records))

    def test_uncompressed_data(self):
        """Record text that was never gzipped is reported as KnitCorrupt."""
        sha1sum = sha.new('foo\nbar\n').hexdigest()
        txt = ('version rev-id-1 2 %s\n'
               'foo\n'
               'bar\n'
               'end rev-id-1\n'
               % (sha1sum,))
        data = self._knit_data_for(txt)
        records = [('rev-id-1', 0, len(txt))]

        # Not valid gzip data, so the record counts as corrupt.
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)

        # The raw reader notices the bad data as well.
        self.assertRaises(errors.KnitCorrupt, list,
                          data.read_records_iter_raw(records))

    def test_corrupted_data(self):
        """A gzip stream with two bytes overwritten is KnitCorrupt."""
        sha1sum = sha.new('foo\nbar\n').hexdigest()
        record_text = ('version rev-id-1 2 %s\n'
                       'foo\n'
                       'bar\n'
                       'end rev-id-1\n'
                       % (sha1sum,))
        gz_txt = self.create_gz_content(record_text)
        # Overwrite bytes 10 and 11 of the stream with \xff.
        gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
        data = self._knit_data_for(gz_txt)
        records = [('rev-id-1', 0, len(gz_txt))]

        self.assertRaises(errors.KnitCorrupt, data.read_records, records)

        # The raw reader also rejects the damaged stream.
        self.assertRaises(errors.KnitCorrupt, list,
                          data.read_records_iter_raw(records))
239
 
 
240
 
 
241
 
class LowLevelKnitIndexTests(TestCase):
    """Drive _KnitIndex directly against a MockTransport."""

    def _read_index(self, *entries):
        """Open a read-mode index over HEADER followed by *entries*.

        Returns the ``(transport, index)`` pair so that a test can also
        inspect the calls recorded by the mock transport.
        """
        transport = MockTransport([_KnitIndex.HEADER] + list(entries))
        index = _KnitIndex(transport, "filename", "r")
        return transport, index

    def _delayed_index(self):
        """Create a write-mode index with delay_create=True.

        Returns the ``(transport, index)`` pair.
        """
        transport = MockTransport()
        index = _KnitIndex(transport, "filename", "w",
            create=True, file_mode="wb", create_parent_dir=True,
            delay_create=True, dir_mode=int("777", 8))
        return transport, index

    def test_no_such_file(self):
        """Opening a missing index without create raises NoSuchFile."""
        transport = MockTransport()

        for mode, extra in [("r", {}), ("w", {"create": False})]:
            self.assertRaises(NoSuchFile, _KnitIndex, transport,
                "filename", mode, **extra)

    def test_create_file(self):
        """create=True immediately writes the header to the transport."""
        transport = MockTransport()

        index = _KnitIndex(transport, "filename", "w",
            file_mode="wb", create=True)
        expected_call = ("put_bytes_non_atomic",
                         ("filename", index.HEADER),
                         {"mode": "wb"})
        self.assertEqual(expected_call, transport.calls.pop(0))

    def test_delay_create_file(self):
        """With delay_create the file is only written on first add."""
        transport, index = self._delayed_index()
        self.assertEqual([], transport.calls)

        index.add_versions([])
        name, (filename, f), kwargs = transport.calls.pop(0)
        self.assertEqual("put_file_non_atomic", name)
        expected_kwargs = {"dir_mode": int("777", 8),
                           "create_parent_dir": True,
                           "mode": "wb"}
        self.assertEqual(expected_kwargs, kwargs)
        self.assertEqual("filename", filename)
        self.assertEqual(index.HEADER, f.read())

        # Once the file exists, further adds are plain appends.
        index.add_versions([])
        self.assertEqual(("append_bytes", ("filename", ""), {}),
            transport.calls.pop(0))

    def test_read_utf8_version_id(self):
        """Version ids read from the index are utf8 byte strings."""
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
        utf8_revision_id = unicode_revision_id.encode('utf-8')
        _, index = self._read_index(
            '%s option 0 1 :' % (utf8_revision_id,))
        # _KnitIndex is a private class, and deals in utf8 revision_ids, not
        # Unicode revision_ids.
        self.assertTrue(index.has_version(utf8_revision_id))
        self.assertFalse(index.has_version(unicode_revision_id))

    def test_read_utf8_parents(self):
        """Ghost parent ids read from the index are utf8 byte strings."""
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
        utf8_revision_id = unicode_revision_id.encode('utf-8')
        _, index = self._read_index(
            "version option 0 1 .%s :" % (utf8_revision_id,))
        self.assertEqual([utf8_revision_id],
            index.get_parents_with_ghosts("version"))

    def test_read_ignore_corrupted_lines(self):
        """Of two corrupted lines and one good one, only one is indexed."""
        _, index = self._read_index(
            "corrupted",
            "corrupted options 0 1 .b .c ",
            "version options 0 1 :")
        self.assertEqual(1, index.num_versions())
        self.assertTrue(index.has_version("version"))

    def test_read_corrupted_header(self):
        """A file that does not start with the header is rejected."""
        transport = MockTransport(['not a bzr knit index header\n'])
        self.assertRaises(KnitHeaderError,
            _KnitIndex, transport, "filename", "r")

    def test_read_duplicate_entries(self):
        """The last of several entries for one version is the one reported."""
        _, index = self._read_index(
            "parent options 0 1 :",
            "version options1 0 1 0 :",
            "version options2 1 2 .other :",
            "version options3 3 4 0 .other :")
        self.assertEqual(2, index.num_versions())
        self.assertEqual(1, index.lookup("version"))
        self.assertEqual((3, 4), index.get_position("version"))
        self.assertEqual(["options3"], index.get_options("version"))
        self.assertEqual(["parent", "other"],
            index.get_parents_with_ghosts("version"))

    def test_read_compressed_parents(self):
        """Numeric parent references resolve to earlier version names."""
        _, index = self._read_index(
            "a option 0 1 :",
            "b option 0 1 0 :",
            "c option 0 1 1 0 :")
        self.assertEqual(["a"], index.get_parents("b"))
        self.assertEqual(["b", "a"], index.get_parents("c"))

    def test_write_utf8_version_id(self):
        """A utf8 version id is appended to the index unchanged."""
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
        utf8_revision_id = unicode_revision_id.encode('utf-8')
        transport, index = self._read_index()
        index.add_version(utf8_revision_id, ["option"], 0, 1, [])
        expected_line = "\n%s option 0 1  :" % (utf8_revision_id,)
        self.assertEqual(("append_bytes", ("filename", expected_line), {}),
            transport.calls.pop(0))

    def test_write_utf8_parents(self):
        """A utf8 parent id absent from the index is written with a dot."""
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
        utf8_revision_id = unicode_revision_id.encode('utf-8')
        transport, index = self._read_index()
        index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
        expected_line = "\nversion option 0 1 .%s :" % (utf8_revision_id,)
        self.assertEqual(("append_bytes", ("filename", expected_line), {}),
            transport.calls.pop(0))

    def test_get_graph(self):
        """get_graph() lists each added version with its parents."""
        transport = MockTransport()
        index = _KnitIndex(transport, "filename", "w", create=True)
        self.assertEqual([], index.get_graph())

        index.add_version("a", ["option"], 0, 1, ["b"])
        self.assertEqual([("a", ["b"])], index.get_graph())

        index.add_version("c", ["option"], 0, 1, ["d"])
        self.assertEqual([("a", ["b"]), ("c", ["d"])],
            sorted(index.get_graph()))

    def test_get_ancestry(self):
        """get_ancestry() omits ghosts and rejects an absent version."""
        _, index = self._read_index(
            "a option 0 1 :",
            "b option 0 1 0 .e :",
            "c option 0 1 1 0 :",
            "d option 0 1 2 .f :")

        cases = [
            ([], []),
            (["a"], ["a"]),
            (["a", "b"], ["b"]),
            (["a", "b", "c"], ["c"]),
            (["a", "b", "c", "d"], ["d"]),
            (["a", "b"], ["a", "b"]),
            (["a", "b", "c"], ["a", "c"]),
        ]
        for expected, requested in cases:
            self.assertEqual(expected, index.get_ancestry(requested))

        self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])

    def test_get_ancestry_with_ghosts(self):
        """get_ancestry_with_ghosts() includes ghost parents in its result."""
        _, index = self._read_index(
            "a option 0 1 :",
            "b option 0 1 0 .e :",
            "c option 0 1 0 .f .g :",
            "d option 0 1 2 .h .j .k :")

        cases = [
            ([], []),
            (["a"], ["a"]),
            (["a", "e", "b"], ["b"]),
            (["a", "g", "f", "c"], ["c"]),
            (["a", "g", "f", "c", "k", "j", "h", "d"], ["d"]),
            (["a", "e", "b"], ["a", "b"]),
            (["a", "g", "f", "c"], ["a", "c"]),
            (["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"], ["b", "d"]),
        ]
        for expected, requested in cases:
            self.assertEqual(expected,
                index.get_ancestry_with_ghosts(requested))

        self.assertRaises(RevisionNotPresent,
            index.get_ancestry_with_ghosts, ["e"])

    def test_num_versions(self):
        """num_versions() and len() count each distinct version once."""
        _, index = self._read_index()

        def check_count(expected):
            self.assertEqual(expected, index.num_versions())
            self.assertEqual(expected, len(index))

        check_count(0)

        index.add_version("a", ["option"], 0, 1, [])
        check_count(1)

        # Re-adding an existing version leaves the count unchanged.
        index.add_version("a", ["option2"], 1, 2, [])
        check_count(1)

        index.add_version("b", ["option"], 0, 1, [])
        check_count(2)

    def test_get_versions(self):
        """get_versions() lists distinct versions in insertion order."""
        _, index = self._read_index()

        self.assertEqual([], index.get_versions())

        steps = [
            ("a", ["a"]),
            ("a", ["a"]),
            ("b", ["a", "b"]),
        ]
        for version, expected in steps:
            index.add_version(version, ["option"], 0, 1, [])
            self.assertEqual(expected, index.get_versions())

    def test_idx_to_name(self):
        """idx_to_name() accepts positive and negative positions."""
        _, index = self._read_index(
            "a option 0 1 :",
            "b option 0 1 :")

        for expected, position in [("a", 0), ("b", 1), ("b", -1), ("a", -2)]:
            self.assertEqual(expected, index.idx_to_name(position))

    def test_lookup(self):
        """lookup() returns the position at which a version was read."""
        _, index = self._read_index(
            "a option 0 1 :",
            "b option 0 1 :")

        self.assertEqual(0, index.lookup("a"))
        self.assertEqual(1, index.lookup("b"))

    def test_add_version(self):
        """add_version() appends one line and updates the in-memory index."""
        transport, index = self._read_index()

        def check_state(version, line, count, position, options, parents):
            self.assertEqual(("append_bytes", ("filename", line), {}),
                transport.calls.pop(0))
            self.assertTrue(index.has_version(version))
            self.assertEqual(count, index.num_versions())
            self.assertEqual(position, index.get_position(version))
            self.assertEqual(options, index.get_options(version))
            self.assertEqual(parents,
                index.get_parents_with_ghosts(version))

        index.add_version("a", ["option"], 0, 1, ["b"])
        check_state("a", "\na option 0 1 .b :", 1, (0, 1), ["option"], ["b"])

        # A second entry for "a" replaces what is reported for it.
        index.add_version("a", ["opt"], 1, 2, ["c"])
        check_state("a", "\na opt 1 2 .c :", 1, (1, 2), ["opt"], ["c"])

        # The parent "a" is already present, so it is written as index 0.
        index.add_version("b", ["option"], 2, 3, ["a"])
        check_state("b", "\nb option 2 3 0 :", 2, (2, 3), ["option"], ["a"])

    def test_add_versions(self):
        """add_versions() writes every entry in a single append."""
        transport, index = self._read_index()

        index.add_versions([
            ("a", ["option"], 0, 1, ["b"]),
            ("a", ["opt"], 1, 2, ["c"]),
            ("b", ["option"], 2, 3, ["a"])
            ])
        appended = (
            "\na option 0 1 .b :"
            "\na opt 1 2 .c :"
            "\nb option 2 3 0 :"
            )
        self.assertEqual(("append_bytes", ("filename", appended), {}),
            transport.calls.pop(0))
        self.assertTrue(index.has_version("a"))
        self.assertTrue(index.has_version("b"))
        self.assertEqual(2, index.num_versions())
        self.assertEqual((1, 2), index.get_position("a"))
        self.assertEqual((2, 3), index.get_position("b"))
        self.assertEqual(["opt"], index.get_options("a"))
        self.assertEqual(["option"], index.get_options("b"))
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))

    def test_delay_create_and_add_versions(self):
        """A delayed create writes header and entries in one put."""
        transport, index = self._delayed_index()
        self.assertEqual([], transport.calls)

        index.add_versions([
            ("a", ["option"], 0, 1, ["b"]),
            ("a", ["opt"], 1, 2, ["c"]),
            ("b", ["option"], 2, 3, ["a"])
            ])
        name, (filename, f), kwargs = transport.calls.pop(0)
        self.assertEqual("put_file_non_atomic", name)
        expected_kwargs = {"dir_mode": int("777", 8),
                           "create_parent_dir": True,
                           "mode": "wb"}
        self.assertEqual(expected_kwargs, kwargs)
        self.assertEqual("filename", filename)
        expected_content = (
            index.HEADER +
            "\na option 0 1 .b :"
            "\na opt 1 2 .c :"
            "\nb option 2 3 0 :")
        self.assertEqual(expected_content, f.read())

    def test_has_version(self):
        """has_version() is true only for versions in the index."""
        _, index = self._read_index("a option 0 1 :")

        self.assertTrue(index.has_version("a"))
        self.assertFalse(index.has_version("b"))

    def test_get_position(self):
        """get_position() returns the two numbers after the options."""
        _, index = self._read_index(
            "a option 0 1 :",
            "b option 1 2 :")

        self.assertEqual((0, 1), index.get_position("a"))
        self.assertEqual((1, 2), index.get_position("b"))

    def test_get_method(self):
        """get_method() picks fulltext or line-delta out of the options."""
        _, index = self._read_index(
            "a fulltext,unknown 0 1 :",
            "b unknown,line-delta 1 2 :",
            "c bad 3 4 :")

        self.assertEqual("fulltext", index.get_method("a"))
        self.assertEqual("line-delta", index.get_method("b"))
        self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")

    def test_get_options(self):
        """get_options() splits the comma-separated option field."""
        _, index = self._read_index(
            "a opt1 0 1 :",
            "b opt2,opt3 1 2 :")

        self.assertEqual(["opt1"], index.get_options("a"))
        self.assertEqual(["opt2", "opt3"], index.get_options("b"))

    def test_get_parents(self):
        """get_parents() returns only parents present in the index."""
        _, index = self._read_index(
            "a option 0 1 :",
            "b option 1 2 0 .c :",
            "c option 1 2 1 0 .e :")

        self.assertEqual([], index.get_parents("a"))
        self.assertEqual(["a", "c"], index.get_parents("b"))
        self.assertEqual(["b", "a"], index.get_parents("c"))

    def test_get_parents_with_ghosts(self):
        """get_parents_with_ghosts() also returns absent parents."""
        _, index = self._read_index(
            "a option 0 1 :",
            "b option 1 2 0 .c :",
            "c option 1 2 1 0 .e :")

        self.assertEqual([], index.get_parents_with_ghosts("a"))
        self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
        self.assertEqual(["b", "a", "e"],
            index.get_parents_with_ghosts("c"))

    def test_check_versions_present(self):
        """check_versions_present() raises if any listed version is absent."""
        _, index = self._read_index(
            "a option 0 1 :",
            "b option 0 1 :")

        check = index.check_versions_present

        # Every requested version is present: these must not raise.
        for present in ([], ["a"], ["b"], ["a", "b"]):
            check(present)
        for missing in (["c"], ["a", "b", "c"]):
            self.assertRaises(RevisionNotPresent, check, missing)
673
 
 
674
 
 
675
36
class KnitTests(TestCaseWithTransport):
676
37
    """Class containing knit test helper routines."""
677
38
 
1333
694
        # And it shouldn't be modified
1334
695
        self.assertEqual(['a-1'], idx._history)
1335
696
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1336
 
 
1337
 
    def test_knit_index_ignores_empty_files(self):
1338
 
        # There was a race condition in older bzr, where a ^C at the right time
1339
 
        # could leave an empty .kndx file, which bzr would later claim was a
1340
 
        # corrupted file since the header was not present. In reality, the file
1341
 
        # just wasn't created, so it should be ignored.
1342
 
        t = get_transport('.')
1343
 
        t.put_bytes('test.kndx', '')
1344
 
 
1345
 
        knit = self.make_test_knit()
1346
 
 
1347
 
    def test_knit_index_checks_header(self):
1348
 
        t = get_transport('.')
1349
 
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
1350
 
 
1351
 
        self.assertRaises(KnitHeaderError, self.make_test_knit)