~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: John Arbash Meinel
  • Date: 2006-05-02 21:17:38 UTC
  • mto: This revision was merged to the branch mainline in revision 1752.
  • Revision ID: john@arbash-meinel.com-20060502211738-be512e6ec1e6e6cf
Updated strip_trailing_slash to support lots more url stuff, added tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for Knit data structure"""
18
18
 
19
 
from cStringIO import StringIO
 
19
 
20
20
import difflib
21
21
 
22
 
from bzrlib import (
23
 
    errors,
24
 
    )
25
 
from bzrlib.errors import (
26
 
    RevisionAlreadyPresent,
27
 
    KnitHeaderError,
28
 
    RevisionNotPresent,
29
 
    NoSuchFile,
30
 
    )
31
 
from bzrlib.knit import (
32
 
    KnitContent,
33
 
    KnitVersionedFile,
34
 
    KnitPlainFactory,
35
 
    KnitAnnotateFactory,
36
 
    _KnitIndex,
37
 
    WeaveToKnit,
38
 
    )
 
22
 
 
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent
 
24
from bzrlib.knit import KnitVersionedFile, KnitPlainFactory, KnitAnnotateFactory
39
25
from bzrlib.osutils import split_lines
40
 
from bzrlib.tests import TestCase, TestCaseWithTransport
 
26
from bzrlib.tests import TestCaseInTempDir
41
27
from bzrlib.transport import TransportLogger, get_transport
42
28
from bzrlib.transport.memory import MemoryTransport
43
 
from bzrlib.weave import Weave
44
 
 
45
 
 
46
 
class KnitContentTests(TestCase):
47
 
 
48
 
    def test_constructor(self):
49
 
        content = KnitContent([])
50
 
 
51
 
    def test_text(self):
52
 
        content = KnitContent([])
53
 
        self.assertEqual(content.text(), [])
54
 
 
55
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
56
 
        self.assertEqual(content.text(), ["text1", "text2"])
57
 
 
58
 
    def test_annotate(self):
59
 
        content = KnitContent([])
60
 
        self.assertEqual(content.annotate(), [])
61
 
 
62
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
63
 
        self.assertEqual(content.annotate(),
64
 
            [("origin1", "text1"), ("origin2", "text2")])
65
 
 
66
 
    def test_annotate_iter(self):
67
 
        content = KnitContent([])
68
 
        it = content.annotate_iter()
69
 
        self.assertRaises(StopIteration, it.next)
70
 
 
71
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
72
 
        it = content.annotate_iter()
73
 
        self.assertEqual(it.next(), ("origin1", "text1"))
74
 
        self.assertEqual(it.next(), ("origin2", "text2"))
75
 
        self.assertRaises(StopIteration, it.next)
76
 
 
77
 
    def test_copy(self):
78
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
79
 
        copy = content.copy()
80
 
        self.assertIsInstance(copy, KnitContent)
81
 
        self.assertEqual(copy.annotate(),
82
 
            [("origin1", "text1"), ("origin2", "text2")])
83
 
 
84
 
    def test_line_delta(self):
85
 
        content1 = KnitContent([("", "a"), ("", "b")])
86
 
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
87
 
        self.assertEqual(content1.line_delta(content2),
88
 
            [(1, 2, 2, [("", "a"), ("", "c")])])
89
 
 
90
 
    def test_line_delta_iter(self):
91
 
        content1 = KnitContent([("", "a"), ("", "b")])
92
 
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
93
 
        it = content1.line_delta_iter(content2)
94
 
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
95
 
        self.assertRaises(StopIteration, it.next)
96
 
 
97
 
 
98
 
class MockTransport(object):
99
 
 
100
 
    def __init__(self, file_lines=None):
101
 
        self.file_lines = file_lines
102
 
        self.calls = []
103
 
        # We have no base directory for the MockTransport
104
 
        self.base = ''
105
 
 
106
 
    def get(self, filename):
107
 
        if self.file_lines is None:
108
 
            raise NoSuchFile(filename)
109
 
        else:
110
 
            return StringIO("\n".join(self.file_lines))
111
 
 
112
 
    def __getattr__(self, name):
113
 
        def queue_call(*args, **kwargs):
114
 
            self.calls.append((name, args, kwargs))
115
 
        return queue_call
116
 
 
117
 
 
118
 
class LowLevelKnitIndexTests(TestCase):
119
 
 
120
 
    def test_no_such_file(self):
121
 
        transport = MockTransport()
122
 
 
123
 
        self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
124
 
        self.assertRaises(NoSuchFile, _KnitIndex, transport,
125
 
            "filename", "w", create=False)
126
 
 
127
 
    def test_create_file(self):
128
 
        transport = MockTransport()
129
 
 
130
 
        index = _KnitIndex(transport, "filename", "w",
131
 
            file_mode="wb", create=True)
132
 
        self.assertEqual(
133
 
                ("put_bytes_non_atomic",
134
 
                    ("filename", index.HEADER), {"mode": "wb"}),
135
 
                transport.calls.pop(0))
136
 
 
137
 
    def test_delay_create_file(self):
138
 
        transport = MockTransport()
139
 
 
140
 
        index = _KnitIndex(transport, "filename", "w",
141
 
            create=True, file_mode="wb", create_parent_dir=True,
142
 
            delay_create=True, dir_mode=0777)
143
 
        self.assertEqual([], transport.calls)
144
 
 
145
 
        index.add_versions([])
146
 
        name, (filename, f), kwargs = transport.calls.pop(0)
147
 
        self.assertEqual("put_file_non_atomic", name)
148
 
        self.assertEqual(
149
 
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
150
 
            kwargs)
151
 
        self.assertEqual("filename", filename)
152
 
        self.assertEqual(index.HEADER, f.read())
153
 
 
154
 
        index.add_versions([])
155
 
        self.assertEqual(("append_bytes", ("filename", ""), {}),
156
 
            transport.calls.pop(0))
157
 
 
158
 
    def test_read_utf8_version_id(self):
159
 
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
160
 
        utf8_revision_id = unicode_revision_id.encode('utf-8')
161
 
        transport = MockTransport([
162
 
            _KnitIndex.HEADER,
163
 
            '%s option 0 1 :' % (utf8_revision_id,)
164
 
            ])
165
 
        index = _KnitIndex(transport, "filename", "r")
166
 
        # _KnitIndex is a private class, and deals in utf8 revision_ids, not
167
 
        # Unicode revision_ids.
168
 
        self.assertTrue(index.has_version(utf8_revision_id))
169
 
        self.assertFalse(index.has_version(unicode_revision_id))
170
 
 
171
 
    def test_read_utf8_parents(self):
172
 
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
173
 
        utf8_revision_id = unicode_revision_id.encode('utf-8')
174
 
        transport = MockTransport([
175
 
            _KnitIndex.HEADER,
176
 
            "version option 0 1 .%s :" % (utf8_revision_id,)
177
 
            ])
178
 
        index = _KnitIndex(transport, "filename", "r")
179
 
        self.assertEqual([utf8_revision_id],
180
 
            index.get_parents_with_ghosts("version"))
181
 
 
182
 
    def test_read_ignore_corrupted_lines(self):
183
 
        transport = MockTransport([
184
 
            _KnitIndex.HEADER,
185
 
            "corrupted",
186
 
            "corrupted options 0 1 .b .c ",
187
 
            "version options 0 1 :"
188
 
            ])
189
 
        index = _KnitIndex(transport, "filename", "r")
190
 
        self.assertEqual(1, index.num_versions())
191
 
        self.assertTrue(index.has_version("version"))
192
 
 
193
 
    def test_read_corrupted_header(self):
194
 
        transport = MockTransport(['not a bzr knit index header\n'])
195
 
        self.assertRaises(KnitHeaderError,
196
 
            _KnitIndex, transport, "filename", "r")
197
 
 
198
 
    def test_read_duplicate_entries(self):
199
 
        transport = MockTransport([
200
 
            _KnitIndex.HEADER,
201
 
            "parent options 0 1 :",
202
 
            "version options1 0 1 0 :",
203
 
            "version options2 1 2 .other :",
204
 
            "version options3 3 4 0 .other :"
205
 
            ])
206
 
        index = _KnitIndex(transport, "filename", "r")
207
 
        self.assertEqual(2, index.num_versions())
208
 
        self.assertEqual(1, index.lookup("version"))
209
 
        self.assertEqual((3, 4), index.get_position("version"))
210
 
        self.assertEqual(["options3"], index.get_options("version"))
211
 
        self.assertEqual(["parent", "other"],
212
 
            index.get_parents_with_ghosts("version"))
213
 
 
214
 
    def test_read_compressed_parents(self):
215
 
        transport = MockTransport([
216
 
            _KnitIndex.HEADER,
217
 
            "a option 0 1 :",
218
 
            "b option 0 1 0 :",
219
 
            "c option 0 1 1 0 :",
220
 
            ])
221
 
        index = _KnitIndex(transport, "filename", "r")
222
 
        self.assertEqual(["a"], index.get_parents("b"))
223
 
        self.assertEqual(["b", "a"], index.get_parents("c"))
224
 
 
225
 
    def test_write_utf8_version_id(self):
226
 
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
227
 
        utf8_revision_id = unicode_revision_id.encode('utf-8')
228
 
        transport = MockTransport([
229
 
            _KnitIndex.HEADER
230
 
            ])
231
 
        index = _KnitIndex(transport, "filename", "r")
232
 
        index.add_version(utf8_revision_id, ["option"], 0, 1, [])
233
 
        self.assertEqual(("append_bytes", ("filename",
234
 
            "\n%s option 0 1  :" % (utf8_revision_id,)),
235
 
            {}),
236
 
            transport.calls.pop(0))
237
 
 
238
 
    def test_write_utf8_parents(self):
239
 
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
240
 
        utf8_revision_id = unicode_revision_id.encode('utf-8')
241
 
        transport = MockTransport([
242
 
            _KnitIndex.HEADER
243
 
            ])
244
 
        index = _KnitIndex(transport, "filename", "r")
245
 
        index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
246
 
        self.assertEqual(("append_bytes", ("filename",
247
 
            "\nversion option 0 1 .%s :" % (utf8_revision_id,)),
248
 
            {}),
249
 
            transport.calls.pop(0))
250
 
 
251
 
    def test_get_graph(self):
252
 
        transport = MockTransport()
253
 
        index = _KnitIndex(transport, "filename", "w", create=True)
254
 
        self.assertEqual([], index.get_graph())
255
 
 
256
 
        index.add_version("a", ["option"], 0, 1, ["b"])
257
 
        self.assertEqual([("a", ["b"])], index.get_graph())
258
 
 
259
 
        index.add_version("c", ["option"], 0, 1, ["d"])
260
 
        self.assertEqual([("a", ["b"]), ("c", ["d"])],
261
 
            sorted(index.get_graph()))
262
 
 
263
 
    def test_get_ancestry(self):
264
 
        transport = MockTransport([
265
 
            _KnitIndex.HEADER,
266
 
            "a option 0 1 :",
267
 
            "b option 0 1 0 .e :",
268
 
            "c option 0 1 1 0 :",
269
 
            "d option 0 1 2 .f :"
270
 
            ])
271
 
        index = _KnitIndex(transport, "filename", "r")
272
 
 
273
 
        self.assertEqual([], index.get_ancestry([]))
274
 
        self.assertEqual(["a"], index.get_ancestry(["a"]))
275
 
        self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
276
 
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
277
 
        self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
278
 
        self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
279
 
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
280
 
 
281
 
        self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
282
 
 
283
 
    def test_get_ancestry_with_ghosts(self):
284
 
        transport = MockTransport([
285
 
            _KnitIndex.HEADER,
286
 
            "a option 0 1 :",
287
 
            "b option 0 1 0 .e :",
288
 
            "c option 0 1 0 .f .g :",
289
 
            "d option 0 1 2 .h .j .k :"
290
 
            ])
291
 
        index = _KnitIndex(transport, "filename", "r")
292
 
 
293
 
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
294
 
        self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
295
 
        self.assertEqual(["a", "e", "b"],
296
 
            index.get_ancestry_with_ghosts(["b"]))
297
 
        self.assertEqual(["a", "g", "f", "c"],
298
 
            index.get_ancestry_with_ghosts(["c"]))
299
 
        self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
300
 
            index.get_ancestry_with_ghosts(["d"]))
301
 
        self.assertEqual(["a", "e", "b"],
302
 
            index.get_ancestry_with_ghosts(["a", "b"]))
303
 
        self.assertEqual(["a", "g", "f", "c"],
304
 
            index.get_ancestry_with_ghosts(["a", "c"]))
305
 
        self.assertEqual(
306
 
            ["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
307
 
            index.get_ancestry_with_ghosts(["b", "d"]))
308
 
 
309
 
        self.assertRaises(RevisionNotPresent,
310
 
            index.get_ancestry_with_ghosts, ["e"])
311
 
 
312
 
    def test_num_versions(self):
313
 
        transport = MockTransport([
314
 
            _KnitIndex.HEADER
315
 
            ])
316
 
        index = _KnitIndex(transport, "filename", "r")
317
 
 
318
 
        self.assertEqual(0, index.num_versions())
319
 
        self.assertEqual(0, len(index))
320
 
 
321
 
        index.add_version("a", ["option"], 0, 1, [])
322
 
        self.assertEqual(1, index.num_versions())
323
 
        self.assertEqual(1, len(index))
324
 
 
325
 
        index.add_version("a", ["option2"], 1, 2, [])
326
 
        self.assertEqual(1, index.num_versions())
327
 
        self.assertEqual(1, len(index))
328
 
 
329
 
        index.add_version("b", ["option"], 0, 1, [])
330
 
        self.assertEqual(2, index.num_versions())
331
 
        self.assertEqual(2, len(index))
332
 
 
333
 
    def test_get_versions(self):
334
 
        transport = MockTransport([
335
 
            _KnitIndex.HEADER
336
 
            ])
337
 
        index = _KnitIndex(transport, "filename", "r")
338
 
 
339
 
        self.assertEqual([], index.get_versions())
340
 
 
341
 
        index.add_version("a", ["option"], 0, 1, [])
342
 
        self.assertEqual(["a"], index.get_versions())
343
 
 
344
 
        index.add_version("a", ["option"], 0, 1, [])
345
 
        self.assertEqual(["a"], index.get_versions())
346
 
 
347
 
        index.add_version("b", ["option"], 0, 1, [])
348
 
        self.assertEqual(["a", "b"], index.get_versions())
349
 
 
350
 
    def test_idx_to_name(self):
351
 
        transport = MockTransport([
352
 
            _KnitIndex.HEADER,
353
 
            "a option 0 1 :",
354
 
            "b option 0 1 :"
355
 
            ])
356
 
        index = _KnitIndex(transport, "filename", "r")
357
 
 
358
 
        self.assertEqual("a", index.idx_to_name(0))
359
 
        self.assertEqual("b", index.idx_to_name(1))
360
 
        self.assertEqual("b", index.idx_to_name(-1))
361
 
        self.assertEqual("a", index.idx_to_name(-2))
362
 
 
363
 
    def test_lookup(self):
364
 
        transport = MockTransport([
365
 
            _KnitIndex.HEADER,
366
 
            "a option 0 1 :",
367
 
            "b option 0 1 :"
368
 
            ])
369
 
        index = _KnitIndex(transport, "filename", "r")
370
 
 
371
 
        self.assertEqual(0, index.lookup("a"))
372
 
        self.assertEqual(1, index.lookup("b"))
373
 
 
374
 
    def test_add_version(self):
375
 
        transport = MockTransport([
376
 
            _KnitIndex.HEADER
377
 
            ])
378
 
        index = _KnitIndex(transport, "filename", "r")
379
 
 
380
 
        index.add_version("a", ["option"], 0, 1, ["b"])
381
 
        self.assertEqual(("append_bytes",
382
 
            ("filename", "\na option 0 1 .b :"),
383
 
            {}), transport.calls.pop(0))
384
 
        self.assertTrue(index.has_version("a"))
385
 
        self.assertEqual(1, index.num_versions())
386
 
        self.assertEqual((0, 1), index.get_position("a"))
387
 
        self.assertEqual(["option"], index.get_options("a"))
388
 
        self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
389
 
 
390
 
        index.add_version("a", ["opt"], 1, 2, ["c"])
391
 
        self.assertEqual(("append_bytes",
392
 
            ("filename", "\na opt 1 2 .c :"),
393
 
            {}), transport.calls.pop(0))
394
 
        self.assertTrue(index.has_version("a"))
395
 
        self.assertEqual(1, index.num_versions())
396
 
        self.assertEqual((1, 2), index.get_position("a"))
397
 
        self.assertEqual(["opt"], index.get_options("a"))
398
 
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
399
 
 
400
 
        index.add_version("b", ["option"], 2, 3, ["a"])
401
 
        self.assertEqual(("append_bytes",
402
 
            ("filename", "\nb option 2 3 0 :"),
403
 
            {}), transport.calls.pop(0))
404
 
        self.assertTrue(index.has_version("b"))
405
 
        self.assertEqual(2, index.num_versions())
406
 
        self.assertEqual((2, 3), index.get_position("b"))
407
 
        self.assertEqual(["option"], index.get_options("b"))
408
 
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
409
 
 
410
 
    def test_add_versions(self):
411
 
        transport = MockTransport([
412
 
            _KnitIndex.HEADER
413
 
            ])
414
 
        index = _KnitIndex(transport, "filename", "r")
415
 
 
416
 
        index.add_versions([
417
 
            ("a", ["option"], 0, 1, ["b"]),
418
 
            ("a", ["opt"], 1, 2, ["c"]),
419
 
            ("b", ["option"], 2, 3, ["a"])
420
 
            ])
421
 
        self.assertEqual(("append_bytes", ("filename",
422
 
            "\na option 0 1 .b :"
423
 
            "\na opt 1 2 .c :"
424
 
            "\nb option 2 3 0 :"
425
 
            ), {}), transport.calls.pop(0))
426
 
        self.assertTrue(index.has_version("a"))
427
 
        self.assertTrue(index.has_version("b"))
428
 
        self.assertEqual(2, index.num_versions())
429
 
        self.assertEqual((1, 2), index.get_position("a"))
430
 
        self.assertEqual((2, 3), index.get_position("b"))
431
 
        self.assertEqual(["opt"], index.get_options("a"))
432
 
        self.assertEqual(["option"], index.get_options("b"))
433
 
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
434
 
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
435
 
 
436
 
    def test_delay_create_and_add_versions(self):
437
 
        transport = MockTransport()
438
 
 
439
 
        index = _KnitIndex(transport, "filename", "w",
440
 
            create=True, file_mode="wb", create_parent_dir=True,
441
 
            delay_create=True, dir_mode=0777)
442
 
        self.assertEqual([], transport.calls)
443
 
 
444
 
        index.add_versions([
445
 
            ("a", ["option"], 0, 1, ["b"]),
446
 
            ("a", ["opt"], 1, 2, ["c"]),
447
 
            ("b", ["option"], 2, 3, ["a"])
448
 
            ])
449
 
        name, (filename, f), kwargs = transport.calls.pop(0)
450
 
        self.assertEqual("put_file_non_atomic", name)
451
 
        self.assertEqual(
452
 
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
453
 
            kwargs)
454
 
        self.assertEqual("filename", filename)
455
 
        self.assertEqual(
456
 
            index.HEADER +
457
 
            "\na option 0 1 .b :"
458
 
            "\na opt 1 2 .c :"
459
 
            "\nb option 2 3 0 :",
460
 
            f.read())
461
 
 
462
 
    def test_has_version(self):
463
 
        transport = MockTransport([
464
 
            _KnitIndex.HEADER,
465
 
            "a option 0 1 :"
466
 
            ])
467
 
        index = _KnitIndex(transport, "filename", "r")
468
 
 
469
 
        self.assertTrue(index.has_version("a"))
470
 
        self.assertFalse(index.has_version("b"))
471
 
 
472
 
    def test_get_position(self):
473
 
        transport = MockTransport([
474
 
            _KnitIndex.HEADER,
475
 
            "a option 0 1 :",
476
 
            "b option 1 2 :"
477
 
            ])
478
 
        index = _KnitIndex(transport, "filename", "r")
479
 
 
480
 
        self.assertEqual((0, 1), index.get_position("a"))
481
 
        self.assertEqual((1, 2), index.get_position("b"))
482
 
 
483
 
    def test_get_method(self):
484
 
        transport = MockTransport([
485
 
            _KnitIndex.HEADER,
486
 
            "a fulltext,unknown 0 1 :",
487
 
            "b unknown,line-delta 1 2 :",
488
 
            "c bad 3 4 :"
489
 
            ])
490
 
        index = _KnitIndex(transport, "filename", "r")
491
 
 
492
 
        self.assertEqual("fulltext", index.get_method("a"))
493
 
        self.assertEqual("line-delta", index.get_method("b"))
494
 
        self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
495
 
 
496
 
    def test_get_options(self):
497
 
        transport = MockTransport([
498
 
            _KnitIndex.HEADER,
499
 
            "a opt1 0 1 :",
500
 
            "b opt2,opt3 1 2 :"
501
 
            ])
502
 
        index = _KnitIndex(transport, "filename", "r")
503
 
 
504
 
        self.assertEqual(["opt1"], index.get_options("a"))
505
 
        self.assertEqual(["opt2", "opt3"], index.get_options("b"))
506
 
 
507
 
    def test_get_parents(self):
508
 
        transport = MockTransport([
509
 
            _KnitIndex.HEADER,
510
 
            "a option 0 1 :",
511
 
            "b option 1 2 0 .c :",
512
 
            "c option 1 2 1 0 .e :"
513
 
            ])
514
 
        index = _KnitIndex(transport, "filename", "r")
515
 
 
516
 
        self.assertEqual([], index.get_parents("a"))
517
 
        self.assertEqual(["a", "c"], index.get_parents("b"))
518
 
        self.assertEqual(["b", "a"], index.get_parents("c"))
519
 
 
520
 
    def test_get_parents_with_ghosts(self):
521
 
        transport = MockTransport([
522
 
            _KnitIndex.HEADER,
523
 
            "a option 0 1 :",
524
 
            "b option 1 2 0 .c :",
525
 
            "c option 1 2 1 0 .e :"
526
 
            ])
527
 
        index = _KnitIndex(transport, "filename", "r")
528
 
 
529
 
        self.assertEqual([], index.get_parents_with_ghosts("a"))
530
 
        self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
531
 
        self.assertEqual(["b", "a", "e"],
532
 
            index.get_parents_with_ghosts("c"))
533
 
 
534
 
    def test_check_versions_present(self):
535
 
        transport = MockTransport([
536
 
            _KnitIndex.HEADER,
537
 
            "a option 0 1 :",
538
 
            "b option 0 1 :"
539
 
            ])
540
 
        index = _KnitIndex(transport, "filename", "r")
541
 
 
542
 
        check = index.check_versions_present
543
 
 
544
 
        check([])
545
 
        check(["a"])
546
 
        check(["b"])
547
 
        check(["a", "b"])
548
 
        self.assertRaises(RevisionNotPresent, check, ["c"])
549
 
        self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
550
 
 
551
 
 
552
 
class KnitTests(TestCaseWithTransport):
553
 
    """Class containing knit test helper routines."""
554
 
 
555
 
    def make_test_knit(self, annotate=False, delay_create=False):
556
 
        if not annotate:
557
 
            factory = KnitPlainFactory()
558
 
        else:
559
 
            factory = None
560
 
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
561
 
                                 factory=factory, create=True,
562
 
                                 delay_create=delay_create)
563
 
 
564
 
 
565
 
class BasicKnitTests(KnitTests):
 
29
 
 
30
 
 
31
class KnitTests(TestCaseInTempDir):
566
32
 
567
33
    def add_stock_one_and_one_a(self, k):
568
34
        k.add_lines('text-1', [], split_lines(TEXT_1))
572
38
        """Construct empty k"""
573
39
        self.make_test_knit()
574
40
 
 
41
    def make_test_knit(self, annotate=False):
 
42
        if not annotate:
 
43
            factory = KnitPlainFactory()
 
44
        else:
 
45
            factory = None
 
46
        return KnitVersionedFile('test', get_transport('.'), access_mode='w', factory=factory, create=True)
 
47
 
575
48
    def test_knit_add(self):
576
49
        """Store one text in knit and retrieve"""
577
50
        k = self.make_test_knit()
790
263
        self.assertEquals(origins[0], ('text-c', 'z\n'))
791
264
        self.assertEquals(origins[1], ('text-b', 'c\n'))
792
265
 
793
 
    def test_get_line_delta_texts(self):
794
 
        """Make sure we can call get_texts on text with reused line deltas"""
795
 
        k1 = KnitVersionedFile('test1', get_transport('.'), 
796
 
                               factory=KnitPlainFactory(), create=True)
797
 
        for t in range(3):
798
 
            if t == 0:
799
 
                parents = []
800
 
            else:
801
 
                parents = ['%d' % (t-1)]
802
 
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
803
 
        k1.get_texts(('%d' % t) for t in range(3))
 
266
    def test_extraction_reads_components_once(self):
 
267
        t = MemoryTransport()
 
268
        instrumented_t = TransportLogger(t)
 
269
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
270
        # should read the index
 
271
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
272
        instrumented_t._calls = []
 
273
        # add a text       
 
274
        k1.add_lines('base', [], ['text\n'])
 
275
        # should not have read at all
 
276
        self.assertEqual([], instrumented_t._calls)
 
277
 
 
278
        # add a text
 
279
        k1.add_lines('sub', ['base'], ['text\n', 'text2\n'])
 
280
        # should not have read at all
 
281
        self.assertEqual([], instrumented_t._calls)
 
282
        
 
283
        # read a text
 
284
        k1.get_lines('sub')
 
285
        # should not have read at all
 
286
        self.assertEqual([], instrumented_t._calls)
 
287
 
 
288
        # clear the cache
 
289
        k1.clear_cache()
 
290
 
 
291
        # read a text
 
292
        k1.get_lines('base')
 
293
        # should have read a component
 
294
        # should not have read the first component only
 
295
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
 
296
        instrumented_t._calls = []
 
297
        # read again
 
298
        k1.get_lines('base')
 
299
        # should not have read at all
 
300
        self.assertEqual([], instrumented_t._calls)
 
301
        # and now read the other component
 
302
        k1.get_lines('sub')
 
303
        # should have read the second component
 
304
        self.assertEqual([('id.knit', [(87, 93)])], instrumented_t._calls)
 
305
        instrumented_t._calls = []
 
306
 
 
307
        # clear the cache
 
308
        k1.clear_cache()
 
309
        # add a text cold 
 
310
        k1.add_lines('sub2', ['base'], ['text\n', 'text3\n'])
 
311
        # should read the first component only
 
312
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
804
313
        
805
314
    def test_iter_lines_reads_in_order(self):
806
315
        t = MemoryTransport()
830
339
        # this tests that a new knit index file has the expected content
831
340
        # and that is writes the data we expect as records are added.
832
341
        knit = self.make_test_knit(True)
833
 
        # Now knit files are not created until we first add data to them
834
342
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
835
343
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
836
344
        self.assertFileEqual(
861
369
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
862
370
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
863
371
 
864
 
    def test_delay_create(self):
865
 
        """Test that passing delay_create=True creates files late"""
866
 
        knit = self.make_test_knit(annotate=True, delay_create=True)
867
 
        self.failIfExists('test.knit')
868
 
        self.failIfExists('test.kndx')
869
 
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
870
 
        self.failUnlessExists('test.knit')
871
 
        self.assertFileEqual(
872
 
            "# bzr knit index 8\n"
873
 
            "\n"
874
 
            "revid fulltext 0 84 .a_ghost :",
875
 
            'test.kndx')
876
 
 
877
 
    def test_create_parent_dir(self):
878
 
        """create_parent_dir can create knits in nonexistant dirs"""
879
 
        # Has no effect if we don't set 'delay_create'
880
 
        trans = get_transport('.')
881
 
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
882
 
                          trans, access_mode='w', factory=None,
883
 
                          create=True, create_parent_dir=True)
884
 
        # Nothing should have changed yet
885
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
886
 
                                 factory=None, create=True,
887
 
                                 create_parent_dir=True,
888
 
                                 delay_create=True)
889
 
        self.failIfExists('dir/test.knit')
890
 
        self.failIfExists('dir/test.kndx')
891
 
        self.failIfExists('dir')
892
 
        knit.add_lines('revid', [], ['a\n'])
893
 
        self.failUnlessExists('dir')
894
 
        self.failUnlessExists('dir/test.knit')
895
 
        self.assertFileEqual(
896
 
            "# bzr knit index 8\n"
897
 
            "\n"
898
 
            "revid fulltext 0 84  :",
899
 
            'dir/test.kndx')
900
 
 
901
 
    def test_create_mode_700(self):
902
 
        trans = get_transport('.')
903
 
        if not trans._can_roundtrip_unix_modebits():
904
 
            # Can't roundtrip, so no need to run this test
905
 
            return
906
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
907
 
                                 factory=None, create=True,
908
 
                                 create_parent_dir=True,
909
 
                                 delay_create=True,
910
 
                                 file_mode=0600,
911
 
                                 dir_mode=0700)
912
 
        knit.add_lines('revid', [], ['a\n'])
913
 
        self.assertTransportMode(trans, 'dir', 0700)
914
 
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
915
 
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
916
 
 
917
 
    def test_create_mode_770(self):
918
 
        trans = get_transport('.')
919
 
        if not trans._can_roundtrip_unix_modebits():
920
 
            # Can't roundtrip, so no need to run this test
921
 
            return
922
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
923
 
                                 factory=None, create=True,
924
 
                                 create_parent_dir=True,
925
 
                                 delay_create=True,
926
 
                                 file_mode=0660,
927
 
                                 dir_mode=0770)
928
 
        knit.add_lines('revid', [], ['a\n'])
929
 
        self.assertTransportMode(trans, 'dir', 0770)
930
 
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
931
 
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
932
 
 
933
 
    def test_create_mode_777(self):
934
 
        trans = get_transport('.')
935
 
        if not trans._can_roundtrip_unix_modebits():
936
 
            # Can't roundtrip, so no need to run this test
937
 
            return
938
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
939
 
                                 factory=None, create=True,
940
 
                                 create_parent_dir=True,
941
 
                                 delay_create=True,
942
 
                                 file_mode=0666,
943
 
                                 dir_mode=0777)
944
 
        knit.add_lines('revid', [], ['a\n'])
945
 
        self.assertTransportMode(trans, 'dir', 0777)
946
 
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
947
 
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
948
 
 
949
372
    def test_plan_merge(self):
950
373
        my_knit = self.make_test_knit(annotate=True)
951
374
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
1041
464
        i = i + c
1042
465
        offset = offset + (b - a) + c
1043
466
    return out
1044
 
 
1045
 
 
1046
 
class TestWeaveToKnit(KnitTests):
1047
 
 
1048
 
    def test_weave_to_knit_matches(self):
1049
 
        # check that the WeaveToKnit is_compatible function
1050
 
        # registers True for a Weave to a Knit.
1051
 
        w = Weave()
1052
 
        k = self.make_test_knit()
1053
 
        self.failUnless(WeaveToKnit.is_compatible(w, k))
1054
 
        self.failIf(WeaveToKnit.is_compatible(k, w))
1055
 
        self.failIf(WeaveToKnit.is_compatible(w, w))
1056
 
        self.failIf(WeaveToKnit.is_compatible(k, k))
1057
 
 
1058
 
 
1059
 
class TestKnitCaching(KnitTests):
1060
 
    
1061
 
    def create_knit(self, cache_add=False):
1062
 
        k = self.make_test_knit(True)
1063
 
        if cache_add:
1064
 
            k.enable_cache()
1065
 
 
1066
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
1067
 
        k.add_lines('text-2', [], split_lines(TEXT_2))
1068
 
        return k
1069
 
 
1070
 
    def test_no_caching(self):
1071
 
        k = self.create_knit()
1072
 
        # Nothing should be cached without setting 'enable_cache'
1073
 
        self.assertEqual({}, k._data._cache)
1074
 
 
1075
 
    def test_cache_add_and_clear(self):
1076
 
        k = self.create_knit(True)
1077
 
 
1078
 
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1079
 
 
1080
 
        k.clear_cache()
1081
 
        self.assertEqual({}, k._data._cache)
1082
 
 
1083
 
    def test_cache_data_read_raw(self):
1084
 
        k = self.create_knit()
1085
 
 
1086
 
        # Now cache and read
1087
 
        k.enable_cache()
1088
 
 
1089
 
        def read_one_raw(version):
1090
 
            pos_map = k._get_components_positions([version])
1091
 
            method, pos, size, next = pos_map[version]
1092
 
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
1093
 
            self.assertEqual(1, len(lst))
1094
 
            return lst[0]
1095
 
 
1096
 
        val = read_one_raw('text-1')
1097
 
        self.assertEqual({'text-1':val[1]}, k._data._cache)
1098
 
 
1099
 
        k.clear_cache()
1100
 
        # After clear, new reads are not cached
1101
 
        self.assertEqual({}, k._data._cache)
1102
 
 
1103
 
        val2 = read_one_raw('text-1')
1104
 
        self.assertEqual(val, val2)
1105
 
        self.assertEqual({}, k._data._cache)
1106
 
 
1107
 
    def test_cache_data_read(self):
1108
 
        k = self.create_knit()
1109
 
 
1110
 
        def read_one(version):
1111
 
            pos_map = k._get_components_positions([version])
1112
 
            method, pos, size, next = pos_map[version]
1113
 
            lst = list(k._data.read_records_iter([(version, pos, size)]))
1114
 
            self.assertEqual(1, len(lst))
1115
 
            return lst[0]
1116
 
 
1117
 
        # Now cache and read
1118
 
        k.enable_cache()
1119
 
 
1120
 
        val = read_one('text-2')
1121
 
        self.assertEqual(['text-2'], k._data._cache.keys())
1122
 
        self.assertEqual('text-2', val[0])
1123
 
        content, digest = k._data._parse_record('text-2',
1124
 
                                                k._data._cache['text-2'])
1125
 
        self.assertEqual(content, val[1])
1126
 
        self.assertEqual(digest, val[2])
1127
 
 
1128
 
        k.clear_cache()
1129
 
        self.assertEqual({}, k._data._cache)
1130
 
 
1131
 
        val2 = read_one('text-2')
1132
 
        self.assertEqual(val, val2)
1133
 
        self.assertEqual({}, k._data._cache)
1134
 
 
1135
 
    def test_cache_read(self):
1136
 
        k = self.create_knit()
1137
 
        k.enable_cache()
1138
 
 
1139
 
        text = k.get_text('text-1')
1140
 
        self.assertEqual(TEXT_1, text)
1141
 
        self.assertEqual(['text-1'], k._data._cache.keys())
1142
 
 
1143
 
        k.clear_cache()
1144
 
        self.assertEqual({}, k._data._cache)
1145
 
 
1146
 
        text = k.get_text('text-1')
1147
 
        self.assertEqual(TEXT_1, text)
1148
 
        self.assertEqual({}, k._data._cache)
1149
 
 
1150
 
 
1151
 
class TestKnitIndex(KnitTests):
1152
 
 
1153
 
    def test_add_versions_dictionary_compresses(self):
1154
 
        """Adding versions to the index should update the lookup dict"""
1155
 
        knit = self.make_test_knit()
1156
 
        idx = knit._index
1157
 
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
1158
 
        self.check_file_contents('test.kndx',
1159
 
            '# bzr knit index 8\n'
1160
 
            '\n'
1161
 
            'a-1 fulltext 0 0  :'
1162
 
            )
1163
 
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
1164
 
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
1165
 
                         ])
1166
 
        self.check_file_contents('test.kndx',
1167
 
            '# bzr knit index 8\n'
1168
 
            '\n'
1169
 
            'a-1 fulltext 0 0  :\n'
1170
 
            'a-2 fulltext 0 0 0 :\n'
1171
 
            'a-3 fulltext 0 0 1 :'
1172
 
            )
1173
 
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1174
 
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1175
 
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1176
 
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1177
 
                         }, idx._cache)
1178
 
 
1179
 
    def test_add_versions_fails_clean(self):
1180
 
        """If add_versions fails in the middle, it restores a pristine state.
1181
 
 
1182
 
        Any modifications that are made to the index are reset if all versions
1183
 
        cannot be added.
1184
 
        """
1185
 
        # This cheats a little bit by passing in a generator which will
1186
 
        # raise an exception before the processing finishes
1187
 
        # Other possibilities would be to have an version with the wrong number
1188
 
        # of entries, or to make the backing transport unable to write any
1189
 
        # files.
1190
 
 
1191
 
        knit = self.make_test_knit()
1192
 
        idx = knit._index
1193
 
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
1194
 
 
1195
 
        class StopEarly(Exception):
1196
 
            pass
1197
 
 
1198
 
        def generate_failure():
1199
 
            """Add some entries and then raise an exception"""
1200
 
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
1201
 
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
1202
 
            raise StopEarly()
1203
 
 
1204
 
        # Assert the pre-condition
1205
 
        self.assertEqual(['a-1'], idx._history)
1206
 
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1207
 
 
1208
 
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1209
 
 
1210
 
        # And it shouldn't be modified
1211
 
        self.assertEqual(['a-1'], idx._history)
1212
 
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1213
 
 
1214
 
    def test_knit_index_ignores_empty_files(self):
1215
 
        # There was a race condition in older bzr, where a ^C at the right time
1216
 
        # could leave an empty .kndx file, which bzr would later claim was a
1217
 
        # corrupted file since the header was not present. In reality, the file
1218
 
        # just wasn't created, so it should be ignored.
1219
 
        t = get_transport('.')
1220
 
        t.put_bytes('test.kndx', '')
1221
 
 
1222
 
        knit = self.make_test_knit()
1223
 
 
1224
 
    def test_knit_index_checks_header(self):
1225
 
        t = get_transport('.')
1226
 
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
1227
 
 
1228
 
        self.assertRaises(KnitHeaderError, self.make_test_knit)