~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Robert Collins
  • Date: 2007-03-08 04:06:06 UTC
  • mfrom: (2323.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 2442.
  • Revision ID: robertc@robertcollins.net-20070308040606-84gsniv56huiyjt4
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for Knit data structure"""
18
18
 
19
 
 
 
19
from cStringIO import StringIO
20
20
import difflib
21
21
 
22
 
 
23
 
from bzrlib.errors import KnitError, RevisionAlreadyPresent
 
22
from bzrlib import (
 
23
    errors,
 
24
    )
 
25
from bzrlib.errors import (
 
26
    RevisionAlreadyPresent,
 
27
    KnitHeaderError,
 
28
    RevisionNotPresent,
 
29
    NoSuchFile,
 
30
    )
24
31
from bzrlib.knit import (
 
32
    KnitContent,
25
33
    KnitVersionedFile,
26
34
    KnitPlainFactory,
27
35
    KnitAnnotateFactory,
28
 
    WeaveToKnit)
 
36
    _KnitIndex,
 
37
    WeaveToKnit,
 
38
    )
29
39
from bzrlib.osutils import split_lines
30
 
from bzrlib.tests import TestCaseWithTransport
31
 
from bzrlib.transport import TransportLogger
32
 
from bzrlib.transport.local import LocalTransport
 
40
from bzrlib.tests import TestCase, TestCaseWithTransport
 
41
from bzrlib.transport import TransportLogger, get_transport
33
42
from bzrlib.transport.memory import MemoryTransport
34
43
from bzrlib.weave import Weave
35
44
 
36
45
 
 
46
class KnitContentTests(TestCase):
 
47
 
 
48
    def test_constructor(self):
 
49
        content = KnitContent([])
 
50
 
 
51
    def test_text(self):
 
52
        content = KnitContent([])
 
53
        self.assertEqual(content.text(), [])
 
54
 
 
55
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
56
        self.assertEqual(content.text(), ["text1", "text2"])
 
57
 
 
58
    def test_annotate(self):
 
59
        content = KnitContent([])
 
60
        self.assertEqual(content.annotate(), [])
 
61
 
 
62
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
63
        self.assertEqual(content.annotate(),
 
64
            [("origin1", "text1"), ("origin2", "text2")])
 
65
 
 
66
    def test_annotate_iter(self):
 
67
        content = KnitContent([])
 
68
        it = content.annotate_iter()
 
69
        self.assertRaises(StopIteration, it.next)
 
70
 
 
71
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
72
        it = content.annotate_iter()
 
73
        self.assertEqual(it.next(), ("origin1", "text1"))
 
74
        self.assertEqual(it.next(), ("origin2", "text2"))
 
75
        self.assertRaises(StopIteration, it.next)
 
76
 
 
77
    def test_copy(self):
 
78
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
79
        copy = content.copy()
 
80
        self.assertIsInstance(copy, KnitContent)
 
81
        self.assertEqual(copy.annotate(),
 
82
            [("origin1", "text1"), ("origin2", "text2")])
 
83
 
 
84
    def test_line_delta(self):
 
85
        content1 = KnitContent([("", "a"), ("", "b")])
 
86
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
87
        self.assertEqual(content1.line_delta(content2),
 
88
            [(1, 2, 2, [("", "a"), ("", "c")])])
 
89
 
 
90
    def test_line_delta_iter(self):
 
91
        content1 = KnitContent([("", "a"), ("", "b")])
 
92
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
93
        it = content1.line_delta_iter(content2)
 
94
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
 
95
        self.assertRaises(StopIteration, it.next)
 
96
 
 
97
 
 
98
class MockTransport(object):
 
99
 
 
100
    def __init__(self, file_lines=None):
 
101
        self.file_lines = file_lines
 
102
        self.calls = []
 
103
        # We have no base directory for the MockTransport
 
104
        self.base = ''
 
105
 
 
106
    def get(self, filename):
 
107
        if self.file_lines is None:
 
108
            raise NoSuchFile(filename)
 
109
        else:
 
110
            return StringIO("\n".join(self.file_lines))
 
111
 
 
112
    def __getattr__(self, name):
 
113
        def queue_call(*args, **kwargs):
 
114
            self.calls.append((name, args, kwargs))
 
115
        return queue_call
 
116
 
 
117
 
 
118
class LowLevelKnitIndexTests(TestCase):
 
119
 
 
120
    def test_no_such_file(self):
 
121
        transport = MockTransport()
 
122
 
 
123
        self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
 
124
        self.assertRaises(NoSuchFile, _KnitIndex, transport,
 
125
            "filename", "w", create=False)
 
126
 
 
127
    def test_create_file(self):
 
128
        transport = MockTransport()
 
129
 
 
130
        index = _KnitIndex(transport, "filename", "w",
 
131
            file_mode="wb", create=True)
 
132
        self.assertEqual(
 
133
                ("put_bytes_non_atomic",
 
134
                    ("filename", index.HEADER), {"mode": "wb"}),
 
135
                transport.calls.pop(0))
 
136
 
 
137
    def test_delay_create_file(self):
 
138
        transport = MockTransport()
 
139
 
 
140
        index = _KnitIndex(transport, "filename", "w",
 
141
            create=True, file_mode="wb", create_parent_dir=True,
 
142
            delay_create=True, dir_mode=0777)
 
143
        self.assertEqual([], transport.calls)
 
144
 
 
145
        index.add_versions([])
 
146
        name, (filename, f), kwargs = transport.calls.pop(0)
 
147
        self.assertEqual("put_file_non_atomic", name)
 
148
        self.assertEqual(
 
149
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
150
            kwargs)
 
151
        self.assertEqual("filename", filename)
 
152
        self.assertEqual(index.HEADER, f.read())
 
153
 
 
154
        index.add_versions([])
 
155
        self.assertEqual(("append_bytes", ("filename", ""), {}),
 
156
            transport.calls.pop(0))
 
157
 
 
158
    def test_read_utf8_version_id(self):
 
159
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
160
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
161
        transport = MockTransport([
 
162
            _KnitIndex.HEADER,
 
163
            '%s option 0 1 :' % (utf8_revision_id,)
 
164
            ])
 
165
        index = _KnitIndex(transport, "filename", "r")
 
166
        # _KnitIndex is a private class, and deals in utf8 revision_ids, not
 
167
        # Unicode revision_ids.
 
168
        self.assertTrue(index.has_version(utf8_revision_id))
 
169
        self.assertFalse(index.has_version(unicode_revision_id))
 
170
 
 
171
    def test_read_utf8_parents(self):
 
172
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
173
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
174
        transport = MockTransport([
 
175
            _KnitIndex.HEADER,
 
176
            "version option 0 1 .%s :" % (utf8_revision_id,)
 
177
            ])
 
178
        index = _KnitIndex(transport, "filename", "r")
 
179
        self.assertEqual([utf8_revision_id],
 
180
            index.get_parents_with_ghosts("version"))
 
181
 
 
182
    def test_read_ignore_corrupted_lines(self):
 
183
        transport = MockTransport([
 
184
            _KnitIndex.HEADER,
 
185
            "corrupted",
 
186
            "corrupted options 0 1 .b .c ",
 
187
            "version options 0 1 :"
 
188
            ])
 
189
        index = _KnitIndex(transport, "filename", "r")
 
190
        self.assertEqual(1, index.num_versions())
 
191
        self.assertTrue(index.has_version("version"))
 
192
 
 
193
    def test_read_corrupted_header(self):
 
194
        transport = MockTransport(['not a bzr knit index header\n'])
 
195
        self.assertRaises(KnitHeaderError,
 
196
            _KnitIndex, transport, "filename", "r")
 
197
 
 
198
    def test_read_duplicate_entries(self):
 
199
        transport = MockTransport([
 
200
            _KnitIndex.HEADER,
 
201
            "parent options 0 1 :",
 
202
            "version options1 0 1 0 :",
 
203
            "version options2 1 2 .other :",
 
204
            "version options3 3 4 0 .other :"
 
205
            ])
 
206
        index = _KnitIndex(transport, "filename", "r")
 
207
        self.assertEqual(2, index.num_versions())
 
208
        self.assertEqual(1, index.lookup("version"))
 
209
        self.assertEqual((3, 4), index.get_position("version"))
 
210
        self.assertEqual(["options3"], index.get_options("version"))
 
211
        self.assertEqual(["parent", "other"],
 
212
            index.get_parents_with_ghosts("version"))
 
213
 
 
214
    def test_read_compressed_parents(self):
 
215
        transport = MockTransport([
 
216
            _KnitIndex.HEADER,
 
217
            "a option 0 1 :",
 
218
            "b option 0 1 0 :",
 
219
            "c option 0 1 1 0 :",
 
220
            ])
 
221
        index = _KnitIndex(transport, "filename", "r")
 
222
        self.assertEqual(["a"], index.get_parents("b"))
 
223
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
224
 
 
225
    def test_write_utf8_version_id(self):
 
226
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
227
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
228
        transport = MockTransport([
 
229
            _KnitIndex.HEADER
 
230
            ])
 
231
        index = _KnitIndex(transport, "filename", "r")
 
232
        index.add_version(utf8_revision_id, ["option"], 0, 1, [])
 
233
        self.assertEqual(("append_bytes", ("filename",
 
234
            "\n%s option 0 1  :" % (utf8_revision_id,)),
 
235
            {}),
 
236
            transport.calls.pop(0))
 
237
 
 
238
    def test_write_utf8_parents(self):
 
239
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
240
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
241
        transport = MockTransport([
 
242
            _KnitIndex.HEADER
 
243
            ])
 
244
        index = _KnitIndex(transport, "filename", "r")
 
245
        index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
 
246
        self.assertEqual(("append_bytes", ("filename",
 
247
            "\nversion option 0 1 .%s :" % (utf8_revision_id,)),
 
248
            {}),
 
249
            transport.calls.pop(0))
 
250
 
 
251
    def test_get_graph(self):
 
252
        transport = MockTransport()
 
253
        index = _KnitIndex(transport, "filename", "w", create=True)
 
254
        self.assertEqual([], index.get_graph())
 
255
 
 
256
        index.add_version("a", ["option"], 0, 1, ["b"])
 
257
        self.assertEqual([("a", ["b"])], index.get_graph())
 
258
 
 
259
        index.add_version("c", ["option"], 0, 1, ["d"])
 
260
        self.assertEqual([("a", ["b"]), ("c", ["d"])],
 
261
            sorted(index.get_graph()))
 
262
 
 
263
    def test_get_ancestry(self):
 
264
        transport = MockTransport([
 
265
            _KnitIndex.HEADER,
 
266
            "a option 0 1 :",
 
267
            "b option 0 1 0 .e :",
 
268
            "c option 0 1 1 0 :",
 
269
            "d option 0 1 2 .f :"
 
270
            ])
 
271
        index = _KnitIndex(transport, "filename", "r")
 
272
 
 
273
        self.assertEqual([], index.get_ancestry([]))
 
274
        self.assertEqual(["a"], index.get_ancestry(["a"]))
 
275
        self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
 
276
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
 
277
        self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
 
278
        self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
 
279
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
 
280
 
 
281
        self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
 
282
 
 
283
    def test_get_ancestry_with_ghosts(self):
 
284
        transport = MockTransport([
 
285
            _KnitIndex.HEADER,
 
286
            "a option 0 1 :",
 
287
            "b option 0 1 0 .e :",
 
288
            "c option 0 1 0 .f .g :",
 
289
            "d option 0 1 2 .h .j .k :"
 
290
            ])
 
291
        index = _KnitIndex(transport, "filename", "r")
 
292
 
 
293
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
294
        self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
 
295
        self.assertEqual(["a", "e", "b"],
 
296
            index.get_ancestry_with_ghosts(["b"]))
 
297
        self.assertEqual(["a", "g", "f", "c"],
 
298
            index.get_ancestry_with_ghosts(["c"]))
 
299
        self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
 
300
            index.get_ancestry_with_ghosts(["d"]))
 
301
        self.assertEqual(["a", "e", "b"],
 
302
            index.get_ancestry_with_ghosts(["a", "b"]))
 
303
        self.assertEqual(["a", "g", "f", "c"],
 
304
            index.get_ancestry_with_ghosts(["a", "c"]))
 
305
        self.assertEqual(
 
306
            ["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
 
307
            index.get_ancestry_with_ghosts(["b", "d"]))
 
308
 
 
309
        self.assertRaises(RevisionNotPresent,
 
310
            index.get_ancestry_with_ghosts, ["e"])
 
311
 
 
312
    def test_num_versions(self):
 
313
        transport = MockTransport([
 
314
            _KnitIndex.HEADER
 
315
            ])
 
316
        index = _KnitIndex(transport, "filename", "r")
 
317
 
 
318
        self.assertEqual(0, index.num_versions())
 
319
        self.assertEqual(0, len(index))
 
320
 
 
321
        index.add_version("a", ["option"], 0, 1, [])
 
322
        self.assertEqual(1, index.num_versions())
 
323
        self.assertEqual(1, len(index))
 
324
 
 
325
        index.add_version("a", ["option2"], 1, 2, [])
 
326
        self.assertEqual(1, index.num_versions())
 
327
        self.assertEqual(1, len(index))
 
328
 
 
329
        index.add_version("b", ["option"], 0, 1, [])
 
330
        self.assertEqual(2, index.num_versions())
 
331
        self.assertEqual(2, len(index))
 
332
 
 
333
    def test_get_versions(self):
 
334
        transport = MockTransport([
 
335
            _KnitIndex.HEADER
 
336
            ])
 
337
        index = _KnitIndex(transport, "filename", "r")
 
338
 
 
339
        self.assertEqual([], index.get_versions())
 
340
 
 
341
        index.add_version("a", ["option"], 0, 1, [])
 
342
        self.assertEqual(["a"], index.get_versions())
 
343
 
 
344
        index.add_version("a", ["option"], 0, 1, [])
 
345
        self.assertEqual(["a"], index.get_versions())
 
346
 
 
347
        index.add_version("b", ["option"], 0, 1, [])
 
348
        self.assertEqual(["a", "b"], index.get_versions())
 
349
 
 
350
    def test_idx_to_name(self):
 
351
        transport = MockTransport([
 
352
            _KnitIndex.HEADER,
 
353
            "a option 0 1 :",
 
354
            "b option 0 1 :"
 
355
            ])
 
356
        index = _KnitIndex(transport, "filename", "r")
 
357
 
 
358
        self.assertEqual("a", index.idx_to_name(0))
 
359
        self.assertEqual("b", index.idx_to_name(1))
 
360
        self.assertEqual("b", index.idx_to_name(-1))
 
361
        self.assertEqual("a", index.idx_to_name(-2))
 
362
 
 
363
    def test_lookup(self):
 
364
        transport = MockTransport([
 
365
            _KnitIndex.HEADER,
 
366
            "a option 0 1 :",
 
367
            "b option 0 1 :"
 
368
            ])
 
369
        index = _KnitIndex(transport, "filename", "r")
 
370
 
 
371
        self.assertEqual(0, index.lookup("a"))
 
372
        self.assertEqual(1, index.lookup("b"))
 
373
 
 
374
    def test_add_version(self):
 
375
        transport = MockTransport([
 
376
            _KnitIndex.HEADER
 
377
            ])
 
378
        index = _KnitIndex(transport, "filename", "r")
 
379
 
 
380
        index.add_version("a", ["option"], 0, 1, ["b"])
 
381
        self.assertEqual(("append_bytes",
 
382
            ("filename", "\na option 0 1 .b :"),
 
383
            {}), transport.calls.pop(0))
 
384
        self.assertTrue(index.has_version("a"))
 
385
        self.assertEqual(1, index.num_versions())
 
386
        self.assertEqual((0, 1), index.get_position("a"))
 
387
        self.assertEqual(["option"], index.get_options("a"))
 
388
        self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
 
389
 
 
390
        index.add_version("a", ["opt"], 1, 2, ["c"])
 
391
        self.assertEqual(("append_bytes",
 
392
            ("filename", "\na opt 1 2 .c :"),
 
393
            {}), transport.calls.pop(0))
 
394
        self.assertTrue(index.has_version("a"))
 
395
        self.assertEqual(1, index.num_versions())
 
396
        self.assertEqual((1, 2), index.get_position("a"))
 
397
        self.assertEqual(["opt"], index.get_options("a"))
 
398
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
399
 
 
400
        index.add_version("b", ["option"], 2, 3, ["a"])
 
401
        self.assertEqual(("append_bytes",
 
402
            ("filename", "\nb option 2 3 0 :"),
 
403
            {}), transport.calls.pop(0))
 
404
        self.assertTrue(index.has_version("b"))
 
405
        self.assertEqual(2, index.num_versions())
 
406
        self.assertEqual((2, 3), index.get_position("b"))
 
407
        self.assertEqual(["option"], index.get_options("b"))
 
408
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
409
 
 
410
    def test_add_versions(self):
 
411
        transport = MockTransport([
 
412
            _KnitIndex.HEADER
 
413
            ])
 
414
        index = _KnitIndex(transport, "filename", "r")
 
415
 
 
416
        index.add_versions([
 
417
            ("a", ["option"], 0, 1, ["b"]),
 
418
            ("a", ["opt"], 1, 2, ["c"]),
 
419
            ("b", ["option"], 2, 3, ["a"])
 
420
            ])
 
421
        self.assertEqual(("append_bytes", ("filename",
 
422
            "\na option 0 1 .b :"
 
423
            "\na opt 1 2 .c :"
 
424
            "\nb option 2 3 0 :"
 
425
            ), {}), transport.calls.pop(0))
 
426
        self.assertTrue(index.has_version("a"))
 
427
        self.assertTrue(index.has_version("b"))
 
428
        self.assertEqual(2, index.num_versions())
 
429
        self.assertEqual((1, 2), index.get_position("a"))
 
430
        self.assertEqual((2, 3), index.get_position("b"))
 
431
        self.assertEqual(["opt"], index.get_options("a"))
 
432
        self.assertEqual(["option"], index.get_options("b"))
 
433
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
434
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
435
 
 
436
    def test_delay_create_and_add_versions(self):
 
437
        transport = MockTransport()
 
438
 
 
439
        index = _KnitIndex(transport, "filename", "w",
 
440
            create=True, file_mode="wb", create_parent_dir=True,
 
441
            delay_create=True, dir_mode=0777)
 
442
        self.assertEqual([], transport.calls)
 
443
 
 
444
        index.add_versions([
 
445
            ("a", ["option"], 0, 1, ["b"]),
 
446
            ("a", ["opt"], 1, 2, ["c"]),
 
447
            ("b", ["option"], 2, 3, ["a"])
 
448
            ])
 
449
        name, (filename, f), kwargs = transport.calls.pop(0)
 
450
        self.assertEqual("put_file_non_atomic", name)
 
451
        self.assertEqual(
 
452
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
453
            kwargs)
 
454
        self.assertEqual("filename", filename)
 
455
        self.assertEqual(
 
456
            index.HEADER +
 
457
            "\na option 0 1 .b :"
 
458
            "\na opt 1 2 .c :"
 
459
            "\nb option 2 3 0 :",
 
460
            f.read())
 
461
 
 
462
    def test_has_version(self):
 
463
        transport = MockTransport([
 
464
            _KnitIndex.HEADER,
 
465
            "a option 0 1 :"
 
466
            ])
 
467
        index = _KnitIndex(transport, "filename", "r")
 
468
 
 
469
        self.assertTrue(index.has_version("a"))
 
470
        self.assertFalse(index.has_version("b"))
 
471
 
 
472
    def test_get_position(self):
 
473
        transport = MockTransport([
 
474
            _KnitIndex.HEADER,
 
475
            "a option 0 1 :",
 
476
            "b option 1 2 :"
 
477
            ])
 
478
        index = _KnitIndex(transport, "filename", "r")
 
479
 
 
480
        self.assertEqual((0, 1), index.get_position("a"))
 
481
        self.assertEqual((1, 2), index.get_position("b"))
 
482
 
 
483
    def test_get_method(self):
 
484
        transport = MockTransport([
 
485
            _KnitIndex.HEADER,
 
486
            "a fulltext,unknown 0 1 :",
 
487
            "b unknown,line-delta 1 2 :",
 
488
            "c bad 3 4 :"
 
489
            ])
 
490
        index = _KnitIndex(transport, "filename", "r")
 
491
 
 
492
        self.assertEqual("fulltext", index.get_method("a"))
 
493
        self.assertEqual("line-delta", index.get_method("b"))
 
494
        self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
 
495
 
 
496
    def test_get_options(self):
 
497
        transport = MockTransport([
 
498
            _KnitIndex.HEADER,
 
499
            "a opt1 0 1 :",
 
500
            "b opt2,opt3 1 2 :"
 
501
            ])
 
502
        index = _KnitIndex(transport, "filename", "r")
 
503
 
 
504
        self.assertEqual(["opt1"], index.get_options("a"))
 
505
        self.assertEqual(["opt2", "opt3"], index.get_options("b"))
 
506
 
 
507
    def test_get_parents(self):
 
508
        transport = MockTransport([
 
509
            _KnitIndex.HEADER,
 
510
            "a option 0 1 :",
 
511
            "b option 1 2 0 .c :",
 
512
            "c option 1 2 1 0 .e :"
 
513
            ])
 
514
        index = _KnitIndex(transport, "filename", "r")
 
515
 
 
516
        self.assertEqual([], index.get_parents("a"))
 
517
        self.assertEqual(["a", "c"], index.get_parents("b"))
 
518
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
519
 
 
520
    def test_get_parents_with_ghosts(self):
 
521
        transport = MockTransport([
 
522
            _KnitIndex.HEADER,
 
523
            "a option 0 1 :",
 
524
            "b option 1 2 0 .c :",
 
525
            "c option 1 2 1 0 .e :"
 
526
            ])
 
527
        index = _KnitIndex(transport, "filename", "r")
 
528
 
 
529
        self.assertEqual([], index.get_parents_with_ghosts("a"))
 
530
        self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
 
531
        self.assertEqual(["b", "a", "e"],
 
532
            index.get_parents_with_ghosts("c"))
 
533
 
 
534
    def test_check_versions_present(self):
 
535
        transport = MockTransport([
 
536
            _KnitIndex.HEADER,
 
537
            "a option 0 1 :",
 
538
            "b option 0 1 :"
 
539
            ])
 
540
        index = _KnitIndex(transport, "filename", "r")
 
541
 
 
542
        check = index.check_versions_present
 
543
 
 
544
        check([])
 
545
        check(["a"])
 
546
        check(["b"])
 
547
        check(["a", "b"])
 
548
        self.assertRaises(RevisionNotPresent, check, ["c"])
 
549
        self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
 
550
 
 
551
 
37
552
class KnitTests(TestCaseWithTransport):
38
553
    """Class containing knit test helper routines."""
39
554
 
40
 
    def make_test_knit(self, annotate=False):
 
555
    def make_test_knit(self, annotate=False, delay_create=False):
41
556
        if not annotate:
42
557
            factory = KnitPlainFactory()
43
558
        else:
44
559
            factory = None
45
 
        return KnitVersionedFile('test', LocalTransport('.'), access_mode='w', factory=factory, create=True)
 
560
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
 
561
                                 factory=factory, create=True,
 
562
                                 delay_create=delay_create)
46
563
 
47
564
 
48
565
class BasicKnitTests(KnitTests):
67
584
        k = self.make_test_knit()
68
585
        k.add_lines('text-1', [], split_lines(TEXT_1))
69
586
        del k
70
 
        k2 = KnitVersionedFile('test', LocalTransport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
587
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
71
588
        self.assertTrue(k2.has_version('text-1'))
72
589
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
73
590
 
95
612
    def test_incomplete(self):
96
613
        """Test if texts without a ending line-end can be inserted and
97
614
        extracted."""
98
 
        k = KnitVersionedFile('test', LocalTransport('.'), delta=False, create=True)
 
615
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
99
616
        k.add_lines('text-1', [], ['a\n',    'b'  ])
100
617
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
101
618
        # reopening ensures maximum room for confusion
102
 
        k = KnitVersionedFile('test', LocalTransport('.'), delta=False, create=True)
 
619
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
103
620
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
104
621
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
105
622
 
127
644
 
128
645
    def test_add_delta(self):
129
646
        """Store in knit with parents"""
130
 
        k = KnitVersionedFile('test', LocalTransport('.'), factory=KnitPlainFactory(),
 
647
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
131
648
            delta=True, create=True)
132
649
        self.add_stock_one_and_one_a(k)
133
650
        k.clear_cache()
135
652
 
136
653
    def test_annotate(self):
137
654
        """Annotations"""
138
 
        k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
 
655
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
139
656
            delta=True, create=True)
140
657
        self.insert_and_test_small_annotate(k)
141
658
 
150
667
 
151
668
    def test_annotate_fulltext(self):
152
669
        """Annotations"""
153
 
        k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
 
670
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
154
671
            delta=False, create=True)
155
672
        self.insert_and_test_small_annotate(k)
156
673
 
229
746
 
230
747
    def test_knit_join(self):
231
748
        """Store in knit with parents"""
232
 
        k1 = KnitVersionedFile('test1', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
 
749
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
233
750
        k1.add_lines('text-a', [], split_lines(TEXT_1))
234
751
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
235
752
 
238
755
 
239
756
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
240
757
 
241
 
        k2 = KnitVersionedFile('test2', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
 
758
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
242
759
        count = k2.join(k1, version_ids=['text-m'])
243
760
        self.assertEquals(count, 5)
244
761
        self.assertTrue(k2.has_version('text-a'))
245
762
        self.assertTrue(k2.has_version('text-c'))
246
763
 
247
764
    def test_reannotate(self):
248
 
        k1 = KnitVersionedFile('knit1', LocalTransport('.'),
 
765
        k1 = KnitVersionedFile('knit1', get_transport('.'),
249
766
                               factory=KnitAnnotateFactory(), create=True)
250
767
        # 0
251
768
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
252
769
        # 1
253
770
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
254
771
 
255
 
        k2 = KnitVersionedFile('test2', LocalTransport('.'),
 
772
        k2 = KnitVersionedFile('test2', get_transport('.'),
256
773
                               factory=KnitAnnotateFactory(), create=True)
257
774
        k2.join(k1, version_ids=['text-b'])
258
775
 
273
790
        self.assertEquals(origins[0], ('text-c', 'z\n'))
274
791
        self.assertEquals(origins[1], ('text-b', 'c\n'))
275
792
 
276
 
    def test_extraction_reads_components_once(self):
277
 
        t = MemoryTransport()
278
 
        instrumented_t = TransportLogger(t)
279
 
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
280
 
        # should read the index
281
 
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
282
 
        instrumented_t._calls = []
283
 
        # add a text       
284
 
        k1.add_lines('base', [], ['text\n'])
285
 
        # should not have read at all
286
 
        self.assertEqual([], instrumented_t._calls)
287
 
 
288
 
        # add a text
289
 
        k1.add_lines('sub', ['base'], ['text\n', 'text2\n'])
290
 
        # should not have read at all
291
 
        self.assertEqual([], instrumented_t._calls)
292
 
        
293
 
        # read a text
294
 
        k1.get_lines('sub')
295
 
        # should not have read at all
296
 
        self.assertEqual([], instrumented_t._calls)
297
 
 
298
 
        # clear the cache
299
 
        k1.clear_cache()
300
 
 
301
 
        # read a text
302
 
        k1.get_lines('base')
303
 
        # should have read a component
304
 
        # should not have read the first component only
305
 
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
306
 
        instrumented_t._calls = []
307
 
        # read again
308
 
        k1.get_lines('base')
309
 
        # should not have read at all
310
 
        self.assertEqual([], instrumented_t._calls)
311
 
        # and now read the other component
312
 
        k1.get_lines('sub')
313
 
        # should have read the second component
314
 
        self.assertEqual([('id.knit', [(87, 93)])], instrumented_t._calls)
315
 
        instrumented_t._calls = []
316
 
 
317
 
        # clear the cache
318
 
        k1.clear_cache()
319
 
        # add a text cold 
320
 
        k1.add_lines('sub2', ['base'], ['text\n', 'text3\n'])
321
 
        # should read the first component only
322
 
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
 
793
    def test_get_line_delta_texts(self):
 
794
        """Make sure we can call get_texts on text with reused line deltas"""
 
795
        k1 = KnitVersionedFile('test1', get_transport('.'), 
 
796
                               factory=KnitPlainFactory(), create=True)
 
797
        for t in range(3):
 
798
            if t == 0:
 
799
                parents = []
 
800
            else:
 
801
                parents = ['%d' % (t-1)]
 
802
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
 
803
        k1.get_texts(('%d' % t) for t in range(3))
323
804
        
324
805
    def test_iter_lines_reads_in_order(self):
325
806
        t = MemoryTransport()
349
830
        # this tests that a new knit index file has the expected content
350
831
        # and that is writes the data we expect as records are added.
351
832
        knit = self.make_test_knit(True)
 
833
        # Now knit files are not created until we first add data to them
352
834
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
353
835
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
354
836
        self.assertFileEqual(
363
845
            "\nrevid2 line-delta 84 82 0 :",
364
846
            'test.kndx')
365
847
        # we should be able to load this file again
366
 
        knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='r')
 
848
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
367
849
        self.assertEqual(['revid', 'revid2'], knit.versions())
368
850
        # write a short write to the file and ensure that its ignored
369
851
        indexfile = file('test.kndx', 'at')
370
852
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
371
853
        indexfile.close()
372
854
        # we should be able to load this file again
373
 
        knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='w')
 
855
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
374
856
        self.assertEqual(['revid', 'revid2'], knit.versions())
375
857
        # and add a revision with the same id the failed write had
376
858
        knit.add_lines('revid3', ['revid2'], ['a\n'])
377
859
        # and when reading it revid3 should now appear.
378
 
        knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='r')
 
860
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
379
861
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
380
862
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
381
863
 
 
864
    def test_delay_create(self):
 
865
        """Test that passing delay_create=True creates files late"""
 
866
        knit = self.make_test_knit(annotate=True, delay_create=True)
 
867
        self.failIfExists('test.knit')
 
868
        self.failIfExists('test.kndx')
 
869
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
870
        self.failUnlessExists('test.knit')
 
871
        self.assertFileEqual(
 
872
            "# bzr knit index 8\n"
 
873
            "\n"
 
874
            "revid fulltext 0 84 .a_ghost :",
 
875
            'test.kndx')
 
876
 
 
877
    def test_create_parent_dir(self):
 
878
        """create_parent_dir can create knits in nonexistant dirs"""
 
879
        # Has no effect if we don't set 'delay_create'
 
880
        trans = get_transport('.')
 
881
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
 
882
                          trans, access_mode='w', factory=None,
 
883
                          create=True, create_parent_dir=True)
 
884
        # Nothing should have changed yet
 
885
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
886
                                 factory=None, create=True,
 
887
                                 create_parent_dir=True,
 
888
                                 delay_create=True)
 
889
        self.failIfExists('dir/test.knit')
 
890
        self.failIfExists('dir/test.kndx')
 
891
        self.failIfExists('dir')
 
892
        knit.add_lines('revid', [], ['a\n'])
 
893
        self.failUnlessExists('dir')
 
894
        self.failUnlessExists('dir/test.knit')
 
895
        self.assertFileEqual(
 
896
            "# bzr knit index 8\n"
 
897
            "\n"
 
898
            "revid fulltext 0 84  :",
 
899
            'dir/test.kndx')
 
900
 
 
901
    def test_create_mode_700(self):
 
902
        trans = get_transport('.')
 
903
        if not trans._can_roundtrip_unix_modebits():
 
904
            # Can't roundtrip, so no need to run this test
 
905
            return
 
906
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
907
                                 factory=None, create=True,
 
908
                                 create_parent_dir=True,
 
909
                                 delay_create=True,
 
910
                                 file_mode=0600,
 
911
                                 dir_mode=0700)
 
912
        knit.add_lines('revid', [], ['a\n'])
 
913
        self.assertTransportMode(trans, 'dir', 0700)
 
914
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
 
915
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
 
916
 
 
917
    def test_create_mode_770(self):
 
918
        trans = get_transport('.')
 
919
        if not trans._can_roundtrip_unix_modebits():
 
920
            # Can't roundtrip, so no need to run this test
 
921
            return
 
922
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
923
                                 factory=None, create=True,
 
924
                                 create_parent_dir=True,
 
925
                                 delay_create=True,
 
926
                                 file_mode=0660,
 
927
                                 dir_mode=0770)
 
928
        knit.add_lines('revid', [], ['a\n'])
 
929
        self.assertTransportMode(trans, 'dir', 0770)
 
930
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
 
931
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
 
932
 
 
933
    def test_create_mode_777(self):
 
934
        trans = get_transport('.')
 
935
        if not trans._can_roundtrip_unix_modebits():
 
936
            # Can't roundtrip, so no need to run this test
 
937
            return
 
938
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
939
                                 factory=None, create=True,
 
940
                                 create_parent_dir=True,
 
941
                                 delay_create=True,
 
942
                                 file_mode=0666,
 
943
                                 dir_mode=0777)
 
944
        knit.add_lines('revid', [], ['a\n'])
 
945
        self.assertTransportMode(trans, 'dir', 0777)
 
946
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
 
947
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
 
948
 
382
949
    def test_plan_merge(self):
383
950
        my_knit = self.make_test_knit(annotate=True)
384
951
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
487
1054
        self.failIf(WeaveToKnit.is_compatible(k, w))
488
1055
        self.failIf(WeaveToKnit.is_compatible(w, w))
489
1056
        self.failIf(WeaveToKnit.is_compatible(k, k))
 
1057
 
 
1058
 
 
1059
class TestKnitCaching(KnitTests):
 
1060
    
 
1061
    def create_knit(self, cache_add=False):
 
1062
        k = self.make_test_knit(True)
 
1063
        if cache_add:
 
1064
            k.enable_cache()
 
1065
 
 
1066
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
1067
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
1068
        return k
 
1069
 
 
1070
    def test_no_caching(self):
 
1071
        k = self.create_knit()
 
1072
        # Nothing should be cached without setting 'enable_cache'
 
1073
        self.assertEqual({}, k._data._cache)
 
1074
 
 
1075
    def test_cache_add_and_clear(self):
 
1076
        k = self.create_knit(True)
 
1077
 
 
1078
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
 
1079
 
 
1080
        k.clear_cache()
 
1081
        self.assertEqual({}, k._data._cache)
 
1082
 
 
1083
    def test_cache_data_read_raw(self):
 
1084
        k = self.create_knit()
 
1085
 
 
1086
        # Now cache and read
 
1087
        k.enable_cache()
 
1088
 
 
1089
        def read_one_raw(version):
 
1090
            pos_map = k._get_components_positions([version])
 
1091
            method, pos, size, next = pos_map[version]
 
1092
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
 
1093
            self.assertEqual(1, len(lst))
 
1094
            return lst[0]
 
1095
 
 
1096
        val = read_one_raw('text-1')
 
1097
        self.assertEqual({'text-1':val[1]}, k._data._cache)
 
1098
 
 
1099
        k.clear_cache()
 
1100
        # After clear, new reads are not cached
 
1101
        self.assertEqual({}, k._data._cache)
 
1102
 
 
1103
        val2 = read_one_raw('text-1')
 
1104
        self.assertEqual(val, val2)
 
1105
        self.assertEqual({}, k._data._cache)
 
1106
 
 
1107
    def test_cache_data_read(self):
 
1108
        k = self.create_knit()
 
1109
 
 
1110
        def read_one(version):
 
1111
            pos_map = k._get_components_positions([version])
 
1112
            method, pos, size, next = pos_map[version]
 
1113
            lst = list(k._data.read_records_iter([(version, pos, size)]))
 
1114
            self.assertEqual(1, len(lst))
 
1115
            return lst[0]
 
1116
 
 
1117
        # Now cache and read
 
1118
        k.enable_cache()
 
1119
 
 
1120
        val = read_one('text-2')
 
1121
        self.assertEqual(['text-2'], k._data._cache.keys())
 
1122
        self.assertEqual('text-2', val[0])
 
1123
        content, digest = k._data._parse_record('text-2',
 
1124
                                                k._data._cache['text-2'])
 
1125
        self.assertEqual(content, val[1])
 
1126
        self.assertEqual(digest, val[2])
 
1127
 
 
1128
        k.clear_cache()
 
1129
        self.assertEqual({}, k._data._cache)
 
1130
 
 
1131
        val2 = read_one('text-2')
 
1132
        self.assertEqual(val, val2)
 
1133
        self.assertEqual({}, k._data._cache)
 
1134
 
 
1135
    def test_cache_read(self):
 
1136
        k = self.create_knit()
 
1137
        k.enable_cache()
 
1138
 
 
1139
        text = k.get_text('text-1')
 
1140
        self.assertEqual(TEXT_1, text)
 
1141
        self.assertEqual(['text-1'], k._data._cache.keys())
 
1142
 
 
1143
        k.clear_cache()
 
1144
        self.assertEqual({}, k._data._cache)
 
1145
 
 
1146
        text = k.get_text('text-1')
 
1147
        self.assertEqual(TEXT_1, text)
 
1148
        self.assertEqual({}, k._data._cache)
 
1149
 
 
1150
 
 
1151
class TestKnitIndex(KnitTests):
 
1152
 
 
1153
    def test_add_versions_dictionary_compresses(self):
 
1154
        """Adding versions to the index should update the lookup dict"""
 
1155
        knit = self.make_test_knit()
 
1156
        idx = knit._index
 
1157
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1158
        self.check_file_contents('test.kndx',
 
1159
            '# bzr knit index 8\n'
 
1160
            '\n'
 
1161
            'a-1 fulltext 0 0  :'
 
1162
            )
 
1163
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
 
1164
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
 
1165
                         ])
 
1166
        self.check_file_contents('test.kndx',
 
1167
            '# bzr knit index 8\n'
 
1168
            '\n'
 
1169
            'a-1 fulltext 0 0  :\n'
 
1170
            'a-2 fulltext 0 0 0 :\n'
 
1171
            'a-3 fulltext 0 0 1 :'
 
1172
            )
 
1173
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
 
1174
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
 
1175
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
 
1176
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
 
1177
                         }, idx._cache)
 
1178
 
 
1179
    def test_add_versions_fails_clean(self):
 
1180
        """If add_versions fails in the middle, it restores a pristine state.
 
1181
 
 
1182
        Any modifications that are made to the index are reset if all versions
 
1183
        cannot be added.
 
1184
        """
 
1185
        # This cheats a little bit by passing in a generator which will
 
1186
        # raise an exception before the processing finishes
 
1187
        # Other possibilities would be to have an version with the wrong number
 
1188
        # of entries, or to make the backing transport unable to write any
 
1189
        # files.
 
1190
 
 
1191
        knit = self.make_test_knit()
 
1192
        idx = knit._index
 
1193
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1194
 
 
1195
        class StopEarly(Exception):
 
1196
            pass
 
1197
 
 
1198
        def generate_failure():
 
1199
            """Add some entries and then raise an exception"""
 
1200
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
 
1201
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
 
1202
            raise StopEarly()
 
1203
 
 
1204
        # Assert the pre-condition
 
1205
        self.assertEqual(['a-1'], idx._history)
 
1206
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1207
 
 
1208
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
 
1209
 
 
1210
        # And it shouldn't be modified
 
1211
        self.assertEqual(['a-1'], idx._history)
 
1212
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1213
 
 
1214
    def test_knit_index_ignores_empty_files(self):
 
1215
        # There was a race condition in older bzr, where a ^C at the right time
 
1216
        # could leave an empty .kndx file, which bzr would later claim was a
 
1217
        # corrupted file since the header was not present. In reality, the file
 
1218
        # just wasn't created, so it should be ignored.
 
1219
        t = get_transport('.')
 
1220
        t.put_bytes('test.kndx', '')
 
1221
 
 
1222
        knit = self.make_test_knit()
 
1223
 
 
1224
    def test_knit_index_checks_header(self):
 
1225
        t = get_transport('.')
 
1226
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
 
1227
 
 
1228
        self.assertRaises(KnitHeaderError, self.make_test_knit)