~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Aaron Bentley
  • Date: 2007-06-22 22:19:13 UTC
  • mto: (2520.5.2 bzr.mpbundle)
  • mto: This revision was merged to the branch mainline in revision 2631.
  • Revision ID: abentley@panoramicfeedback.com-20070622221913-mcjioqruw8rhgnd8
Improve locking in _BaseMergeDirective.from_object

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
from cStringIO import StringIO
 
20
import difflib
 
21
import gzip
 
22
import sha
 
23
 
 
24
from bzrlib import (
 
25
    errors,
 
26
    )
 
27
from bzrlib.errors import (
 
28
    RevisionAlreadyPresent,
 
29
    KnitHeaderError,
 
30
    RevisionNotPresent,
 
31
    NoSuchFile,
 
32
    )
 
33
from bzrlib.knit import (
 
34
    KnitContent,
 
35
    KnitVersionedFile,
 
36
    KnitPlainFactory,
 
37
    KnitAnnotateFactory,
 
38
    _KnitData,
 
39
    _KnitIndex,
 
40
    WeaveToKnit,
 
41
    KnitSequenceMatcher,
 
42
    )
 
43
from bzrlib.osutils import split_lines
 
44
from bzrlib.tests import TestCase, TestCaseWithTransport
 
45
from bzrlib.transport import TransportLogger, get_transport
 
46
from bzrlib.transport.memory import MemoryTransport
 
47
from bzrlib.weave import Weave
 
48
 
 
49
 
 
50
class KnitContentTests(TestCase):
 
51
 
 
52
    def test_constructor(self):
 
53
        content = KnitContent([])
 
54
 
 
55
    def test_text(self):
 
56
        content = KnitContent([])
 
57
        self.assertEqual(content.text(), [])
 
58
 
 
59
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
60
        self.assertEqual(content.text(), ["text1", "text2"])
 
61
 
 
62
    def test_annotate(self):
 
63
        content = KnitContent([])
 
64
        self.assertEqual(content.annotate(), [])
 
65
 
 
66
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
67
        self.assertEqual(content.annotate(),
 
68
            [("origin1", "text1"), ("origin2", "text2")])
 
69
 
 
70
    def test_annotate_iter(self):
 
71
        content = KnitContent([])
 
72
        it = content.annotate_iter()
 
73
        self.assertRaises(StopIteration, it.next)
 
74
 
 
75
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
76
        it = content.annotate_iter()
 
77
        self.assertEqual(it.next(), ("origin1", "text1"))
 
78
        self.assertEqual(it.next(), ("origin2", "text2"))
 
79
        self.assertRaises(StopIteration, it.next)
 
80
 
 
81
    def test_copy(self):
 
82
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
83
        copy = content.copy()
 
84
        self.assertIsInstance(copy, KnitContent)
 
85
        self.assertEqual(copy.annotate(),
 
86
            [("origin1", "text1"), ("origin2", "text2")])
 
87
 
 
88
    def test_line_delta(self):
 
89
        content1 = KnitContent([("", "a"), ("", "b")])
 
90
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
91
        self.assertEqual(content1.line_delta(content2),
 
92
            [(1, 2, 2, [("", "a"), ("", "c")])])
 
93
 
 
94
    def test_line_delta_iter(self):
 
95
        content1 = KnitContent([("", "a"), ("", "b")])
 
96
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
97
        it = content1.line_delta_iter(content2)
 
98
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
 
99
        self.assertRaises(StopIteration, it.next)
 
100
 
 
101
 
 
102
class MockTransport(object):
 
103
 
 
104
    def __init__(self, file_lines=None):
 
105
        self.file_lines = file_lines
 
106
        self.calls = []
 
107
        # We have no base directory for the MockTransport
 
108
        self.base = ''
 
109
 
 
110
    def get(self, filename):
 
111
        if self.file_lines is None:
 
112
            raise NoSuchFile(filename)
 
113
        else:
 
114
            return StringIO("\n".join(self.file_lines))
 
115
 
 
116
    def readv(self, relpath, offsets):
 
117
        fp = self.get(relpath)
 
118
        for offset, size in offsets:
 
119
            fp.seek(offset)
 
120
            yield offset, fp.read(size)
 
121
 
 
122
    def __getattr__(self, name):
 
123
        def queue_call(*args, **kwargs):
 
124
            self.calls.append((name, args, kwargs))
 
125
        return queue_call
 
126
 
 
127
 
 
128
class LowLevelKnitDataTests(TestCase):
 
129
 
 
130
    def create_gz_content(self, text):
 
131
        sio = StringIO()
 
132
        gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
 
133
        gz_file.write(text)
 
134
        gz_file.close()
 
135
        return sio.getvalue()
 
136
 
 
137
    def test_valid_knit_data(self):
 
138
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
139
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
140
                                        'foo\n'
 
141
                                        'bar\n'
 
142
                                        'end rev-id-1\n'
 
143
                                        % (sha1sum,))
 
144
        transport = MockTransport([gz_txt])
 
145
        data = _KnitData(transport, 'filename', mode='r')
 
146
        records = [('rev-id-1', 0, len(gz_txt))]
 
147
 
 
148
        contents = data.read_records(records)
 
149
        self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
 
150
 
 
151
        raw_contents = list(data.read_records_iter_raw(records))
 
152
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
153
 
 
154
    def test_not_enough_lines(self):
 
155
        sha1sum = sha.new('foo\n').hexdigest()
 
156
        # record says 2 lines data says 1
 
157
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
158
                                        'foo\n'
 
159
                                        'end rev-id-1\n'
 
160
                                        % (sha1sum,))
 
161
        transport = MockTransport([gz_txt])
 
162
        data = _KnitData(transport, 'filename', mode='r')
 
163
        records = [('rev-id-1', 0, len(gz_txt))]
 
164
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
165
 
 
166
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
167
        raw_contents = list(data.read_records_iter_raw(records))
 
168
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
169
 
 
170
    def test_too_many_lines(self):
 
171
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
172
        # record says 1 lines data says 2
 
173
        gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
 
174
                                        'foo\n'
 
175
                                        'bar\n'
 
176
                                        'end rev-id-1\n'
 
177
                                        % (sha1sum,))
 
178
        transport = MockTransport([gz_txt])
 
179
        data = _KnitData(transport, 'filename', mode='r')
 
180
        records = [('rev-id-1', 0, len(gz_txt))]
 
181
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
182
 
 
183
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
184
        raw_contents = list(data.read_records_iter_raw(records))
 
185
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
186
 
 
187
    def test_mismatched_version_id(self):
 
188
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
189
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
190
                                        'foo\n'
 
191
                                        'bar\n'
 
192
                                        'end rev-id-1\n'
 
193
                                        % (sha1sum,))
 
194
        transport = MockTransport([gz_txt])
 
195
        data = _KnitData(transport, 'filename', mode='r')
 
196
        # We are asking for rev-id-2, but the data is rev-id-1
 
197
        records = [('rev-id-2', 0, len(gz_txt))]
 
198
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
199
 
 
200
        # read_records_iter_raw will notice if we request the wrong version.
 
201
        self.assertRaises(errors.KnitCorrupt, list,
 
202
                          data.read_records_iter_raw(records))
 
203
 
 
204
    def test_uncompressed_data(self):
 
205
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
206
        txt = ('version rev-id-1 2 %s\n'
 
207
               'foo\n'
 
208
               'bar\n'
 
209
               'end rev-id-1\n'
 
210
               % (sha1sum,))
 
211
        transport = MockTransport([txt])
 
212
        data = _KnitData(transport, 'filename', mode='r')
 
213
        records = [('rev-id-1', 0, len(txt))]
 
214
 
 
215
        # We don't have valid gzip data ==> corrupt
 
216
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
217
 
 
218
        # read_records_iter_raw will notice the bad data
 
219
        self.assertRaises(errors.KnitCorrupt, list,
 
220
                          data.read_records_iter_raw(records))
 
221
 
 
222
    def test_corrupted_data(self):
 
223
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
224
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
225
                                        'foo\n'
 
226
                                        'bar\n'
 
227
                                        'end rev-id-1\n'
 
228
                                        % (sha1sum,))
 
229
        # Change 2 bytes in the middle to \xff
 
230
        gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
 
231
        transport = MockTransport([gz_txt])
 
232
        data = _KnitData(transport, 'filename', mode='r')
 
233
        records = [('rev-id-1', 0, len(gz_txt))]
 
234
 
 
235
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
236
 
 
237
        # read_records_iter_raw will notice if we request the wrong version.
 
238
        self.assertRaises(errors.KnitCorrupt, list,
 
239
                          data.read_records_iter_raw(records))
 
240
 
 
241
 
 
242
class LowLevelKnitIndexTests(TestCase):
 
243
 
 
244
    def test_no_such_file(self):
 
245
        transport = MockTransport()
 
246
 
 
247
        self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
 
248
        self.assertRaises(NoSuchFile, _KnitIndex, transport,
 
249
            "filename", "w", create=False)
 
250
 
 
251
    def test_create_file(self):
 
252
        transport = MockTransport()
 
253
 
 
254
        index = _KnitIndex(transport, "filename", "w",
 
255
            file_mode="wb", create=True)
 
256
        self.assertEqual(
 
257
                ("put_bytes_non_atomic",
 
258
                    ("filename", index.HEADER), {"mode": "wb"}),
 
259
                transport.calls.pop(0))
 
260
 
 
261
    def test_delay_create_file(self):
 
262
        transport = MockTransport()
 
263
 
 
264
        index = _KnitIndex(transport, "filename", "w",
 
265
            create=True, file_mode="wb", create_parent_dir=True,
 
266
            delay_create=True, dir_mode=0777)
 
267
        self.assertEqual([], transport.calls)
 
268
 
 
269
        index.add_versions([])
 
270
        name, (filename, f), kwargs = transport.calls.pop(0)
 
271
        self.assertEqual("put_file_non_atomic", name)
 
272
        self.assertEqual(
 
273
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
274
            kwargs)
 
275
        self.assertEqual("filename", filename)
 
276
        self.assertEqual(index.HEADER, f.read())
 
277
 
 
278
        index.add_versions([])
 
279
        self.assertEqual(("append_bytes", ("filename", ""), {}),
 
280
            transport.calls.pop(0))
 
281
 
 
282
    def test_read_utf8_version_id(self):
 
283
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
284
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
285
        transport = MockTransport([
 
286
            _KnitIndex.HEADER,
 
287
            '%s option 0 1 :' % (utf8_revision_id,)
 
288
            ])
 
289
        index = _KnitIndex(transport, "filename", "r")
 
290
        # _KnitIndex is a private class, and deals in utf8 revision_ids, not
 
291
        # Unicode revision_ids.
 
292
        self.assertTrue(index.has_version(utf8_revision_id))
 
293
        self.assertFalse(index.has_version(unicode_revision_id))
 
294
 
 
295
    def test_read_utf8_parents(self):
 
296
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
297
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
298
        transport = MockTransport([
 
299
            _KnitIndex.HEADER,
 
300
            "version option 0 1 .%s :" % (utf8_revision_id,)
 
301
            ])
 
302
        index = _KnitIndex(transport, "filename", "r")
 
303
        self.assertEqual([utf8_revision_id],
 
304
            index.get_parents_with_ghosts("version"))
 
305
 
 
306
    def test_read_ignore_corrupted_lines(self):
 
307
        transport = MockTransport([
 
308
            _KnitIndex.HEADER,
 
309
            "corrupted",
 
310
            "corrupted options 0 1 .b .c ",
 
311
            "version options 0 1 :"
 
312
            ])
 
313
        index = _KnitIndex(transport, "filename", "r")
 
314
        self.assertEqual(1, index.num_versions())
 
315
        self.assertTrue(index.has_version("version"))
 
316
 
 
317
    def test_read_corrupted_header(self):
 
318
        transport = MockTransport(['not a bzr knit index header\n'])
 
319
        self.assertRaises(KnitHeaderError,
 
320
            _KnitIndex, transport, "filename", "r")
 
321
 
 
322
    def test_read_duplicate_entries(self):
 
323
        transport = MockTransport([
 
324
            _KnitIndex.HEADER,
 
325
            "parent options 0 1 :",
 
326
            "version options1 0 1 0 :",
 
327
            "version options2 1 2 .other :",
 
328
            "version options3 3 4 0 .other :"
 
329
            ])
 
330
        index = _KnitIndex(transport, "filename", "r")
 
331
        self.assertEqual(2, index.num_versions())
 
332
        self.assertEqual(1, index.lookup("version"))
 
333
        self.assertEqual((3, 4), index.get_position("version"))
 
334
        self.assertEqual(["options3"], index.get_options("version"))
 
335
        self.assertEqual(["parent", "other"],
 
336
            index.get_parents_with_ghosts("version"))
 
337
 
 
338
    def test_read_compressed_parents(self):
 
339
        transport = MockTransport([
 
340
            _KnitIndex.HEADER,
 
341
            "a option 0 1 :",
 
342
            "b option 0 1 0 :",
 
343
            "c option 0 1 1 0 :",
 
344
            ])
 
345
        index = _KnitIndex(transport, "filename", "r")
 
346
        self.assertEqual(["a"], index.get_parents("b"))
 
347
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
348
 
 
349
    def test_write_utf8_version_id(self):
 
350
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
351
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
352
        transport = MockTransport([
 
353
            _KnitIndex.HEADER
 
354
            ])
 
355
        index = _KnitIndex(transport, "filename", "r")
 
356
        index.add_version(utf8_revision_id, ["option"], 0, 1, [])
 
357
        self.assertEqual(("append_bytes", ("filename",
 
358
            "\n%s option 0 1  :" % (utf8_revision_id,)),
 
359
            {}),
 
360
            transport.calls.pop(0))
 
361
 
 
362
    def test_write_utf8_parents(self):
 
363
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
364
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
365
        transport = MockTransport([
 
366
            _KnitIndex.HEADER
 
367
            ])
 
368
        index = _KnitIndex(transport, "filename", "r")
 
369
        index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
 
370
        self.assertEqual(("append_bytes", ("filename",
 
371
            "\nversion option 0 1 .%s :" % (utf8_revision_id,)),
 
372
            {}),
 
373
            transport.calls.pop(0))
 
374
 
 
375
    def test_get_graph(self):
 
376
        transport = MockTransport()
 
377
        index = _KnitIndex(transport, "filename", "w", create=True)
 
378
        self.assertEqual([], index.get_graph())
 
379
 
 
380
        index.add_version("a", ["option"], 0, 1, ["b"])
 
381
        self.assertEqual([("a", ["b"])], index.get_graph())
 
382
 
 
383
        index.add_version("c", ["option"], 0, 1, ["d"])
 
384
        self.assertEqual([("a", ["b"]), ("c", ["d"])],
 
385
            sorted(index.get_graph()))
 
386
 
 
387
    def test_get_ancestry(self):
 
388
        transport = MockTransport([
 
389
            _KnitIndex.HEADER,
 
390
            "a option 0 1 :",
 
391
            "b option 0 1 0 .e :",
 
392
            "c option 0 1 1 0 :",
 
393
            "d option 0 1 2 .f :"
 
394
            ])
 
395
        index = _KnitIndex(transport, "filename", "r")
 
396
 
 
397
        self.assertEqual([], index.get_ancestry([]))
 
398
        self.assertEqual(["a"], index.get_ancestry(["a"]))
 
399
        self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
 
400
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
 
401
        self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
 
402
        self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
 
403
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
 
404
 
 
405
        self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
 
406
 
 
407
    def test_get_ancestry_with_ghosts(self):
 
408
        transport = MockTransport([
 
409
            _KnitIndex.HEADER,
 
410
            "a option 0 1 :",
 
411
            "b option 0 1 0 .e :",
 
412
            "c option 0 1 0 .f .g :",
 
413
            "d option 0 1 2 .h .j .k :"
 
414
            ])
 
415
        index = _KnitIndex(transport, "filename", "r")
 
416
 
 
417
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
418
        self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
 
419
        self.assertEqual(["a", "e", "b"],
 
420
            index.get_ancestry_with_ghosts(["b"]))
 
421
        self.assertEqual(["a", "g", "f", "c"],
 
422
            index.get_ancestry_with_ghosts(["c"]))
 
423
        self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
 
424
            index.get_ancestry_with_ghosts(["d"]))
 
425
        self.assertEqual(["a", "e", "b"],
 
426
            index.get_ancestry_with_ghosts(["a", "b"]))
 
427
        self.assertEqual(["a", "g", "f", "c"],
 
428
            index.get_ancestry_with_ghosts(["a", "c"]))
 
429
        self.assertEqual(
 
430
            ["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
 
431
            index.get_ancestry_with_ghosts(["b", "d"]))
 
432
 
 
433
        self.assertRaises(RevisionNotPresent,
 
434
            index.get_ancestry_with_ghosts, ["e"])
 
435
 
 
436
    def test_num_versions(self):
 
437
        transport = MockTransport([
 
438
            _KnitIndex.HEADER
 
439
            ])
 
440
        index = _KnitIndex(transport, "filename", "r")
 
441
 
 
442
        self.assertEqual(0, index.num_versions())
 
443
        self.assertEqual(0, len(index))
 
444
 
 
445
        index.add_version("a", ["option"], 0, 1, [])
 
446
        self.assertEqual(1, index.num_versions())
 
447
        self.assertEqual(1, len(index))
 
448
 
 
449
        index.add_version("a", ["option2"], 1, 2, [])
 
450
        self.assertEqual(1, index.num_versions())
 
451
        self.assertEqual(1, len(index))
 
452
 
 
453
        index.add_version("b", ["option"], 0, 1, [])
 
454
        self.assertEqual(2, index.num_versions())
 
455
        self.assertEqual(2, len(index))
 
456
 
 
457
    def test_get_versions(self):
 
458
        transport = MockTransport([
 
459
            _KnitIndex.HEADER
 
460
            ])
 
461
        index = _KnitIndex(transport, "filename", "r")
 
462
 
 
463
        self.assertEqual([], index.get_versions())
 
464
 
 
465
        index.add_version("a", ["option"], 0, 1, [])
 
466
        self.assertEqual(["a"], index.get_versions())
 
467
 
 
468
        index.add_version("a", ["option"], 0, 1, [])
 
469
        self.assertEqual(["a"], index.get_versions())
 
470
 
 
471
        index.add_version("b", ["option"], 0, 1, [])
 
472
        self.assertEqual(["a", "b"], index.get_versions())
 
473
 
 
474
    def test_idx_to_name(self):
 
475
        transport = MockTransport([
 
476
            _KnitIndex.HEADER,
 
477
            "a option 0 1 :",
 
478
            "b option 0 1 :"
 
479
            ])
 
480
        index = _KnitIndex(transport, "filename", "r")
 
481
 
 
482
        self.assertEqual("a", index.idx_to_name(0))
 
483
        self.assertEqual("b", index.idx_to_name(1))
 
484
        self.assertEqual("b", index.idx_to_name(-1))
 
485
        self.assertEqual("a", index.idx_to_name(-2))
 
486
 
 
487
    def test_lookup(self):
 
488
        transport = MockTransport([
 
489
            _KnitIndex.HEADER,
 
490
            "a option 0 1 :",
 
491
            "b option 0 1 :"
 
492
            ])
 
493
        index = _KnitIndex(transport, "filename", "r")
 
494
 
 
495
        self.assertEqual(0, index.lookup("a"))
 
496
        self.assertEqual(1, index.lookup("b"))
 
497
 
 
498
    def test_add_version(self):
 
499
        transport = MockTransport([
 
500
            _KnitIndex.HEADER
 
501
            ])
 
502
        index = _KnitIndex(transport, "filename", "r")
 
503
 
 
504
        index.add_version("a", ["option"], 0, 1, ["b"])
 
505
        self.assertEqual(("append_bytes",
 
506
            ("filename", "\na option 0 1 .b :"),
 
507
            {}), transport.calls.pop(0))
 
508
        self.assertTrue(index.has_version("a"))
 
509
        self.assertEqual(1, index.num_versions())
 
510
        self.assertEqual((0, 1), index.get_position("a"))
 
511
        self.assertEqual(["option"], index.get_options("a"))
 
512
        self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
 
513
 
 
514
        index.add_version("a", ["opt"], 1, 2, ["c"])
 
515
        self.assertEqual(("append_bytes",
 
516
            ("filename", "\na opt 1 2 .c :"),
 
517
            {}), transport.calls.pop(0))
 
518
        self.assertTrue(index.has_version("a"))
 
519
        self.assertEqual(1, index.num_versions())
 
520
        self.assertEqual((1, 2), index.get_position("a"))
 
521
        self.assertEqual(["opt"], index.get_options("a"))
 
522
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
523
 
 
524
        index.add_version("b", ["option"], 2, 3, ["a"])
 
525
        self.assertEqual(("append_bytes",
 
526
            ("filename", "\nb option 2 3 0 :"),
 
527
            {}), transport.calls.pop(0))
 
528
        self.assertTrue(index.has_version("b"))
 
529
        self.assertEqual(2, index.num_versions())
 
530
        self.assertEqual((2, 3), index.get_position("b"))
 
531
        self.assertEqual(["option"], index.get_options("b"))
 
532
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
533
 
 
534
    def test_add_versions(self):
 
535
        transport = MockTransport([
 
536
            _KnitIndex.HEADER
 
537
            ])
 
538
        index = _KnitIndex(transport, "filename", "r")
 
539
 
 
540
        index.add_versions([
 
541
            ("a", ["option"], 0, 1, ["b"]),
 
542
            ("a", ["opt"], 1, 2, ["c"]),
 
543
            ("b", ["option"], 2, 3, ["a"])
 
544
            ])
 
545
        self.assertEqual(("append_bytes", ("filename",
 
546
            "\na option 0 1 .b :"
 
547
            "\na opt 1 2 .c :"
 
548
            "\nb option 2 3 0 :"
 
549
            ), {}), transport.calls.pop(0))
 
550
        self.assertTrue(index.has_version("a"))
 
551
        self.assertTrue(index.has_version("b"))
 
552
        self.assertEqual(2, index.num_versions())
 
553
        self.assertEqual((1, 2), index.get_position("a"))
 
554
        self.assertEqual((2, 3), index.get_position("b"))
 
555
        self.assertEqual(["opt"], index.get_options("a"))
 
556
        self.assertEqual(["option"], index.get_options("b"))
 
557
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
558
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
559
 
 
560
    def test_delay_create_and_add_versions(self):
 
561
        transport = MockTransport()
 
562
 
 
563
        index = _KnitIndex(transport, "filename", "w",
 
564
            create=True, file_mode="wb", create_parent_dir=True,
 
565
            delay_create=True, dir_mode=0777)
 
566
        self.assertEqual([], transport.calls)
 
567
 
 
568
        index.add_versions([
 
569
            ("a", ["option"], 0, 1, ["b"]),
 
570
            ("a", ["opt"], 1, 2, ["c"]),
 
571
            ("b", ["option"], 2, 3, ["a"])
 
572
            ])
 
573
        name, (filename, f), kwargs = transport.calls.pop(0)
 
574
        self.assertEqual("put_file_non_atomic", name)
 
575
        self.assertEqual(
 
576
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
577
            kwargs)
 
578
        self.assertEqual("filename", filename)
 
579
        self.assertEqual(
 
580
            index.HEADER +
 
581
            "\na option 0 1 .b :"
 
582
            "\na opt 1 2 .c :"
 
583
            "\nb option 2 3 0 :",
 
584
            f.read())
 
585
 
 
586
    def test_has_version(self):
 
587
        transport = MockTransport([
 
588
            _KnitIndex.HEADER,
 
589
            "a option 0 1 :"
 
590
            ])
 
591
        index = _KnitIndex(transport, "filename", "r")
 
592
 
 
593
        self.assertTrue(index.has_version("a"))
 
594
        self.assertFalse(index.has_version("b"))
 
595
 
 
596
    def test_get_position(self):
 
597
        transport = MockTransport([
 
598
            _KnitIndex.HEADER,
 
599
            "a option 0 1 :",
 
600
            "b option 1 2 :"
 
601
            ])
 
602
        index = _KnitIndex(transport, "filename", "r")
 
603
 
 
604
        self.assertEqual((0, 1), index.get_position("a"))
 
605
        self.assertEqual((1, 2), index.get_position("b"))
 
606
 
 
607
    def test_get_method(self):
 
608
        transport = MockTransport([
 
609
            _KnitIndex.HEADER,
 
610
            "a fulltext,unknown 0 1 :",
 
611
            "b unknown,line-delta 1 2 :",
 
612
            "c bad 3 4 :"
 
613
            ])
 
614
        index = _KnitIndex(transport, "filename", "r")
 
615
 
 
616
        self.assertEqual("fulltext", index.get_method("a"))
 
617
        self.assertEqual("line-delta", index.get_method("b"))
 
618
        self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
 
619
 
 
620
    def test_get_options(self):
 
621
        transport = MockTransport([
 
622
            _KnitIndex.HEADER,
 
623
            "a opt1 0 1 :",
 
624
            "b opt2,opt3 1 2 :"
 
625
            ])
 
626
        index = _KnitIndex(transport, "filename", "r")
 
627
 
 
628
        self.assertEqual(["opt1"], index.get_options("a"))
 
629
        self.assertEqual(["opt2", "opt3"], index.get_options("b"))
 
630
 
 
631
    def test_get_parents(self):
 
632
        transport = MockTransport([
 
633
            _KnitIndex.HEADER,
 
634
            "a option 0 1 :",
 
635
            "b option 1 2 0 .c :",
 
636
            "c option 1 2 1 0 .e :"
 
637
            ])
 
638
        index = _KnitIndex(transport, "filename", "r")
 
639
 
 
640
        self.assertEqual([], index.get_parents("a"))
 
641
        self.assertEqual(["a", "c"], index.get_parents("b"))
 
642
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
643
 
 
644
    def test_get_parents_with_ghosts(self):
 
645
        transport = MockTransport([
 
646
            _KnitIndex.HEADER,
 
647
            "a option 0 1 :",
 
648
            "b option 1 2 0 .c :",
 
649
            "c option 1 2 1 0 .e :"
 
650
            ])
 
651
        index = _KnitIndex(transport, "filename", "r")
 
652
 
 
653
        self.assertEqual([], index.get_parents_with_ghosts("a"))
 
654
        self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
 
655
        self.assertEqual(["b", "a", "e"],
 
656
            index.get_parents_with_ghosts("c"))
 
657
 
 
658
    def test_check_versions_present(self):
 
659
        transport = MockTransport([
 
660
            _KnitIndex.HEADER,
 
661
            "a option 0 1 :",
 
662
            "b option 0 1 :"
 
663
            ])
 
664
        index = _KnitIndex(transport, "filename", "r")
 
665
 
 
666
        check = index.check_versions_present
 
667
 
 
668
        check([])
 
669
        check(["a"])
 
670
        check(["b"])
 
671
        check(["a", "b"])
 
672
        self.assertRaises(RevisionNotPresent, check, ["c"])
 
673
        self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
 
674
 
 
675
 
 
676
class KnitTests(TestCaseWithTransport):
 
677
    """Class containing knit test helper routines."""
 
678
 
 
679
    def make_test_knit(self, annotate=False, delay_create=False):
 
680
        if not annotate:
 
681
            factory = KnitPlainFactory()
 
682
        else:
 
683
            factory = None
 
684
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
 
685
                                 factory=factory, create=True,
 
686
                                 delay_create=delay_create)
 
687
 
 
688
 
 
689
class BasicKnitTests(KnitTests):
 
690
 
 
691
    def add_stock_one_and_one_a(self, k):
 
692
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
693
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
694
 
 
695
    def test_knit_constructor(self):
 
696
        """Construct empty k"""
 
697
        self.make_test_knit()
 
698
 
 
699
    def test_knit_add(self):
 
700
        """Store one text in knit and retrieve"""
 
701
        k = self.make_test_knit()
 
702
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
703
        self.assertTrue(k.has_version('text-1'))
 
704
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
705
 
 
706
    def test_knit_reload(self):
 
707
        # test that the content in a reloaded knit is correct
 
708
        k = self.make_test_knit()
 
709
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
710
        del k
 
711
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
712
        self.assertTrue(k2.has_version('text-1'))
 
713
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
714
 
 
715
    def test_knit_several(self):
 
716
        """Store several texts in a knit"""
 
717
        k = self.make_test_knit()
 
718
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
719
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
720
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
721
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
722
        
 
723
    def test_repeated_add(self):
 
724
        """Knit traps attempt to replace existing version"""
 
725
        k = self.make_test_knit()
 
726
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
727
        self.assertRaises(RevisionAlreadyPresent, 
 
728
                k.add_lines,
 
729
                'text-1', [], split_lines(TEXT_1))
 
730
 
 
731
    def test_empty(self):
 
732
        k = self.make_test_knit(True)
 
733
        k.add_lines('text-1', [], [])
 
734
        self.assertEquals(k.get_lines('text-1'), [])
 
735
 
 
736
    def test_incomplete(self):
 
737
        """Test if texts without a ending line-end can be inserted and
 
738
        extracted."""
 
739
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
740
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
741
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
742
        # reopening ensures maximum room for confusion
 
743
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
744
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
745
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
746
 
 
747
    def test_delta(self):
 
748
        """Expression of knit delta as lines"""
 
749
        k = self.make_test_knit()
 
750
        KnitContent
 
751
        td = list(line_delta(TEXT_1.splitlines(True),
 
752
                             TEXT_1A.splitlines(True)))
 
753
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
754
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
755
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
756
 
 
757
    def assertDerivedBlocksEqual(self, source, target, noeol=False):
 
758
        """Assert that the derived matching blocks match real output"""
 
759
        source_lines = source.splitlines(True)
 
760
        target_lines = target.splitlines(True)
 
761
        def nl(line):
 
762
            if noeol and not line.endswith('\n'):
 
763
                return line + '\n'
 
764
            else:
 
765
                return line
 
766
        source_content = KnitContent([(None, nl(l)) for l in source_lines])
 
767
        target_content = KnitContent([(None, nl(l)) for l in target_lines])
 
768
        line_delta = source_content.line_delta(target_content)
 
769
        delta_blocks = list(KnitContent.get_line_delta_blocks(line_delta,
 
770
            source_lines, target_lines))
 
771
        matcher = KnitSequenceMatcher(None, source_lines, target_lines)
 
772
        matcher_blocks = list(list(matcher.get_matching_blocks()))
 
773
        self.assertEqual(matcher_blocks, delta_blocks)
 
774
 
 
775
    def test_get_line_delta_blocks(self):
 
776
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'q\nc\n')
 
777
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1)
 
778
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1A)
 
779
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1B)
 
780
        self.assertDerivedBlocksEqual(TEXT_1B, TEXT_1A)
 
781
        self.assertDerivedBlocksEqual(TEXT_1A, TEXT_1B)
 
782
        self.assertDerivedBlocksEqual(TEXT_1A, '')
 
783
        self.assertDerivedBlocksEqual('', TEXT_1A)
 
784
        self.assertDerivedBlocksEqual('', '')
 
785
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd')
 
786
 
 
787
    def test_get_line_delta_blocks_noeol(self):
 
788
        """Handle historical knit deltas safely
 
789
 
 
790
        Some existing knit deltas don't consider the last line to differ
 
791
        when the only difference whether it has a final newline.
 
792
 
 
793
        New knit deltas appear to always consider the last line to differ
 
794
        in this case.
 
795
        """
 
796
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd\n', noeol=True)
 
797
        self.assertDerivedBlocksEqual('a\nb\nc\nd\n', 'a\nb\nc', noeol=True)
 
798
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'a\nb\nc', noeol=True)
 
799
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\n', noeol=True)
 
800
 
 
801
    def test_add_with_parents(self):
 
802
        """Store in knit with parents"""
 
803
        k = self.make_test_knit()
 
804
        self.add_stock_one_and_one_a(k)
 
805
        self.assertEquals(k.get_parents('text-1'), [])
 
806
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
807
 
 
808
    def test_ancestry(self):
 
809
        """Store in knit with parents"""
 
810
        k = self.make_test_knit()
 
811
        self.add_stock_one_and_one_a(k)
 
812
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
813
 
 
814
    def test_add_delta(self):
 
815
        """Store in knit with parents"""
 
816
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
817
            delta=True, create=True)
 
818
        self.add_stock_one_and_one_a(k)
 
819
        k.clear_cache()
 
820
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
821
 
 
822
    def test_annotate(self):
 
823
        """Annotations"""
 
824
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
825
            delta=True, create=True)
 
826
        self.insert_and_test_small_annotate(k)
 
827
 
 
828
    def insert_and_test_small_annotate(self, k):
 
829
        """test annotation with k works correctly."""
 
830
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
831
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
832
 
 
833
        origins = k.annotate('text-2')
 
834
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
835
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
836
 
 
837
    def test_annotate_fulltext(self):
 
838
        """Annotations"""
 
839
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
840
            delta=False, create=True)
 
841
        self.insert_and_test_small_annotate(k)
 
842
 
 
843
    def test_annotate_merge_1(self):
 
844
        k = self.make_test_knit(True)
 
845
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
846
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
847
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
848
        origins = k.annotate('text-am')
 
849
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
850
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
851
 
 
852
    def test_annotate_merge_2(self):
 
853
        k = self.make_test_knit(True)
 
854
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
855
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
856
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
857
        origins = k.annotate('text-am')
 
858
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
859
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
860
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
861
 
 
862
    def test_annotate_merge_9(self):
 
863
        k = self.make_test_knit(True)
 
864
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
865
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
866
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
867
        origins = k.annotate('text-am')
 
868
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
869
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
870
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
871
 
 
872
    def test_annotate_merge_3(self):
 
873
        k = self.make_test_knit(True)
 
874
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
875
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
876
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
877
        origins = k.annotate('text-am')
 
878
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
879
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
880
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
881
 
 
882
    def test_annotate_merge_4(self):
 
883
        k = self.make_test_knit(True)
 
884
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
885
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
886
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
887
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
888
        origins = k.annotate('text-am')
 
889
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
890
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
891
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
892
 
 
893
    def test_annotate_merge_5(self):
 
894
        k = self.make_test_knit(True)
 
895
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
896
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
897
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
898
        k.add_lines('text-am',
 
899
                    ['text-a1', 'text-a2', 'text-a3'],
 
900
                    ['a\n', 'e\n', 'z\n'])
 
901
        origins = k.annotate('text-am')
 
902
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
903
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
904
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
905
 
 
906
    def test_annotate_file_cherry_pick(self):
 
907
        k = self.make_test_knit(True)
 
908
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
909
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
910
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
911
        origins = k.annotate('text-3')
 
912
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
913
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
914
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
915
 
 
916
    def test_knit_join(self):
 
917
        """Store in knit with parents"""
 
918
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
919
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
920
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
921
 
 
922
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
923
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
924
 
 
925
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
926
 
 
927
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
928
        count = k2.join(k1, version_ids=['text-m'])
 
929
        self.assertEquals(count, 5)
 
930
        self.assertTrue(k2.has_version('text-a'))
 
931
        self.assertTrue(k2.has_version('text-c'))
 
932
 
 
933
    def test_reannotate(self):
 
934
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
935
                               factory=KnitAnnotateFactory(), create=True)
 
936
        # 0
 
937
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
938
        # 1
 
939
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
940
 
 
941
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
942
                               factory=KnitAnnotateFactory(), create=True)
 
943
        k2.join(k1, version_ids=['text-b'])
 
944
 
 
945
        # 2
 
946
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
947
        # 2
 
948
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
949
        # 3
 
950
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
951
 
 
952
        # test-c will have index 3
 
953
        k1.join(k2, version_ids=['text-c'])
 
954
 
 
955
        lines = k1.get_lines('text-c')
 
956
        self.assertEquals(lines, ['z\n', 'c\n'])
 
957
 
 
958
        origins = k1.annotate('text-c')
 
959
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
960
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
961
 
 
962
    def test_get_line_delta_texts(self):
 
963
        """Make sure we can call get_texts on text with reused line deltas"""
 
964
        k1 = KnitVersionedFile('test1', get_transport('.'), 
 
965
                               factory=KnitPlainFactory(), create=True)
 
966
        for t in range(3):
 
967
            if t == 0:
 
968
                parents = []
 
969
            else:
 
970
                parents = ['%d' % (t-1)]
 
971
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
 
972
        k1.get_texts(('%d' % t) for t in range(3))
 
973
        
 
974
    def test_iter_lines_reads_in_order(self):
 
975
        t = MemoryTransport()
 
976
        instrumented_t = TransportLogger(t)
 
977
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
978
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
979
        # add texts with no required ordering
 
980
        k1.add_lines('base', [], ['text\n'])
 
981
        k1.add_lines('base2', [], ['text2\n'])
 
982
        k1.clear_cache()
 
983
        instrumented_t._calls = []
 
984
        # request a last-first iteration
 
985
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
986
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
987
        self.assertEqual(['text\n', 'text2\n'], results)
 
988
 
 
989
    def test_create_empty_annotated(self):
 
990
        k1 = self.make_test_knit(True)
 
991
        # 0
 
992
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
993
        k2 = k1.create_empty('t', MemoryTransport())
 
994
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
995
        self.assertEqual(k1.delta, k2.delta)
 
996
        # the generic test checks for empty content and file class
 
997
 
 
998
    def test_knit_format(self):
 
999
        # this tests that a new knit index file has the expected content
 
1000
        # and that is writes the data we expect as records are added.
 
1001
        knit = self.make_test_knit(True)
 
1002
        # Now knit files are not created until we first add data to them
 
1003
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
 
1004
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
1005
        self.assertFileEqual(
 
1006
            "# bzr knit index 8\n"
 
1007
            "\n"
 
1008
            "revid fulltext 0 84 .a_ghost :",
 
1009
            'test.kndx')
 
1010
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
 
1011
        self.assertFileEqual(
 
1012
            "# bzr knit index 8\n"
 
1013
            "\nrevid fulltext 0 84 .a_ghost :"
 
1014
            "\nrevid2 line-delta 84 82 0 :",
 
1015
            'test.kndx')
 
1016
        # we should be able to load this file again
 
1017
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1018
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
1019
        # write a short write to the file and ensure that its ignored
 
1020
        indexfile = file('test.kndx', 'at')
 
1021
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
 
1022
        indexfile.close()
 
1023
        # we should be able to load this file again
 
1024
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
1025
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
1026
        # and add a revision with the same id the failed write had
 
1027
        knit.add_lines('revid3', ['revid2'], ['a\n'])
 
1028
        # and when reading it revid3 should now appear.
 
1029
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1030
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
 
1031
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
 
1032
 
 
1033
    def test_delay_create(self):
 
1034
        """Test that passing delay_create=True creates files late"""
 
1035
        knit = self.make_test_knit(annotate=True, delay_create=True)
 
1036
        self.failIfExists('test.knit')
 
1037
        self.failIfExists('test.kndx')
 
1038
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
1039
        self.failUnlessExists('test.knit')
 
1040
        self.assertFileEqual(
 
1041
            "# bzr knit index 8\n"
 
1042
            "\n"
 
1043
            "revid fulltext 0 84 .a_ghost :",
 
1044
            'test.kndx')
 
1045
 
 
1046
    def test_create_parent_dir(self):
 
1047
        """create_parent_dir can create knits in nonexistant dirs"""
 
1048
        # Has no effect if we don't set 'delay_create'
 
1049
        trans = get_transport('.')
 
1050
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
 
1051
                          trans, access_mode='w', factory=None,
 
1052
                          create=True, create_parent_dir=True)
 
1053
        # Nothing should have changed yet
 
1054
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1055
                                 factory=None, create=True,
 
1056
                                 create_parent_dir=True,
 
1057
                                 delay_create=True)
 
1058
        self.failIfExists('dir/test.knit')
 
1059
        self.failIfExists('dir/test.kndx')
 
1060
        self.failIfExists('dir')
 
1061
        knit.add_lines('revid', [], ['a\n'])
 
1062
        self.failUnlessExists('dir')
 
1063
        self.failUnlessExists('dir/test.knit')
 
1064
        self.assertFileEqual(
 
1065
            "# bzr knit index 8\n"
 
1066
            "\n"
 
1067
            "revid fulltext 0 84  :",
 
1068
            'dir/test.kndx')
 
1069
 
 
1070
    def test_create_mode_700(self):
 
1071
        trans = get_transport('.')
 
1072
        if not trans._can_roundtrip_unix_modebits():
 
1073
            # Can't roundtrip, so no need to run this test
 
1074
            return
 
1075
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1076
                                 factory=None, create=True,
 
1077
                                 create_parent_dir=True,
 
1078
                                 delay_create=True,
 
1079
                                 file_mode=0600,
 
1080
                                 dir_mode=0700)
 
1081
        knit.add_lines('revid', [], ['a\n'])
 
1082
        self.assertTransportMode(trans, 'dir', 0700)
 
1083
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
 
1084
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
 
1085
 
 
1086
    def test_create_mode_770(self):
 
1087
        trans = get_transport('.')
 
1088
        if not trans._can_roundtrip_unix_modebits():
 
1089
            # Can't roundtrip, so no need to run this test
 
1090
            return
 
1091
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1092
                                 factory=None, create=True,
 
1093
                                 create_parent_dir=True,
 
1094
                                 delay_create=True,
 
1095
                                 file_mode=0660,
 
1096
                                 dir_mode=0770)
 
1097
        knit.add_lines('revid', [], ['a\n'])
 
1098
        self.assertTransportMode(trans, 'dir', 0770)
 
1099
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
 
1100
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
 
1101
 
 
1102
    def test_create_mode_777(self):
 
1103
        trans = get_transport('.')
 
1104
        if not trans._can_roundtrip_unix_modebits():
 
1105
            # Can't roundtrip, so no need to run this test
 
1106
            return
 
1107
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1108
                                 factory=None, create=True,
 
1109
                                 create_parent_dir=True,
 
1110
                                 delay_create=True,
 
1111
                                 file_mode=0666,
 
1112
                                 dir_mode=0777)
 
1113
        knit.add_lines('revid', [], ['a\n'])
 
1114
        self.assertTransportMode(trans, 'dir', 0777)
 
1115
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
 
1116
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
 
1117
 
 
1118
    def test_plan_merge(self):
 
1119
        my_knit = self.make_test_knit(annotate=True)
 
1120
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
 
1121
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
 
1122
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
 
1123
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
 
1124
        for plan_line, expected_line in zip(plan, AB_MERGE):
 
1125
            self.assertEqual(plan_line, expected_line)
 
1126
 
 
1127
 
 
1128
TEXT_1 = """\
 
1129
Banana cup cakes:
 
1130
 
 
1131
- bananas
 
1132
- eggs
 
1133
- broken tea cups
 
1134
"""
 
1135
 
 
1136
TEXT_1A = """\
 
1137
Banana cup cake recipe
 
1138
(serves 6)
 
1139
 
 
1140
- bananas
 
1141
- eggs
 
1142
- broken tea cups
 
1143
- self-raising flour
 
1144
"""
 
1145
 
 
1146
TEXT_1B = """\
 
1147
Banana cup cake recipe
 
1148
 
 
1149
- bananas (do not use plantains!!!)
 
1150
- broken tea cups
 
1151
- flour
 
1152
"""
 
1153
 
 
1154
delta_1_1a = """\
 
1155
0,1,2
 
1156
Banana cup cake recipe
 
1157
(serves 6)
 
1158
5,5,1
 
1159
- self-raising flour
 
1160
"""
 
1161
 
 
1162
TEXT_2 = """\
 
1163
Boeuf bourguignon
 
1164
 
 
1165
- beef
 
1166
- red wine
 
1167
- small onions
 
1168
- carrot
 
1169
- mushrooms
 
1170
"""
 
1171
 
 
1172
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
 
1173
new-a|(serves 6)
 
1174
unchanged|
 
1175
killed-b|- bananas
 
1176
killed-b|- eggs
 
1177
new-b|- bananas (do not use plantains!!!)
 
1178
unchanged|- broken tea cups
 
1179
new-a|- self-raising flour
 
1180
new-b|- flour
 
1181
"""
 
1182
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
 
1183
 
 
1184
 
 
1185
def line_delta(from_lines, to_lines):
 
1186
    """Generate line-based delta from one text to another"""
 
1187
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
1188
    for op in s.get_opcodes():
 
1189
        if op[0] == 'equal':
 
1190
            continue
 
1191
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
1192
        for i in range(op[3], op[4]):
 
1193
            yield to_lines[i]
 
1194
 
 
1195
 
 
1196
def apply_line_delta(basis_lines, delta_lines):
 
1197
    """Apply a line-based perfect diff
 
1198
    
 
1199
    basis_lines -- text to apply the patch to
 
1200
    delta_lines -- diff instructions and content
 
1201
    """
 
1202
    out = basis_lines[:]
 
1203
    i = 0
 
1204
    offset = 0
 
1205
    while i < len(delta_lines):
 
1206
        l = delta_lines[i]
 
1207
        a, b, c = map(long, l.split(','))
 
1208
        i = i + 1
 
1209
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
1210
        i = i + c
 
1211
        offset = offset + (b - a) + c
 
1212
    return out
 
1213
 
 
1214
 
 
1215
class TestWeaveToKnit(KnitTests):
 
1216
 
 
1217
    def test_weave_to_knit_matches(self):
 
1218
        # check that the WeaveToKnit is_compatible function
 
1219
        # registers True for a Weave to a Knit.
 
1220
        w = Weave()
 
1221
        k = self.make_test_knit()
 
1222
        self.failUnless(WeaveToKnit.is_compatible(w, k))
 
1223
        self.failIf(WeaveToKnit.is_compatible(k, w))
 
1224
        self.failIf(WeaveToKnit.is_compatible(w, w))
 
1225
        self.failIf(WeaveToKnit.is_compatible(k, k))
 
1226
 
 
1227
 
 
1228
class TestKnitCaching(KnitTests):
 
1229
    
 
1230
    def create_knit(self, cache_add=False):
 
1231
        k = self.make_test_knit(True)
 
1232
        if cache_add:
 
1233
            k.enable_cache()
 
1234
 
 
1235
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
1236
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
1237
        return k
 
1238
 
 
1239
    def test_no_caching(self):
 
1240
        k = self.create_knit()
 
1241
        # Nothing should be cached without setting 'enable_cache'
 
1242
        self.assertEqual({}, k._data._cache)
 
1243
 
 
1244
    def test_cache_add_and_clear(self):
 
1245
        k = self.create_knit(True)
 
1246
 
 
1247
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
 
1248
 
 
1249
        k.clear_cache()
 
1250
        self.assertEqual({}, k._data._cache)
 
1251
 
 
1252
    def test_cache_data_read_raw(self):
 
1253
        k = self.create_knit()
 
1254
 
 
1255
        # Now cache and read
 
1256
        k.enable_cache()
 
1257
 
 
1258
        def read_one_raw(version):
 
1259
            pos_map = k._get_components_positions([version])
 
1260
            method, pos, size, next = pos_map[version]
 
1261
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
 
1262
            self.assertEqual(1, len(lst))
 
1263
            return lst[0]
 
1264
 
 
1265
        val = read_one_raw('text-1')
 
1266
        self.assertEqual({'text-1':val[1]}, k._data._cache)
 
1267
 
 
1268
        k.clear_cache()
 
1269
        # After clear, new reads are not cached
 
1270
        self.assertEqual({}, k._data._cache)
 
1271
 
 
1272
        val2 = read_one_raw('text-1')
 
1273
        self.assertEqual(val, val2)
 
1274
        self.assertEqual({}, k._data._cache)
 
1275
 
 
1276
    def test_cache_data_read(self):
 
1277
        k = self.create_knit()
 
1278
 
 
1279
        def read_one(version):
 
1280
            pos_map = k._get_components_positions([version])
 
1281
            method, pos, size, next = pos_map[version]
 
1282
            lst = list(k._data.read_records_iter([(version, pos, size)]))
 
1283
            self.assertEqual(1, len(lst))
 
1284
            return lst[0]
 
1285
 
 
1286
        # Now cache and read
 
1287
        k.enable_cache()
 
1288
 
 
1289
        val = read_one('text-2')
 
1290
        self.assertEqual(['text-2'], k._data._cache.keys())
 
1291
        self.assertEqual('text-2', val[0])
 
1292
        content, digest = k._data._parse_record('text-2',
 
1293
                                                k._data._cache['text-2'])
 
1294
        self.assertEqual(content, val[1])
 
1295
        self.assertEqual(digest, val[2])
 
1296
 
 
1297
        k.clear_cache()
 
1298
        self.assertEqual({}, k._data._cache)
 
1299
 
 
1300
        val2 = read_one('text-2')
 
1301
        self.assertEqual(val, val2)
 
1302
        self.assertEqual({}, k._data._cache)
 
1303
 
 
1304
    def test_cache_read(self):
 
1305
        k = self.create_knit()
 
1306
        k.enable_cache()
 
1307
 
 
1308
        text = k.get_text('text-1')
 
1309
        self.assertEqual(TEXT_1, text)
 
1310
        self.assertEqual(['text-1'], k._data._cache.keys())
 
1311
 
 
1312
        k.clear_cache()
 
1313
        self.assertEqual({}, k._data._cache)
 
1314
 
 
1315
        text = k.get_text('text-1')
 
1316
        self.assertEqual(TEXT_1, text)
 
1317
        self.assertEqual({}, k._data._cache)
 
1318
 
 
1319
 
 
1320
class TestKnitIndex(KnitTests):
 
1321
 
 
1322
    def test_add_versions_dictionary_compresses(self):
 
1323
        """Adding versions to the index should update the lookup dict"""
 
1324
        knit = self.make_test_knit()
 
1325
        idx = knit._index
 
1326
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1327
        self.check_file_contents('test.kndx',
 
1328
            '# bzr knit index 8\n'
 
1329
            '\n'
 
1330
            'a-1 fulltext 0 0  :'
 
1331
            )
 
1332
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
 
1333
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
 
1334
                         ])
 
1335
        self.check_file_contents('test.kndx',
 
1336
            '# bzr knit index 8\n'
 
1337
            '\n'
 
1338
            'a-1 fulltext 0 0  :\n'
 
1339
            'a-2 fulltext 0 0 0 :\n'
 
1340
            'a-3 fulltext 0 0 1 :'
 
1341
            )
 
1342
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
 
1343
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
 
1344
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
 
1345
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
 
1346
                         }, idx._cache)
 
1347
 
 
1348
    def test_add_versions_fails_clean(self):
 
1349
        """If add_versions fails in the middle, it restores a pristine state.
 
1350
 
 
1351
        Any modifications that are made to the index are reset if all versions
 
1352
        cannot be added.
 
1353
        """
 
1354
        # This cheats a little bit by passing in a generator which will
 
1355
        # raise an exception before the processing finishes
 
1356
        # Other possibilities would be to have an version with the wrong number
 
1357
        # of entries, or to make the backing transport unable to write any
 
1358
        # files.
 
1359
 
 
1360
        knit = self.make_test_knit()
 
1361
        idx = knit._index
 
1362
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1363
 
 
1364
        class StopEarly(Exception):
 
1365
            pass
 
1366
 
 
1367
        def generate_failure():
 
1368
            """Add some entries and then raise an exception"""
 
1369
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
 
1370
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
 
1371
            raise StopEarly()
 
1372
 
 
1373
        # Assert the pre-condition
 
1374
        self.assertEqual(['a-1'], idx._history)
 
1375
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1376
 
 
1377
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
 
1378
 
 
1379
        # And it shouldn't be modified
 
1380
        self.assertEqual(['a-1'], idx._history)
 
1381
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1382
 
 
1383
    def test_knit_index_ignores_empty_files(self):
 
1384
        # There was a race condition in older bzr, where a ^C at the right time
 
1385
        # could leave an empty .kndx file, which bzr would later claim was a
 
1386
        # corrupted file since the header was not present. In reality, the file
 
1387
        # just wasn't created, so it should be ignored.
 
1388
        t = get_transport('.')
 
1389
        t.put_bytes('test.kndx', '')
 
1390
 
 
1391
        knit = self.make_test_knit()
 
1392
 
 
1393
    def test_knit_index_checks_header(self):
 
1394
        t = get_transport('.')
 
1395
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
 
1396
 
 
1397
        self.assertRaises(KnitHeaderError, self.make_test_knit)