~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Martin Pool
  • Date: 2005-06-27 08:18:07 UTC
  • mto: This revision was merged to the branch mainline in revision 852.
  • Revision ID: mbp@sourcefrog.net-20050627081807-dc3ff5726c88b247
More tests for insertion of lines in new versions.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Tests for Knit data structure"""
18
 
 
19
 
from cStringIO import StringIO
20
 
import difflib
21
 
 
22
 
from bzrlib import (
23
 
    errors,
24
 
    )
25
 
from bzrlib.errors import (
26
 
    RevisionAlreadyPresent,
27
 
    KnitHeaderError,
28
 
    RevisionNotPresent,
29
 
    NoSuchFile,
30
 
    )
31
 
from bzrlib.knit import (
32
 
    KnitContent,
33
 
    KnitVersionedFile,
34
 
    KnitPlainFactory,
35
 
    KnitAnnotateFactory,
36
 
    _KnitIndex,
37
 
    WeaveToKnit,
38
 
    )
39
 
from bzrlib.osutils import split_lines
40
 
from bzrlib.tests import TestCase, TestCaseWithTransport
41
 
from bzrlib.transport import TransportLogger, get_transport
42
 
from bzrlib.transport.memory import MemoryTransport
43
 
from bzrlib.weave import Weave
44
 
 
45
 
 
46
 
class KnitContentTests(TestCase):
47
 
 
48
 
    def test_constructor(self):
49
 
        content = KnitContent([])
50
 
 
51
 
    def test_text(self):
52
 
        content = KnitContent([])
53
 
        self.assertEqual(content.text(), [])
54
 
 
55
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
56
 
        self.assertEqual(content.text(), ["text1", "text2"])
57
 
 
58
 
    def test_annotate(self):
59
 
        content = KnitContent([])
60
 
        self.assertEqual(content.annotate(), [])
61
 
 
62
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
63
 
        self.assertEqual(content.annotate(),
64
 
            [("origin1", "text1"), ("origin2", "text2")])
65
 
 
66
 
    def test_annotate_iter(self):
67
 
        content = KnitContent([])
68
 
        it = content.annotate_iter()
69
 
        self.assertRaises(StopIteration, it.next)
70
 
 
71
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
72
 
        it = content.annotate_iter()
73
 
        self.assertEqual(it.next(), ("origin1", "text1"))
74
 
        self.assertEqual(it.next(), ("origin2", "text2"))
75
 
        self.assertRaises(StopIteration, it.next)
76
 
 
77
 
    def test_copy(self):
78
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
79
 
        copy = content.copy()
80
 
        self.assertIsInstance(copy, KnitContent)
81
 
        self.assertEqual(copy.annotate(),
82
 
            [("origin1", "text1"), ("origin2", "text2")])
83
 
 
84
 
    def test_line_delta(self):
85
 
        content1 = KnitContent([("", "a"), ("", "b")])
86
 
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
87
 
        self.assertEqual(content1.line_delta(content2),
88
 
            [(1, 2, 2, [("", "a"), ("", "c")])])
89
 
 
90
 
    def test_line_delta_iter(self):
91
 
        content1 = KnitContent([("", "a"), ("", "b")])
92
 
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
93
 
        it = content1.line_delta_iter(content2)
94
 
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
95
 
        self.assertRaises(StopIteration, it.next)
96
 
 
97
 
 
98
 
class MockTransport(object):
99
 
 
100
 
    def __init__(self, file_lines=None):
101
 
        self.file_lines = file_lines
102
 
        self.calls = []
103
 
        # We have no base directory for the MockTransport
104
 
        self.base = ''
105
 
 
106
 
    def get(self, filename):
107
 
        if self.file_lines is None:
108
 
            raise NoSuchFile(filename)
109
 
        else:
110
 
            return StringIO("\n".join(self.file_lines))
111
 
 
112
 
    def __getattr__(self, name):
113
 
        def queue_call(*args, **kwargs):
114
 
            self.calls.append((name, args, kwargs))
115
 
        return queue_call
116
 
 
117
 
 
118
 
class LowLevelKnitIndexTests(TestCase):
119
 
 
120
 
    def test_no_such_file(self):
121
 
        transport = MockTransport()
122
 
 
123
 
        self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
124
 
        self.assertRaises(NoSuchFile, _KnitIndex, transport,
125
 
            "filename", "w", create=False)
126
 
 
127
 
    def test_create_file(self):
128
 
        transport = MockTransport()
129
 
 
130
 
        index = _KnitIndex(transport, "filename", "w",
131
 
            file_mode="wb", create=True)
132
 
        self.assertEqual(
133
 
                ("put_bytes_non_atomic",
134
 
                    ("filename", index.HEADER), {"mode": "wb"}),
135
 
                transport.calls.pop(0))
136
 
 
137
 
    def test_delay_create_file(self):
138
 
        transport = MockTransport()
139
 
 
140
 
        index = _KnitIndex(transport, "filename", "w",
141
 
            create=True, file_mode="wb", create_parent_dir=True,
142
 
            delay_create=True, dir_mode=0777)
143
 
        self.assertEqual([], transport.calls)
144
 
 
145
 
        index.add_versions([])
146
 
        name, (filename, f), kwargs = transport.calls.pop(0)
147
 
        self.assertEqual("put_file_non_atomic", name)
148
 
        self.assertEqual(
149
 
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
150
 
            kwargs)
151
 
        self.assertEqual("filename", filename)
152
 
        self.assertEqual(index.HEADER, f.read())
153
 
 
154
 
        index.add_versions([])
155
 
        self.assertEqual(("append_bytes", ("filename", ""), {}),
156
 
            transport.calls.pop(0))
157
 
 
158
 
    def test_read_utf8_version_id(self):
159
 
        transport = MockTransport([
160
 
            _KnitIndex.HEADER,
161
 
            u"version-\N{CYRILLIC CAPITAL LETTER A}"
162
 
                u" option 0 1 :".encode("utf-8")
163
 
            ])
164
 
        index = _KnitIndex(transport, "filename", "r")
165
 
        self.assertTrue(
166
 
            index.has_version(u"version-\N{CYRILLIC CAPITAL LETTER A}"))
167
 
 
168
 
    def test_read_utf8_parents(self):
169
 
        transport = MockTransport([
170
 
            _KnitIndex.HEADER,
171
 
            u"version option 0 1"
172
 
                u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")
173
 
            ])
174
 
        index = _KnitIndex(transport, "filename", "r")
175
 
        self.assertEqual([u"version-\N{CYRILLIC CAPITAL LETTER A}"],
176
 
            index.get_parents_with_ghosts("version"))
177
 
 
178
 
    def test_read_ignore_corrupted_lines(self):
179
 
        transport = MockTransport([
180
 
            _KnitIndex.HEADER,
181
 
            "corrupted",
182
 
            "corrupted options 0 1 .b .c ",
183
 
            "version options 0 1 :"
184
 
            ])
185
 
        index = _KnitIndex(transport, "filename", "r")
186
 
        self.assertEqual(1, index.num_versions())
187
 
        self.assertTrue(index.has_version(u"version"))
188
 
 
189
 
    def test_read_corrupted_header(self):
190
 
        transport = MockTransport(['not a bzr knit index header\n'])
191
 
        self.assertRaises(KnitHeaderError,
192
 
            _KnitIndex, transport, "filename", "r")
193
 
 
194
 
    def test_read_duplicate_entries(self):
195
 
        transport = MockTransport([
196
 
            _KnitIndex.HEADER,
197
 
            "parent options 0 1 :",
198
 
            "version options1 0 1 0 :",
199
 
            "version options2 1 2 .other :",
200
 
            "version options3 3 4 0 .other :"
201
 
            ])
202
 
        index = _KnitIndex(transport, "filename", "r")
203
 
        self.assertEqual(2, index.num_versions())
204
 
        self.assertEqual(1, index.lookup(u"version"))
205
 
        self.assertEqual((3, 4), index.get_position(u"version"))
206
 
        self.assertEqual(["options3"], index.get_options(u"version"))
207
 
        self.assertEqual([u"parent", u"other"],
208
 
            index.get_parents_with_ghosts(u"version"))
209
 
 
210
 
    def test_read_compressed_parents(self):
211
 
        transport = MockTransport([
212
 
            _KnitIndex.HEADER,
213
 
            "a option 0 1 :",
214
 
            "b option 0 1 0 :",
215
 
            "c option 0 1 1 0 :",
216
 
            ])
217
 
        index = _KnitIndex(transport, "filename", "r")
218
 
        self.assertEqual([u"a"], index.get_parents(u"b"))
219
 
        self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
220
 
 
221
 
    def test_write_utf8_version_id(self):
222
 
        transport = MockTransport([
223
 
            _KnitIndex.HEADER
224
 
            ])
225
 
        index = _KnitIndex(transport, "filename", "r")
226
 
        index.add_version(u"version-\N{CYRILLIC CAPITAL LETTER A}",
227
 
            ["option"], 0, 1, [])
228
 
        self.assertEqual(("append_bytes", ("filename",
229
 
            u"\nversion-\N{CYRILLIC CAPITAL LETTER A}"
230
 
                u" option 0 1  :".encode("utf-8")),
231
 
            {}),
232
 
            transport.calls.pop(0))
233
 
 
234
 
    def test_write_utf8_parents(self):
235
 
        transport = MockTransport([
236
 
            _KnitIndex.HEADER
237
 
            ])
238
 
        index = _KnitIndex(transport, "filename", "r")
239
 
        index.add_version(u"version", ["option"], 0, 1,
240
 
            [u"version-\N{CYRILLIC CAPITAL LETTER A}"])
241
 
        self.assertEqual(("append_bytes", ("filename",
242
 
            u"\nversion option 0 1"
243
 
                u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")),
244
 
            {}),
245
 
            transport.calls.pop(0))
246
 
 
247
 
    def test_get_graph(self):
248
 
        transport = MockTransport()
249
 
        index = _KnitIndex(transport, "filename", "w", create=True)
250
 
        self.assertEqual([], index.get_graph())
251
 
 
252
 
        index.add_version(u"a", ["option"], 0, 1, [u"b"])
253
 
        self.assertEqual([(u"a", [u"b"])], index.get_graph())
254
 
 
255
 
        index.add_version(u"c", ["option"], 0, 1, [u"d"])
256
 
        self.assertEqual([(u"a", [u"b"]), (u"c", [u"d"])],
257
 
            sorted(index.get_graph()))
258
 
 
259
 
    def test_get_ancestry(self):
260
 
        transport = MockTransport([
261
 
            _KnitIndex.HEADER,
262
 
            "a option 0 1 :",
263
 
            "b option 0 1 0 .e :",
264
 
            "c option 0 1 1 0 :",
265
 
            "d option 0 1 2 .f :"
266
 
            ])
267
 
        index = _KnitIndex(transport, "filename", "r")
268
 
 
269
 
        self.assertEqual([], index.get_ancestry([]))
270
 
        self.assertEqual([u"a"], index.get_ancestry([u"a"]))
271
 
        self.assertEqual([u"a", u"b"], index.get_ancestry([u"b"]))
272
 
        self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"c"]))
273
 
        self.assertEqual([u"a", u"b", u"c", u"d"], index.get_ancestry([u"d"]))
274
 
        self.assertEqual([u"a", u"b"], index.get_ancestry([u"a", u"b"]))
275
 
        self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"a", u"c"]))
276
 
 
277
 
        self.assertRaises(RevisionNotPresent, index.get_ancestry, [u"e"])
278
 
 
279
 
    def test_get_ancestry_with_ghosts(self):
280
 
        transport = MockTransport([
281
 
            _KnitIndex.HEADER,
282
 
            "a option 0 1 :",
283
 
            "b option 0 1 0 .e :",
284
 
            "c option 0 1 0 .f .g :",
285
 
            "d option 0 1 2 .h .j .k :"
286
 
            ])
287
 
        index = _KnitIndex(transport, "filename", "r")
288
 
 
289
 
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
290
 
        self.assertEqual([u"a"], index.get_ancestry_with_ghosts([u"a"]))
291
 
        self.assertEqual([u"a", u"e", u"b"],
292
 
            index.get_ancestry_with_ghosts([u"b"]))
293
 
        self.assertEqual([u"a", u"g", u"f", u"c"],
294
 
            index.get_ancestry_with_ghosts([u"c"]))
295
 
        self.assertEqual([u"a", u"g", u"f", u"c", u"k", u"j", u"h", u"d"],
296
 
            index.get_ancestry_with_ghosts([u"d"]))
297
 
        self.assertEqual([u"a", u"e", u"b"],
298
 
            index.get_ancestry_with_ghosts([u"a", u"b"]))
299
 
        self.assertEqual([u"a", u"g", u"f", u"c"],
300
 
            index.get_ancestry_with_ghosts([u"a", u"c"]))
301
 
        self.assertEqual(
302
 
            [u"a", u"g", u"f", u"c", u"e", u"b", u"k", u"j", u"h", u"d"],
303
 
            index.get_ancestry_with_ghosts([u"b", u"d"]))
304
 
 
305
 
        self.assertRaises(RevisionNotPresent,
306
 
            index.get_ancestry_with_ghosts, [u"e"])
307
 
 
308
 
    def test_num_versions(self):
309
 
        transport = MockTransport([
310
 
            _KnitIndex.HEADER
311
 
            ])
312
 
        index = _KnitIndex(transport, "filename", "r")
313
 
 
314
 
        self.assertEqual(0, index.num_versions())
315
 
        self.assertEqual(0, len(index))
316
 
 
317
 
        index.add_version(u"a", ["option"], 0, 1, [])
318
 
        self.assertEqual(1, index.num_versions())
319
 
        self.assertEqual(1, len(index))
320
 
 
321
 
        index.add_version(u"a", ["option2"], 1, 2, [])
322
 
        self.assertEqual(1, index.num_versions())
323
 
        self.assertEqual(1, len(index))
324
 
 
325
 
        index.add_version(u"b", ["option"], 0, 1, [])
326
 
        self.assertEqual(2, index.num_versions())
327
 
        self.assertEqual(2, len(index))
328
 
 
329
 
    def test_get_versions(self):
330
 
        transport = MockTransport([
331
 
            _KnitIndex.HEADER
332
 
            ])
333
 
        index = _KnitIndex(transport, "filename", "r")
334
 
 
335
 
        self.assertEqual([], index.get_versions())
336
 
 
337
 
        index.add_version(u"a", ["option"], 0, 1, [])
338
 
        self.assertEqual([u"a"], index.get_versions())
339
 
 
340
 
        index.add_version(u"a", ["option"], 0, 1, [])
341
 
        self.assertEqual([u"a"], index.get_versions())
342
 
 
343
 
        index.add_version(u"b", ["option"], 0, 1, [])
344
 
        self.assertEqual([u"a", u"b"], index.get_versions())
345
 
 
346
 
    def test_idx_to_name(self):
347
 
        transport = MockTransport([
348
 
            _KnitIndex.HEADER,
349
 
            "a option 0 1 :",
350
 
            "b option 0 1 :"
351
 
            ])
352
 
        index = _KnitIndex(transport, "filename", "r")
353
 
 
354
 
        self.assertEqual(u"a", index.idx_to_name(0))
355
 
        self.assertEqual(u"b", index.idx_to_name(1))
356
 
        self.assertEqual(u"b", index.idx_to_name(-1))
357
 
        self.assertEqual(u"a", index.idx_to_name(-2))
358
 
 
359
 
    def test_lookup(self):
360
 
        transport = MockTransport([
361
 
            _KnitIndex.HEADER,
362
 
            "a option 0 1 :",
363
 
            "b option 0 1 :"
364
 
            ])
365
 
        index = _KnitIndex(transport, "filename", "r")
366
 
 
367
 
        self.assertEqual(0, index.lookup(u"a"))
368
 
        self.assertEqual(1, index.lookup(u"b"))
369
 
 
370
 
    def test_add_version(self):
371
 
        transport = MockTransport([
372
 
            _KnitIndex.HEADER
373
 
            ])
374
 
        index = _KnitIndex(transport, "filename", "r")
375
 
 
376
 
        index.add_version(u"a", ["option"], 0, 1, [u"b"])
377
 
        self.assertEqual(("append_bytes",
378
 
            ("filename", "\na option 0 1 .b :"),
379
 
            {}), transport.calls.pop(0))
380
 
        self.assertTrue(index.has_version(u"a"))
381
 
        self.assertEqual(1, index.num_versions())
382
 
        self.assertEqual((0, 1), index.get_position(u"a"))
383
 
        self.assertEqual(["option"], index.get_options(u"a"))
384
 
        self.assertEqual([u"b"], index.get_parents_with_ghosts(u"a"))
385
 
 
386
 
        index.add_version(u"a", ["opt"], 1, 2, [u"c"])
387
 
        self.assertEqual(("append_bytes",
388
 
            ("filename", "\na opt 1 2 .c :"),
389
 
            {}), transport.calls.pop(0))
390
 
        self.assertTrue(index.has_version(u"a"))
391
 
        self.assertEqual(1, index.num_versions())
392
 
        self.assertEqual((1, 2), index.get_position(u"a"))
393
 
        self.assertEqual(["opt"], index.get_options(u"a"))
394
 
        self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
395
 
 
396
 
        index.add_version(u"b", ["option"], 2, 3, [u"a"])
397
 
        self.assertEqual(("append_bytes",
398
 
            ("filename", "\nb option 2 3 0 :"),
399
 
            {}), transport.calls.pop(0))
400
 
        self.assertTrue(index.has_version(u"b"))
401
 
        self.assertEqual(2, index.num_versions())
402
 
        self.assertEqual((2, 3), index.get_position(u"b"))
403
 
        self.assertEqual(["option"], index.get_options(u"b"))
404
 
        self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
405
 
 
406
 
    def test_add_versions(self):
407
 
        transport = MockTransport([
408
 
            _KnitIndex.HEADER
409
 
            ])
410
 
        index = _KnitIndex(transport, "filename", "r")
411
 
 
412
 
        index.add_versions([
413
 
            (u"a", ["option"], 0, 1, [u"b"]),
414
 
            (u"a", ["opt"], 1, 2, [u"c"]),
415
 
            (u"b", ["option"], 2, 3, [u"a"])
416
 
            ])
417
 
        self.assertEqual(("append_bytes", ("filename",
418
 
            "\na option 0 1 .b :"
419
 
            "\na opt 1 2 .c :"
420
 
            "\nb option 2 3 0 :"
421
 
            ), {}), transport.calls.pop(0))
422
 
        self.assertTrue(index.has_version(u"a"))
423
 
        self.assertTrue(index.has_version(u"b"))
424
 
        self.assertEqual(2, index.num_versions())
425
 
        self.assertEqual((1, 2), index.get_position(u"a"))
426
 
        self.assertEqual((2, 3), index.get_position(u"b"))
427
 
        self.assertEqual(["opt"], index.get_options(u"a"))
428
 
        self.assertEqual(["option"], index.get_options(u"b"))
429
 
        self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
430
 
        self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
431
 
 
432
 
    def test_delay_create_and_add_versions(self):
433
 
        transport = MockTransport()
434
 
 
435
 
        index = _KnitIndex(transport, "filename", "w",
436
 
            create=True, file_mode="wb", create_parent_dir=True,
437
 
            delay_create=True, dir_mode=0777)
438
 
        self.assertEqual([], transport.calls)
439
 
 
440
 
        index.add_versions([
441
 
            (u"a", ["option"], 0, 1, [u"b"]),
442
 
            (u"a", ["opt"], 1, 2, [u"c"]),
443
 
            (u"b", ["option"], 2, 3, [u"a"])
444
 
            ])
445
 
        name, (filename, f), kwargs = transport.calls.pop(0)
446
 
        self.assertEqual("put_file_non_atomic", name)
447
 
        self.assertEqual(
448
 
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
449
 
            kwargs)
450
 
        self.assertEqual("filename", filename)
451
 
        self.assertEqual(
452
 
            index.HEADER +
453
 
            "\na option 0 1 .b :"
454
 
            "\na opt 1 2 .c :"
455
 
            "\nb option 2 3 0 :",
456
 
            f.read())
457
 
 
458
 
    def test_has_version(self):
459
 
        transport = MockTransport([
460
 
            _KnitIndex.HEADER,
461
 
            "a option 0 1 :"
462
 
            ])
463
 
        index = _KnitIndex(transport, "filename", "r")
464
 
 
465
 
        self.assertTrue(index.has_version(u"a"))
466
 
        self.assertFalse(index.has_version(u"b"))
467
 
 
468
 
    def test_get_position(self):
469
 
        transport = MockTransport([
470
 
            _KnitIndex.HEADER,
471
 
            "a option 0 1 :",
472
 
            "b option 1 2 :"
473
 
            ])
474
 
        index = _KnitIndex(transport, "filename", "r")
475
 
 
476
 
        self.assertEqual((0, 1), index.get_position(u"a"))
477
 
        self.assertEqual((1, 2), index.get_position(u"b"))
478
 
 
479
 
    def test_get_method(self):
480
 
        transport = MockTransport([
481
 
            _KnitIndex.HEADER,
482
 
            "a fulltext,unknown 0 1 :",
483
 
            "b unknown,line-delta 1 2 :",
484
 
            "c bad 3 4 :"
485
 
            ])
486
 
        index = _KnitIndex(transport, "filename", "r")
487
 
 
488
 
        self.assertEqual("fulltext", index.get_method(u"a"))
489
 
        self.assertEqual("line-delta", index.get_method(u"b"))
490
 
        self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, u"c")
491
 
 
492
 
    def test_get_options(self):
493
 
        transport = MockTransport([
494
 
            _KnitIndex.HEADER,
495
 
            "a opt1 0 1 :",
496
 
            "b opt2,opt3 1 2 :"
497
 
            ])
498
 
        index = _KnitIndex(transport, "filename", "r")
499
 
 
500
 
        self.assertEqual(["opt1"], index.get_options(u"a"))
501
 
        self.assertEqual(["opt2", "opt3"], index.get_options(u"b"))
502
 
 
503
 
    def test_get_parents(self):
504
 
        transport = MockTransport([
505
 
            _KnitIndex.HEADER,
506
 
            "a option 0 1 :",
507
 
            "b option 1 2 0 .c :",
508
 
            "c option 1 2 1 0 .e :"
509
 
            ])
510
 
        index = _KnitIndex(transport, "filename", "r")
511
 
 
512
 
        self.assertEqual([], index.get_parents(u"a"))
513
 
        self.assertEqual([u"a", u"c"], index.get_parents(u"b"))
514
 
        self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
515
 
 
516
 
    def test_get_parents_with_ghosts(self):
517
 
        transport = MockTransport([
518
 
            _KnitIndex.HEADER,
519
 
            "a option 0 1 :",
520
 
            "b option 1 2 0 .c :",
521
 
            "c option 1 2 1 0 .e :"
522
 
            ])
523
 
        index = _KnitIndex(transport, "filename", "r")
524
 
 
525
 
        self.assertEqual([], index.get_parents_with_ghosts(u"a"))
526
 
        self.assertEqual([u"a", u"c"], index.get_parents_with_ghosts(u"b"))
527
 
        self.assertEqual([u"b", u"a", u"e"],
528
 
            index.get_parents_with_ghosts(u"c"))
529
 
 
530
 
    def test_check_versions_present(self):
531
 
        transport = MockTransport([
532
 
            _KnitIndex.HEADER,
533
 
            "a option 0 1 :",
534
 
            "b option 0 1 :"
535
 
            ])
536
 
        index = _KnitIndex(transport, "filename", "r")
537
 
 
538
 
        check = index.check_versions_present
539
 
 
540
 
        check([])
541
 
        check([u"a"])
542
 
        check([u"b"])
543
 
        check([u"a", u"b"])
544
 
        self.assertRaises(RevisionNotPresent, check, [u"c"])
545
 
        self.assertRaises(RevisionNotPresent, check, [u"a", u"b", u"c"])
546
 
 
547
 
 
548
 
class KnitTests(TestCaseWithTransport):
549
 
    """Class containing knit test helper routines."""
550
 
 
551
 
    def make_test_knit(self, annotate=False, delay_create=False):
552
 
        if not annotate:
553
 
            factory = KnitPlainFactory()
554
 
        else:
555
 
            factory = None
556
 
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
557
 
                                 factory=factory, create=True,
558
 
                                 delay_create=delay_create)
559
 
 
560
 
 
561
 
class BasicKnitTests(KnitTests):
562
 
 
563
 
    def add_stock_one_and_one_a(self, k):
564
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
565
 
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
566
 
 
567
 
    def test_knit_constructor(self):
568
 
        """Construct empty k"""
569
 
        self.make_test_knit()
570
 
 
571
 
    def test_knit_add(self):
572
 
        """Store one text in knit and retrieve"""
573
 
        k = self.make_test_knit()
574
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
575
 
        self.assertTrue(k.has_version('text-1'))
576
 
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
577
 
 
578
 
    def test_knit_reload(self):
579
 
        # test that the content in a reloaded knit is correct
580
 
        k = self.make_test_knit()
581
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
582
 
        del k
583
 
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
584
 
        self.assertTrue(k2.has_version('text-1'))
585
 
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
586
 
 
587
 
    def test_knit_several(self):
588
 
        """Store several texts in a knit"""
589
 
        k = self.make_test_knit()
590
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
591
 
        k.add_lines('text-2', [], split_lines(TEXT_2))
592
 
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
593
 
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
594
 
        
595
 
    def test_repeated_add(self):
596
 
        """Knit traps attempt to replace existing version"""
597
 
        k = self.make_test_knit()
598
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
599
 
        self.assertRaises(RevisionAlreadyPresent, 
600
 
                k.add_lines,
601
 
                'text-1', [], split_lines(TEXT_1))
602
 
 
603
 
    def test_empty(self):
604
 
        k = self.make_test_knit(True)
605
 
        k.add_lines('text-1', [], [])
606
 
        self.assertEquals(k.get_lines('text-1'), [])
607
 
 
608
 
    def test_incomplete(self):
609
 
        """Test if texts without a ending line-end can be inserted and
610
 
        extracted."""
611
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
612
 
        k.add_lines('text-1', [], ['a\n',    'b'  ])
613
 
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
614
 
        # reopening ensures maximum room for confusion
615
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
616
 
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
617
 
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
618
 
 
619
 
    def test_delta(self):
620
 
        """Expression of knit delta as lines"""
621
 
        k = self.make_test_knit()
622
 
        td = list(line_delta(TEXT_1.splitlines(True),
623
 
                             TEXT_1A.splitlines(True)))
624
 
        self.assertEqualDiff(''.join(td), delta_1_1a)
625
 
        out = apply_line_delta(TEXT_1.splitlines(True), td)
626
 
        self.assertEqualDiff(''.join(out), TEXT_1A)
627
 
 
628
 
    def test_add_with_parents(self):
629
 
        """Store in knit with parents"""
630
 
        k = self.make_test_knit()
631
 
        self.add_stock_one_and_one_a(k)
632
 
        self.assertEquals(k.get_parents('text-1'), [])
633
 
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
634
 
 
635
 
    def test_ancestry(self):
636
 
        """Store in knit with parents"""
637
 
        k = self.make_test_knit()
638
 
        self.add_stock_one_and_one_a(k)
639
 
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
640
 
 
641
 
    def test_add_delta(self):
642
 
        """Store in knit with parents"""
643
 
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
644
 
            delta=True, create=True)
645
 
        self.add_stock_one_and_one_a(k)
646
 
        k.clear_cache()
647
 
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
648
 
 
649
 
    def test_annotate(self):
650
 
        """Annotations"""
651
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
652
 
            delta=True, create=True)
653
 
        self.insert_and_test_small_annotate(k)
654
 
 
655
 
    def insert_and_test_small_annotate(self, k):
656
 
        """test annotation with k works correctly."""
657
 
        k.add_lines('text-1', [], ['a\n', 'b\n'])
658
 
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
659
 
 
660
 
        origins = k.annotate('text-2')
661
 
        self.assertEquals(origins[0], ('text-1', 'a\n'))
662
 
        self.assertEquals(origins[1], ('text-2', 'c\n'))
663
 
 
664
 
    def test_annotate_fulltext(self):
665
 
        """Annotations"""
666
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
667
 
            delta=False, create=True)
668
 
        self.insert_and_test_small_annotate(k)
669
 
 
670
 
    def test_annotate_merge_1(self):
671
 
        k = self.make_test_knit(True)
672
 
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
673
 
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
674
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
675
 
        origins = k.annotate('text-am')
676
 
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
677
 
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
678
 
 
679
 
    def test_annotate_merge_2(self):
680
 
        k = self.make_test_knit(True)
681
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
682
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
683
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
684
 
        origins = k.annotate('text-am')
685
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
686
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
687
 
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
688
 
 
689
 
    def test_annotate_merge_9(self):
690
 
        k = self.make_test_knit(True)
691
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
692
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
693
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
694
 
        origins = k.annotate('text-am')
695
 
        self.assertEquals(origins[0], ('text-am', 'k\n'))
696
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
697
 
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
698
 
 
699
 
    def test_annotate_merge_3(self):
700
 
        k = self.make_test_knit(True)
701
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
702
 
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
703
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
704
 
        origins = k.annotate('text-am')
705
 
        self.assertEquals(origins[0], ('text-am', 'k\n'))
706
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
707
 
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
708
 
 
709
 
    def test_annotate_merge_4(self):
710
 
        k = self.make_test_knit(True)
711
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
712
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
713
 
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
714
 
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
715
 
        origins = k.annotate('text-am')
716
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
717
 
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
718
 
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
719
 
 
720
 
    def test_annotate_merge_5(self):
721
 
        k = self.make_test_knit(True)
722
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
723
 
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
724
 
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
725
 
        k.add_lines('text-am',
726
 
                    ['text-a1', 'text-a2', 'text-a3'],
727
 
                    ['a\n', 'e\n', 'z\n'])
728
 
        origins = k.annotate('text-am')
729
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
730
 
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
731
 
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
732
 
 
733
 
    def test_annotate_file_cherry_pick(self):
734
 
        k = self.make_test_knit(True)
735
 
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
736
 
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
737
 
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
738
 
        origins = k.annotate('text-3')
739
 
        self.assertEquals(origins[0], ('text-1', 'a\n'))
740
 
        self.assertEquals(origins[1], ('text-1', 'b\n'))
741
 
        self.assertEquals(origins[2], ('text-1', 'c\n'))
742
 
 
743
 
    def test_knit_join(self):
744
 
        """Store in knit with parents"""
745
 
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
746
 
        k1.add_lines('text-a', [], split_lines(TEXT_1))
747
 
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
748
 
 
749
 
        k1.add_lines('text-c', [], split_lines(TEXT_1))
750
 
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
751
 
 
752
 
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
753
 
 
754
 
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
755
 
        count = k2.join(k1, version_ids=['text-m'])
756
 
        self.assertEquals(count, 5)
757
 
        self.assertTrue(k2.has_version('text-a'))
758
 
        self.assertTrue(k2.has_version('text-c'))
759
 
 
760
 
    def test_reannotate(self):
761
 
        k1 = KnitVersionedFile('knit1', get_transport('.'),
762
 
                               factory=KnitAnnotateFactory(), create=True)
763
 
        # 0
764
 
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
765
 
        # 1
766
 
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
767
 
 
768
 
        k2 = KnitVersionedFile('test2', get_transport('.'),
769
 
                               factory=KnitAnnotateFactory(), create=True)
770
 
        k2.join(k1, version_ids=['text-b'])
771
 
 
772
 
        # 2
773
 
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
774
 
        # 2
775
 
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
776
 
        # 3
777
 
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
778
 
 
779
 
        # test-c will have index 3
780
 
        k1.join(k2, version_ids=['text-c'])
781
 
 
782
 
        lines = k1.get_lines('text-c')
783
 
        self.assertEquals(lines, ['z\n', 'c\n'])
784
 
 
785
 
        origins = k1.annotate('text-c')
786
 
        self.assertEquals(origins[0], ('text-c', 'z\n'))
787
 
        self.assertEquals(origins[1], ('text-b', 'c\n'))
788
 
 
789
 
    def test_get_line_delta_texts(self):
790
 
        """Make sure we can call get_texts on text with reused line deltas"""
791
 
        k1 = KnitVersionedFile('test1', get_transport('.'), 
792
 
                               factory=KnitPlainFactory(), create=True)
793
 
        for t in range(3):
794
 
            if t == 0:
795
 
                parents = []
796
 
            else:
797
 
                parents = ['%d' % (t-1)]
798
 
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
799
 
        k1.get_texts(('%d' % t) for t in range(3))
800
 
        
801
 
    def test_iter_lines_reads_in_order(self):
802
 
        t = MemoryTransport()
803
 
        instrumented_t = TransportLogger(t)
804
 
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
805
 
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
806
 
        # add texts with no required ordering
807
 
        k1.add_lines('base', [], ['text\n'])
808
 
        k1.add_lines('base2', [], ['text2\n'])
809
 
        k1.clear_cache()
810
 
        instrumented_t._calls = []
811
 
        # request a last-first iteration
812
 
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
813
 
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
814
 
        self.assertEqual(['text\n', 'text2\n'], results)
815
 
 
816
 
    def test_create_empty_annotated(self):
817
 
        k1 = self.make_test_knit(True)
818
 
        # 0
819
 
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
820
 
        k2 = k1.create_empty('t', MemoryTransport())
821
 
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
822
 
        self.assertEqual(k1.delta, k2.delta)
823
 
        # the generic test checks for empty content and file class
824
 
 
825
 
    def test_knit_format(self):
826
 
        # this tests that a new knit index file has the expected content
827
 
        # and that is writes the data we expect as records are added.
828
 
        knit = self.make_test_knit(True)
829
 
        # Now knit files are not created until we first add data to them
830
 
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
831
 
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
832
 
        self.assertFileEqual(
833
 
            "# bzr knit index 8\n"
834
 
            "\n"
835
 
            "revid fulltext 0 84 .a_ghost :",
836
 
            'test.kndx')
837
 
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
838
 
        self.assertFileEqual(
839
 
            "# bzr knit index 8\n"
840
 
            "\nrevid fulltext 0 84 .a_ghost :"
841
 
            "\nrevid2 line-delta 84 82 0 :",
842
 
            'test.kndx')
843
 
        # we should be able to load this file again
844
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
845
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
846
 
        # write a short write to the file and ensure that its ignored
847
 
        indexfile = file('test.kndx', 'at')
848
 
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
849
 
        indexfile.close()
850
 
        # we should be able to load this file again
851
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
852
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
853
 
        # and add a revision with the same id the failed write had
854
 
        knit.add_lines('revid3', ['revid2'], ['a\n'])
855
 
        # and when reading it revid3 should now appear.
856
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
857
 
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
858
 
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
859
 
 
860
 
    def test_delay_create(self):
861
 
        """Test that passing delay_create=True creates files late"""
862
 
        knit = self.make_test_knit(annotate=True, delay_create=True)
863
 
        self.failIfExists('test.knit')
864
 
        self.failIfExists('test.kndx')
865
 
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
866
 
        self.failUnlessExists('test.knit')
867
 
        self.assertFileEqual(
868
 
            "# bzr knit index 8\n"
869
 
            "\n"
870
 
            "revid fulltext 0 84 .a_ghost :",
871
 
            'test.kndx')
872
 
 
873
 
    def test_create_parent_dir(self):
874
 
        """create_parent_dir can create knits in nonexistant dirs"""
875
 
        # Has no effect if we don't set 'delay_create'
876
 
        trans = get_transport('.')
877
 
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
878
 
                          trans, access_mode='w', factory=None,
879
 
                          create=True, create_parent_dir=True)
880
 
        # Nothing should have changed yet
881
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
882
 
                                 factory=None, create=True,
883
 
                                 create_parent_dir=True,
884
 
                                 delay_create=True)
885
 
        self.failIfExists('dir/test.knit')
886
 
        self.failIfExists('dir/test.kndx')
887
 
        self.failIfExists('dir')
888
 
        knit.add_lines('revid', [], ['a\n'])
889
 
        self.failUnlessExists('dir')
890
 
        self.failUnlessExists('dir/test.knit')
891
 
        self.assertFileEqual(
892
 
            "# bzr knit index 8\n"
893
 
            "\n"
894
 
            "revid fulltext 0 84  :",
895
 
            'dir/test.kndx')
896
 
 
897
 
    def test_create_mode_700(self):
898
 
        trans = get_transport('.')
899
 
        if not trans._can_roundtrip_unix_modebits():
900
 
            # Can't roundtrip, so no need to run this test
901
 
            return
902
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
903
 
                                 factory=None, create=True,
904
 
                                 create_parent_dir=True,
905
 
                                 delay_create=True,
906
 
                                 file_mode=0600,
907
 
                                 dir_mode=0700)
908
 
        knit.add_lines('revid', [], ['a\n'])
909
 
        self.assertTransportMode(trans, 'dir', 0700)
910
 
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
911
 
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
912
 
 
913
 
    def test_create_mode_770(self):
914
 
        trans = get_transport('.')
915
 
        if not trans._can_roundtrip_unix_modebits():
916
 
            # Can't roundtrip, so no need to run this test
917
 
            return
918
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
919
 
                                 factory=None, create=True,
920
 
                                 create_parent_dir=True,
921
 
                                 delay_create=True,
922
 
                                 file_mode=0660,
923
 
                                 dir_mode=0770)
924
 
        knit.add_lines('revid', [], ['a\n'])
925
 
        self.assertTransportMode(trans, 'dir', 0770)
926
 
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
927
 
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
928
 
 
929
 
    def test_create_mode_777(self):
930
 
        trans = get_transport('.')
931
 
        if not trans._can_roundtrip_unix_modebits():
932
 
            # Can't roundtrip, so no need to run this test
933
 
            return
934
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
935
 
                                 factory=None, create=True,
936
 
                                 create_parent_dir=True,
937
 
                                 delay_create=True,
938
 
                                 file_mode=0666,
939
 
                                 dir_mode=0777)
940
 
        knit.add_lines('revid', [], ['a\n'])
941
 
        self.assertTransportMode(trans, 'dir', 0777)
942
 
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
943
 
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
944
 
 
945
 
    def test_plan_merge(self):
946
 
        my_knit = self.make_test_knit(annotate=True)
947
 
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
948
 
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
949
 
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
950
 
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
951
 
        for plan_line, expected_line in zip(plan, AB_MERGE):
952
 
            self.assertEqual(plan_line, expected_line)
953
 
 
954
 
 
955
 
TEXT_1 = """\
956
 
Banana cup cakes:
957
 
 
958
 
- bananas
959
 
- eggs
960
 
- broken tea cups
961
 
"""
962
 
 
963
 
TEXT_1A = """\
964
 
Banana cup cake recipe
965
 
(serves 6)
966
 
 
967
 
- bananas
968
 
- eggs
969
 
- broken tea cups
970
 
- self-raising flour
971
 
"""
972
 
 
973
 
TEXT_1B = """\
974
 
Banana cup cake recipe
975
 
 
976
 
- bananas (do not use plantains!!!)
977
 
- broken tea cups
978
 
- flour
979
 
"""
980
 
 
981
 
delta_1_1a = """\
982
 
0,1,2
983
 
Banana cup cake recipe
984
 
(serves 6)
985
 
5,5,1
986
 
- self-raising flour
987
 
"""
988
 
 
989
 
TEXT_2 = """\
990
 
Boeuf bourguignon
991
 
 
992
 
- beef
993
 
- red wine
994
 
- small onions
995
 
- carrot
996
 
- mushrooms
997
 
"""
998
 
 
999
 
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
1000
 
new-a|(serves 6)
1001
 
unchanged|
1002
 
killed-b|- bananas
1003
 
killed-b|- eggs
1004
 
new-b|- bananas (do not use plantains!!!)
1005
 
unchanged|- broken tea cups
1006
 
new-a|- self-raising flour
1007
 
new-b|- flour
1008
 
"""
1009
 
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
1010
 
 
1011
 
 
1012
 
def line_delta(from_lines, to_lines):
1013
 
    """Generate line-based delta from one text to another"""
1014
 
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
1015
 
    for op in s.get_opcodes():
1016
 
        if op[0] == 'equal':
1017
 
            continue
1018
 
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
1019
 
        for i in range(op[3], op[4]):
1020
 
            yield to_lines[i]
1021
 
 
1022
 
 
1023
 
def apply_line_delta(basis_lines, delta_lines):
1024
 
    """Apply a line-based perfect diff
1025
 
    
1026
 
    basis_lines -- text to apply the patch to
1027
 
    delta_lines -- diff instructions and content
1028
 
    """
1029
 
    out = basis_lines[:]
1030
 
    i = 0
1031
 
    offset = 0
1032
 
    while i < len(delta_lines):
1033
 
        l = delta_lines[i]
1034
 
        a, b, c = map(long, l.split(','))
1035
 
        i = i + 1
1036
 
        out[offset+a:offset+b] = delta_lines[i:i+c]
1037
 
        i = i + c
1038
 
        offset = offset + (b - a) + c
1039
 
    return out
1040
 
 
1041
 
 
1042
 
class TestWeaveToKnit(KnitTests):
1043
 
 
1044
 
    def test_weave_to_knit_matches(self):
1045
 
        # check that the WeaveToKnit is_compatible function
1046
 
        # registers True for a Weave to a Knit.
1047
 
        w = Weave()
1048
 
        k = self.make_test_knit()
1049
 
        self.failUnless(WeaveToKnit.is_compatible(w, k))
1050
 
        self.failIf(WeaveToKnit.is_compatible(k, w))
1051
 
        self.failIf(WeaveToKnit.is_compatible(w, w))
1052
 
        self.failIf(WeaveToKnit.is_compatible(k, k))
1053
 
 
1054
 
 
1055
 
class TestKnitCaching(KnitTests):
1056
 
    
1057
 
    def create_knit(self, cache_add=False):
1058
 
        k = self.make_test_knit(True)
1059
 
        if cache_add:
1060
 
            k.enable_cache()
1061
 
 
1062
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
1063
 
        k.add_lines('text-2', [], split_lines(TEXT_2))
1064
 
        return k
1065
 
 
1066
 
    def test_no_caching(self):
1067
 
        k = self.create_knit()
1068
 
        # Nothing should be cached without setting 'enable_cache'
1069
 
        self.assertEqual({}, k._data._cache)
1070
 
 
1071
 
    def test_cache_add_and_clear(self):
1072
 
        k = self.create_knit(True)
1073
 
 
1074
 
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1075
 
 
1076
 
        k.clear_cache()
1077
 
        self.assertEqual({}, k._data._cache)
1078
 
 
1079
 
    def test_cache_data_read_raw(self):
1080
 
        k = self.create_knit()
1081
 
 
1082
 
        # Now cache and read
1083
 
        k.enable_cache()
1084
 
 
1085
 
        def read_one_raw(version):
1086
 
            pos_map = k._get_components_positions([version])
1087
 
            method, pos, size, next = pos_map[version]
1088
 
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
1089
 
            self.assertEqual(1, len(lst))
1090
 
            return lst[0]
1091
 
 
1092
 
        val = read_one_raw('text-1')
1093
 
        self.assertEqual({'text-1':val[1]}, k._data._cache)
1094
 
 
1095
 
        k.clear_cache()
1096
 
        # After clear, new reads are not cached
1097
 
        self.assertEqual({}, k._data._cache)
1098
 
 
1099
 
        val2 = read_one_raw('text-1')
1100
 
        self.assertEqual(val, val2)
1101
 
        self.assertEqual({}, k._data._cache)
1102
 
 
1103
 
    def test_cache_data_read(self):
1104
 
        k = self.create_knit()
1105
 
 
1106
 
        def read_one(version):
1107
 
            pos_map = k._get_components_positions([version])
1108
 
            method, pos, size, next = pos_map[version]
1109
 
            lst = list(k._data.read_records_iter([(version, pos, size)]))
1110
 
            self.assertEqual(1, len(lst))
1111
 
            return lst[0]
1112
 
 
1113
 
        # Now cache and read
1114
 
        k.enable_cache()
1115
 
 
1116
 
        val = read_one('text-2')
1117
 
        self.assertEqual(['text-2'], k._data._cache.keys())
1118
 
        self.assertEqual('text-2', val[0])
1119
 
        content, digest = k._data._parse_record('text-2',
1120
 
                                                k._data._cache['text-2'])
1121
 
        self.assertEqual(content, val[1])
1122
 
        self.assertEqual(digest, val[2])
1123
 
 
1124
 
        k.clear_cache()
1125
 
        self.assertEqual({}, k._data._cache)
1126
 
 
1127
 
        val2 = read_one('text-2')
1128
 
        self.assertEqual(val, val2)
1129
 
        self.assertEqual({}, k._data._cache)
1130
 
 
1131
 
    def test_cache_read(self):
1132
 
        k = self.create_knit()
1133
 
        k.enable_cache()
1134
 
 
1135
 
        text = k.get_text('text-1')
1136
 
        self.assertEqual(TEXT_1, text)
1137
 
        self.assertEqual(['text-1'], k._data._cache.keys())
1138
 
 
1139
 
        k.clear_cache()
1140
 
        self.assertEqual({}, k._data._cache)
1141
 
 
1142
 
        text = k.get_text('text-1')
1143
 
        self.assertEqual(TEXT_1, text)
1144
 
        self.assertEqual({}, k._data._cache)
1145
 
 
1146
 
 
1147
 
class TestKnitIndex(KnitTests):
1148
 
 
1149
 
    def test_add_versions_dictionary_compresses(self):
1150
 
        """Adding versions to the index should update the lookup dict"""
1151
 
        knit = self.make_test_knit()
1152
 
        idx = knit._index
1153
 
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
1154
 
        self.check_file_contents('test.kndx',
1155
 
            '# bzr knit index 8\n'
1156
 
            '\n'
1157
 
            'a-1 fulltext 0 0  :'
1158
 
            )
1159
 
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
1160
 
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
1161
 
                         ])
1162
 
        self.check_file_contents('test.kndx',
1163
 
            '# bzr knit index 8\n'
1164
 
            '\n'
1165
 
            'a-1 fulltext 0 0  :\n'
1166
 
            'a-2 fulltext 0 0 0 :\n'
1167
 
            'a-3 fulltext 0 0 1 :'
1168
 
            )
1169
 
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1170
 
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1171
 
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1172
 
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1173
 
                         }, idx._cache)
1174
 
 
1175
 
    def test_add_versions_fails_clean(self):
1176
 
        """If add_versions fails in the middle, it restores a pristine state.
1177
 
 
1178
 
        Any modifications that are made to the index are reset if all versions
1179
 
        cannot be added.
1180
 
        """
1181
 
        # This cheats a little bit by passing in a generator which will
1182
 
        # raise an exception before the processing finishes
1183
 
        # Other possibilities would be to have an version with the wrong number
1184
 
        # of entries, or to make the backing transport unable to write any
1185
 
        # files.
1186
 
 
1187
 
        knit = self.make_test_knit()
1188
 
        idx = knit._index
1189
 
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
1190
 
 
1191
 
        class StopEarly(Exception):
1192
 
            pass
1193
 
 
1194
 
        def generate_failure():
1195
 
            """Add some entries and then raise an exception"""
1196
 
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
1197
 
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
1198
 
            raise StopEarly()
1199
 
 
1200
 
        # Assert the pre-condition
1201
 
        self.assertEqual(['a-1'], idx._history)
1202
 
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1203
 
 
1204
 
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1205
 
 
1206
 
        # And it shouldn't be modified
1207
 
        self.assertEqual(['a-1'], idx._history)
1208
 
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1209
 
 
1210
 
    def test_knit_index_ignores_empty_files(self):
1211
 
        # There was a race condition in older bzr, where a ^C at the right time
1212
 
        # could leave an empty .kndx file, which bzr would later claim was a
1213
 
        # corrupted file since the header was not present. In reality, the file
1214
 
        # just wasn't created, so it should be ignored.
1215
 
        t = get_transport('.')
1216
 
        t.put_bytes('test.kndx', '')
1217
 
 
1218
 
        knit = self.make_test_knit()
1219
 
 
1220
 
    def test_knit_index_checks_header(self):
1221
 
        t = get_transport('.')
1222
 
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
1223
 
 
1224
 
        self.assertRaises(KnitHeaderError, self.make_test_knit)