1
# Copyright (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
25
from bzrlib.errors import (
26
RevisionAlreadyPresent,
31
from bzrlib.knit import (
39
from bzrlib.osutils import split_lines
40
from bzrlib.tests import TestCase, TestCaseWithTransport
41
from bzrlib.transport import TransportLogger, get_transport
42
from bzrlib.transport.memory import MemoryTransport
43
from bzrlib.weave import Weave
46
class KnitContentTests(TestCase):
48
def test_constructor(self):
49
content = KnitContent([])
52
content = KnitContent([])
53
self.assertEqual(content.text(), [])
55
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
56
self.assertEqual(content.text(), ["text1", "text2"])
58
def test_annotate(self):
    # An empty content is expected to annotate to an empty list.
    empty = KnitContent([])
    self.assertEqual(empty.annotate(), [])

    # A populated content is expected to hand back its (origin, text) pairs.
    expected_pairs = [("origin1", "text1"), ("origin2", "text2")]
    populated = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    self.assertEqual(populated.annotate(), expected_pairs)
66
def test_annotate_iter(self):
    # Iterating an empty content must stop immediately.
    empty_iter = KnitContent([]).annotate_iter()
    self.assertRaises(StopIteration, empty_iter.next)

    # A populated content yields its (origin, text) pairs in order and
    # is then exhausted.
    populated = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    pair_iter = populated.annotate_iter()
    for expected_pair in (("origin1", "text1"), ("origin2", "text2")):
        self.assertEqual(pair_iter.next(), expected_pair)
    self.assertRaises(StopIteration, pair_iter.next)
78
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
80
self.assertIsInstance(copy, KnitContent)
81
self.assertEqual(copy.annotate(),
82
[("origin1", "text1"), ("origin2", "text2")])
84
def test_line_delta(self):
    # The delta from ["a", "b"] to ["a", "a", "c"] is expected to be a
    # single hunk carrying the two replacement lines.
    old_content = KnitContent([("", "a"), ("", "b")])
    new_content = KnitContent([("", "a"), ("", "a"), ("", "c")])
    expected_delta = [(1, 2, 2, [("", "a"), ("", "c")])]
    self.assertEqual(old_content.line_delta(new_content), expected_delta)
90
def test_line_delta_iter(self):
    # The iterator form is expected to yield exactly one hunk and then
    # be exhausted.
    old_content = KnitContent([("", "a"), ("", "b")])
    new_content = KnitContent([("", "a"), ("", "a"), ("", "c")])
    hunks = old_content.line_delta_iter(new_content)
    expected_hunk = (1, 2, 2, [("", "a"), ("", "c")])
    self.assertEqual(hunks.next(), expected_hunk)
    self.assertRaises(StopIteration, hunks.next)
98
class MockTransport(object):
100
def __init__(self, file_lines=None):
101
self.file_lines = file_lines
103
# We have no base directory for the MockTransport
106
def get(self, filename):
107
if self.file_lines is None:
108
raise NoSuchFile(filename)
110
return StringIO("\n".join(self.file_lines))
112
def __getattr__(self, name):
113
def queue_call(*args, **kwargs):
114
self.calls.append((name, args, kwargs))
118
class LowLevelKnitIndexTests(TestCase):
120
def test_no_such_file(self):
    # A MockTransport built without file lines raises NoSuchFile from
    # get(); opening an index on it must propagate that error both for
    # read mode and for write mode with create=False.
    transport = MockTransport()
    attempts = [
        (("filename", "r"), {}),
        (("filename", "w"), {"create": False}),
    ]
    for args, kwargs in attempts:
        self.assertRaises(NoSuchFile, _KnitIndex, transport,
                          *args, **kwargs)
127
def test_create_file(self):
128
transport = MockTransport()
130
index = _KnitIndex(transport, "filename", "w",
131
file_mode="wb", create=True)
133
("put_bytes_non_atomic",
134
("filename", index.HEADER), {"mode": "wb"}),
135
transport.calls.pop(0))
137
def test_delay_create_file(self):
138
transport = MockTransport()
140
index = _KnitIndex(transport, "filename", "w",
141
create=True, file_mode="wb", create_parent_dir=True,
142
delay_create=True, dir_mode=0777)
143
self.assertEqual([], transport.calls)
145
index.add_versions([])
146
name, (filename, f), kwargs = transport.calls.pop(0)
147
self.assertEqual("put_file_non_atomic", name)
149
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
151
self.assertEqual("filename", filename)
152
self.assertEqual(index.HEADER, f.read())
154
index.add_versions([])
155
self.assertEqual(("append_bytes", ("filename", ""), {}),
156
transport.calls.pop(0))
158
def test_read_utf8_version_id(self):
159
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
160
utf8_revision_id = unicode_revision_id.encode('utf-8')
161
transport = MockTransport([
163
'%s option 0 1 :' % (utf8_revision_id,)
165
index = _KnitIndex(transport, "filename", "r")
166
# _KnitIndex is a private class, and deals in utf8 revision_ids, not
167
# Unicode revision_ids.
168
self.assertTrue(index.has_version(utf8_revision_id))
169
self.assertFalse(index.has_version(unicode_revision_id))
171
def test_read_utf8_parents(self):
172
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
173
utf8_revision_id = unicode_revision_id.encode('utf-8')
174
transport = MockTransport([
176
"version option 0 1 .%s :" % (utf8_revision_id,)
178
index = _KnitIndex(transport, "filename", "r")
179
self.assertEqual([utf8_revision_id],
180
index.get_parents_with_ghosts("version"))
182
def test_read_ignore_corrupted_lines(self):
183
transport = MockTransport([
186
"corrupted options 0 1 .b .c ",
187
"version options 0 1 :"
189
index = _KnitIndex(transport, "filename", "r")
190
self.assertEqual(1, index.num_versions())
191
self.assertTrue(index.has_version("version"))
193
def test_read_corrupted_header(self):
    # An index file whose first line is not the knit index header must
    # be rejected with KnitHeaderError.
    bad_lines = ['not a bzr knit index header\n']
    transport = MockTransport(bad_lines)
    self.assertRaises(KnitHeaderError, _KnitIndex,
                      transport, "filename", "r")
198
def test_read_duplicate_entries(self):
199
transport = MockTransport([
201
"parent options 0 1 :",
202
"version options1 0 1 0 :",
203
"version options2 1 2 .other :",
204
"version options3 3 4 0 .other :"
206
index = _KnitIndex(transport, "filename", "r")
207
self.assertEqual(2, index.num_versions())
208
self.assertEqual(1, index.lookup("version"))
209
self.assertEqual((3, 4), index.get_position("version"))
210
self.assertEqual(["options3"], index.get_options("version"))
211
self.assertEqual(["parent", "other"],
212
index.get_parents_with_ghosts("version"))
214
def test_read_compressed_parents(self):
215
transport = MockTransport([
219
"c option 0 1 1 0 :",
221
index = _KnitIndex(transport, "filename", "r")
222
self.assertEqual(["a"], index.get_parents("b"))
223
self.assertEqual(["b", "a"], index.get_parents("c"))
225
def test_write_utf8_version_id(self):
226
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
227
utf8_revision_id = unicode_revision_id.encode('utf-8')
228
transport = MockTransport([
231
index = _KnitIndex(transport, "filename", "r")
232
index.add_version(utf8_revision_id, ["option"], 0, 1, [])
233
self.assertEqual(("append_bytes", ("filename",
234
"\n%s option 0 1 :" % (utf8_revision_id,)),
236
transport.calls.pop(0))
238
def test_write_utf8_parents(self):
239
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
240
utf8_revision_id = unicode_revision_id.encode('utf-8')
241
transport = MockTransport([
244
index = _KnitIndex(transport, "filename", "r")
245
index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
246
self.assertEqual(("append_bytes", ("filename",
247
"\nversion option 0 1 .%s :" % (utf8_revision_id,)),
249
transport.calls.pop(0))
251
def test_get_graph(self):
    # get_graph() is expected to report (version, parents) pairs for
    # every version added so far.
    transport = MockTransport()
    index = _KnitIndex(transport, "filename", "w", create=True)
    self.assertEqual([], index.get_graph())

    index.add_version("a", ["option"], 0, 1, ["b"])
    one_version = [("a", ["b"])]
    self.assertEqual(one_version, index.get_graph())

    index.add_version("c", ["option"], 0, 1, ["d"])
    two_versions = [("a", ["b"]), ("c", ["d"])]
    # Only this final comparison sorts the graph before comparing.
    self.assertEqual(two_versions, sorted(index.get_graph()))
263
def test_get_ancestry(self):
264
transport = MockTransport([
267
"b option 0 1 0 .e :",
268
"c option 0 1 1 0 :",
269
"d option 0 1 2 .f :"
271
index = _KnitIndex(transport, "filename", "r")
273
self.assertEqual([], index.get_ancestry([]))
274
self.assertEqual(["a"], index.get_ancestry(["a"]))
275
self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
276
self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
277
self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
278
self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
279
self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
281
self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
283
def test_get_ancestry_with_ghosts(self):
284
transport = MockTransport([
287
"b option 0 1 0 .e :",
288
"c option 0 1 0 .f .g :",
289
"d option 0 1 2 .h .j .k :"
291
index = _KnitIndex(transport, "filename", "r")
293
self.assertEqual([], index.get_ancestry_with_ghosts([]))
294
self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
295
self.assertEqual(["a", "e", "b"],
296
index.get_ancestry_with_ghosts(["b"]))
297
self.assertEqual(["a", "g", "f", "c"],
298
index.get_ancestry_with_ghosts(["c"]))
299
self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
300
index.get_ancestry_with_ghosts(["d"]))
301
self.assertEqual(["a", "e", "b"],
302
index.get_ancestry_with_ghosts(["a", "b"]))
303
self.assertEqual(["a", "g", "f", "c"],
304
index.get_ancestry_with_ghosts(["a", "c"]))
306
["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
307
index.get_ancestry_with_ghosts(["b", "d"]))
309
self.assertRaises(RevisionNotPresent,
310
index.get_ancestry_with_ghosts, ["e"])
312
def test_num_versions(self):
313
transport = MockTransport([
316
index = _KnitIndex(transport, "filename", "r")
318
self.assertEqual(0, index.num_versions())
319
self.assertEqual(0, len(index))
321
index.add_version("a", ["option"], 0, 1, [])
322
self.assertEqual(1, index.num_versions())
323
self.assertEqual(1, len(index))
325
index.add_version("a", ["option2"], 1, 2, [])
326
self.assertEqual(1, index.num_versions())
327
self.assertEqual(1, len(index))
329
index.add_version("b", ["option"], 0, 1, [])
330
self.assertEqual(2, index.num_versions())
331
self.assertEqual(2, len(index))
333
def test_get_versions(self):
334
transport = MockTransport([
337
index = _KnitIndex(transport, "filename", "r")
339
self.assertEqual([], index.get_versions())
341
index.add_version("a", ["option"], 0, 1, [])
342
self.assertEqual(["a"], index.get_versions())
344
index.add_version("a", ["option"], 0, 1, [])
345
self.assertEqual(["a"], index.get_versions())
347
index.add_version("b", ["option"], 0, 1, [])
348
self.assertEqual(["a", "b"], index.get_versions())
350
def test_idx_to_name(self):
351
transport = MockTransport([
356
index = _KnitIndex(transport, "filename", "r")
358
self.assertEqual("a", index.idx_to_name(0))
359
self.assertEqual("b", index.idx_to_name(1))
360
self.assertEqual("b", index.idx_to_name(-1))
361
self.assertEqual("a", index.idx_to_name(-2))
363
def test_lookup(self):
364
transport = MockTransport([
369
index = _KnitIndex(transport, "filename", "r")
371
self.assertEqual(0, index.lookup("a"))
372
self.assertEqual(1, index.lookup("b"))
374
def test_add_version(self):
375
transport = MockTransport([
378
index = _KnitIndex(transport, "filename", "r")
380
index.add_version("a", ["option"], 0, 1, ["b"])
381
self.assertEqual(("append_bytes",
382
("filename", "\na option 0 1 .b :"),
383
{}), transport.calls.pop(0))
384
self.assertTrue(index.has_version("a"))
385
self.assertEqual(1, index.num_versions())
386
self.assertEqual((0, 1), index.get_position("a"))
387
self.assertEqual(["option"], index.get_options("a"))
388
self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
390
index.add_version("a", ["opt"], 1, 2, ["c"])
391
self.assertEqual(("append_bytes",
392
("filename", "\na opt 1 2 .c :"),
393
{}), transport.calls.pop(0))
394
self.assertTrue(index.has_version("a"))
395
self.assertEqual(1, index.num_versions())
396
self.assertEqual((1, 2), index.get_position("a"))
397
self.assertEqual(["opt"], index.get_options("a"))
398
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
400
index.add_version("b", ["option"], 2, 3, ["a"])
401
self.assertEqual(("append_bytes",
402
("filename", "\nb option 2 3 0 :"),
403
{}), transport.calls.pop(0))
404
self.assertTrue(index.has_version("b"))
405
self.assertEqual(2, index.num_versions())
406
self.assertEqual((2, 3), index.get_position("b"))
407
self.assertEqual(["option"], index.get_options("b"))
408
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
410
def test_add_versions(self):
411
transport = MockTransport([
414
index = _KnitIndex(transport, "filename", "r")
417
("a", ["option"], 0, 1, ["b"]),
418
("a", ["opt"], 1, 2, ["c"]),
419
("b", ["option"], 2, 3, ["a"])
421
self.assertEqual(("append_bytes", ("filename",
422
"\na option 0 1 .b :"
425
), {}), transport.calls.pop(0))
426
self.assertTrue(index.has_version("a"))
427
self.assertTrue(index.has_version("b"))
428
self.assertEqual(2, index.num_versions())
429
self.assertEqual((1, 2), index.get_position("a"))
430
self.assertEqual((2, 3), index.get_position("b"))
431
self.assertEqual(["opt"], index.get_options("a"))
432
self.assertEqual(["option"], index.get_options("b"))
433
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
434
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
436
def test_delay_create_and_add_versions(self):
437
transport = MockTransport()
439
index = _KnitIndex(transport, "filename", "w",
440
create=True, file_mode="wb", create_parent_dir=True,
441
delay_create=True, dir_mode=0777)
442
self.assertEqual([], transport.calls)
445
("a", ["option"], 0, 1, ["b"]),
446
("a", ["opt"], 1, 2, ["c"]),
447
("b", ["option"], 2, 3, ["a"])
449
name, (filename, f), kwargs = transport.calls.pop(0)
450
self.assertEqual("put_file_non_atomic", name)
452
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
454
self.assertEqual("filename", filename)
457
"\na option 0 1 .b :"
459
"\nb option 2 3 0 :",
462
def test_has_version(self):
463
transport = MockTransport([
467
index = _KnitIndex(transport, "filename", "r")
469
self.assertTrue(index.has_version("a"))
470
self.assertFalse(index.has_version("b"))
472
def test_get_position(self):
473
transport = MockTransport([
478
index = _KnitIndex(transport, "filename", "r")
480
self.assertEqual((0, 1), index.get_position("a"))
481
self.assertEqual((1, 2), index.get_position("b"))
483
def test_get_method(self):
484
transport = MockTransport([
486
"a fulltext,unknown 0 1 :",
487
"b unknown,line-delta 1 2 :",
490
index = _KnitIndex(transport, "filename", "r")
492
self.assertEqual("fulltext", index.get_method("a"))
493
self.assertEqual("line-delta", index.get_method("b"))
494
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
496
def test_get_options(self):
497
transport = MockTransport([
502
index = _KnitIndex(transport, "filename", "r")
504
self.assertEqual(["opt1"], index.get_options("a"))
505
self.assertEqual(["opt2", "opt3"], index.get_options("b"))
507
def test_get_parents(self):
508
transport = MockTransport([
511
"b option 1 2 0 .c :",
512
"c option 1 2 1 0 .e :"
514
index = _KnitIndex(transport, "filename", "r")
516
self.assertEqual([], index.get_parents("a"))
517
self.assertEqual(["a", "c"], index.get_parents("b"))
518
self.assertEqual(["b", "a"], index.get_parents("c"))
520
def test_get_parents_with_ghosts(self):
521
transport = MockTransport([
524
"b option 1 2 0 .c :",
525
"c option 1 2 1 0 .e :"
527
index = _KnitIndex(transport, "filename", "r")
529
self.assertEqual([], index.get_parents_with_ghosts("a"))
530
self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
531
self.assertEqual(["b", "a", "e"],
532
index.get_parents_with_ghosts("c"))
534
def test_check_versions_present(self):
535
transport = MockTransport([
540
index = _KnitIndex(transport, "filename", "r")
542
check = index.check_versions_present
548
self.assertRaises(RevisionNotPresent, check, ["c"])
549
self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
552
class KnitTests(TestCaseWithTransport):
553
"""Class containing knit test helper routines."""
555
def make_test_knit(self, annotate=False, delay_create=False):
557
factory = KnitPlainFactory()
560
return KnitVersionedFile('test', get_transport('.'), access_mode='w',
561
factory=factory, create=True,
562
delay_create=delay_create)
565
class BasicKnitTests(KnitTests):
567
def add_stock_one_and_one_a(self, k):
    # Helper: store TEXT_1 as 'text-1', then TEXT_1A as 'text-1a' with
    # 'text-1' as its only parent.
    base_lines = split_lines(TEXT_1)
    k.add_lines('text-1', [], base_lines)
    child_lines = split_lines(TEXT_1A)
    k.add_lines('text-1a', ['text-1'], child_lines)
571
def test_knit_constructor(self):
    """An empty knit can be constructed without error."""
    self.make_test_knit()
575
def test_knit_add(self):
    """A single text stored in a knit can be read back unchanged."""
    knit = self.make_test_knit()
    knit.add_lines('text-1', [], split_lines(TEXT_1))
    self.assertTrue(knit.has_version('text-1'))
    stored_text = ''.join(knit.get_lines('text-1'))
    self.assertEqualDiff(stored_text, TEXT_1)
582
def test_knit_reload(self):
583
# test that the content in a reloaded knit is correct
584
k = self.make_test_knit()
585
k.add_lines('text-1', [], split_lines(TEXT_1))
587
k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
588
self.assertTrue(k2.has_version('text-1'))
589
self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
591
def test_knit_several(self):
    """Two unrelated texts stored in one knit are both retrievable."""
    knit = self.make_test_knit()
    stored = (('text-1', TEXT_1), ('text-2', TEXT_2))
    # Add everything first, then read everything back.
    for version_id, text in stored:
        knit.add_lines(version_id, [], split_lines(text))
    for version_id, text in stored:
        self.assertEqualDiff(''.join(knit.get_lines(version_id)), text)
599
def test_repeated_add(self):
600
"""Knit traps attempt to replace existing version"""
601
k = self.make_test_knit()
602
k.add_lines('text-1', [], split_lines(TEXT_1))
603
self.assertRaises(RevisionAlreadyPresent,
605
'text-1', [], split_lines(TEXT_1))
607
def test_empty(self):
    # A version added with no lines is expected to read back as an empty
    # list (annotated knit).
    knit = self.make_test_knit(True)
    knit.add_lines('text-1', [], [])
    stored_lines = knit.get_lines('text-1')
    self.assertEquals(stored_lines, [])
612
def test_incomplete(self):
613
"""Test if texts without a ending line-end can be inserted and
615
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
616
k.add_lines('text-1', [], ['a\n', 'b' ])
617
k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
618
# reopening ensures maximum room for confusion
619
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
620
self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
621
self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
623
def test_delta(self):
    """A line delta renders as expected and re-applies to the target."""
    # The knit is created but not otherwise used by this test.
    knit = self.make_test_knit()
    basis_lines = TEXT_1.splitlines(True)
    target_lines = TEXT_1A.splitlines(True)
    delta = list(line_delta(basis_lines, target_lines))
    self.assertEqualDiff(''.join(delta), delta_1_1a)
    patched = apply_line_delta(TEXT_1.splitlines(True), delta)
    self.assertEqualDiff(''.join(patched), TEXT_1A)
632
def test_add_with_parents(self):
    """Parents recorded when adding texts are reported by get_parents."""
    knit = self.make_test_knit()
    self.add_stock_one_and_one_a(knit)
    expected_parents = (('text-1', []), ('text-1a', ['text-1']))
    for version_id, parents in expected_parents:
        self.assertEquals(knit.get_parents(version_id), parents)
639
def test_ancestry(self):
    """The ancestry of a child version holds the child and its parent."""
    knit = self.make_test_knit()
    self.add_stock_one_and_one_a(knit)
    # Compared as sets, so the order get_ancestry returns is not checked.
    ancestry = set(knit.get_ancestry(['text-1a']))
    self.assertEquals(ancestry, set(['text-1a', 'text-1']))
645
def test_add_delta(self):
646
"""Store in knit with parents"""
647
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
648
delta=True, create=True)
649
self.add_stock_one_and_one_a(k)
651
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
653
def test_annotate(self):
655
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
656
delta=True, create=True)
657
self.insert_and_test_small_annotate(k)
659
def insert_and_test_small_annotate(self, k):
    """Add a parent and child text to k and check the child's annotation."""
    k.add_lines('text-1', [], ['a\n', 'b\n'])
    k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])

    # The unchanged line is expected to be attributed to the parent, the
    # changed line to the child.
    origins = k.annotate('text-2')
    expected_origins = [('text-1', 'a\n'), ('text-2', 'c\n')]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(origins[position], expected)
668
def test_annotate_fulltext(self):
670
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
671
delta=False, create=True)
672
self.insert_and_test_small_annotate(k)
674
def test_annotate_merge_1(self):
    # A merge taking one line from each parent is expected to attribute
    # each line to the parent that contains it.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n'])
    knit.add_lines('text-a2', [], ['d\n', 'c\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
    origins = knit.annotate('text-am')
    expected_origins = [('text-a2', 'd\n'), ('text-a1', 'b\n')]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(origins[position], expected)
683
def test_annotate_merge_2(self):
    # The merged text interleaves lines from both parents; each line is
    # expected to be attributed to the parent that contains it.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'],
                   ['a\n', 'y\n', 'c\n'])
    origins = knit.annotate('text-am')
    expected_origins = [
        ('text-a1', 'a\n'),
        ('text-a2', 'y\n'),
        ('text-a1', 'c\n'),
    ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(origins[position], expected)
693
def test_annotate_merge_9(self):
    # 'k' appears in neither parent, so it is expected to be attributed
    # to the merge itself; the other lines go to the parent holding them.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'],
                   ['k\n', 'y\n', 'c\n'])
    origins = knit.annotate('text-am')
    expected_origins = [
        ('text-am', 'k\n'),
        ('text-a2', 'y\n'),
        ('text-a1', 'c\n'),
    ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(origins[position], expected)
703
def test_annotate_merge_3(self):
    # A new first line followed by two lines from the second parent:
    # the new line is expected to belong to the merge, the rest to
    # 'text-a2'.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'],
                   ['k\n', 'y\n', 'z\n'])
    origins = knit.annotate('text-am')
    expected_origins = [
        ('text-am', 'k\n'),
        ('text-a2', 'y\n'),
        ('text-a2', 'z\n'),
    ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(origins[position], expected)
713
def test_annotate_merge_4(self):
    # 'text-a3' descends from 'text-a1'; lines it inherited unchanged are
    # expected to stay attributed to 'text-a1' after the merge.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
    knit.add_lines('text-am', ['text-a2', 'text-a3'],
                   ['a\n', 'b\n', 'z\n'])
    origins = knit.annotate('text-am')
    expected_origins = [
        ('text-a1', 'a\n'),
        ('text-a1', 'b\n'),
        ('text-a2', 'z\n'),
    ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(origins[position], expected)
724
def test_annotate_merge_5(self):
    # Three-parent merge taking one line from each parent; each line is
    # expected to be attributed to the parent that contains it.
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
    knit.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
    merge_parents = ['text-a1', 'text-a2', 'text-a3']
    merged_lines = ['a\n', 'e\n', 'z\n']
    knit.add_lines('text-am', merge_parents, merged_lines)
    origins = knit.annotate('text-am')
    expected_origins = [
        ('text-a1', 'a\n'),
        ('text-a2', 'e\n'),
        ('text-a3', 'z\n'),
    ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(origins[position], expected)
737
def test_annotate_file_cherry_pick(self):
    # 'text-3' restores the exact content of 'text-1' after 'text-2'
    # replaced every line; all lines are expected to be attributed to
    # 'text-1'.
    knit = self.make_test_knit(True)
    knit.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
    knit.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
    origins = knit.annotate('text-3')
    expected_origins = [
        ('text-1', 'a\n'),
        ('text-1', 'b\n'),
        ('text-1', 'c\n'),
    ]
    for position, expected in enumerate(expected_origins):
        self.assertEquals(origins[position], expected)
747
def test_knit_join(self):
    """Joining one version into another knit pulls in its ancestry."""
    source = KnitVersionedFile('test1', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    # Two independent two-version chains (a->b, c->d) merged by 'text-m'.
    history = [
        ('text-a', []),
        ('text-b', ['text-a']),
        ('text-c', []),
        ('text-d', ['text-c']),
        ('text-m', ['text-b', 'text-d']),
    ]
    for version_id, parents in history:
        source.add_lines(version_id, parents, split_lines(TEXT_1))

    target = KnitVersionedFile('test2', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    copied = target.join(source, version_ids=['text-m'])
    # All five versions are expected to be copied, including both roots.
    self.assertEquals(copied, 5)
    self.assertTrue(target.has_version('text-a'))
    self.assertTrue(target.has_version('text-c'))
764
def test_reannotate(self):
765
k1 = KnitVersionedFile('knit1', get_transport('.'),
766
factory=KnitAnnotateFactory(), create=True)
768
k1.add_lines('text-a', [], ['a\n', 'b\n'])
770
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
772
k2 = KnitVersionedFile('test2', get_transport('.'),
773
factory=KnitAnnotateFactory(), create=True)
774
k2.join(k1, version_ids=['text-b'])
777
k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
779
k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
781
k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
783
# text-c will have index 3
784
k1.join(k2, version_ids=['text-c'])
786
lines = k1.get_lines('text-c')
787
self.assertEquals(lines, ['z\n', 'c\n'])
789
origins = k1.annotate('text-c')
790
self.assertEquals(origins[0], ('text-c', 'z\n'))
791
self.assertEquals(origins[1], ('text-b', 'c\n'))
793
def test_get_line_delta_texts(self):
794
"""Make sure we can call get_texts on text with reused line deltas"""
795
k1 = KnitVersionedFile('test1', get_transport('.'),
796
factory=KnitPlainFactory(), create=True)
801
parents = ['%d' % (t-1)]
802
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
803
k1.get_texts(('%d' % t) for t in range(3))
805
def test_iter_lines_reads_in_order(self):
806
t = MemoryTransport()
807
instrumented_t = TransportLogger(t)
808
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
809
self.assertEqual([('id.kndx',)], instrumented_t._calls)
810
# add texts with no required ordering
811
k1.add_lines('base', [], ['text\n'])
812
k1.add_lines('base2', [], ['text2\n'])
814
instrumented_t._calls = []
815
# request a last-first iteration
816
results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
817
self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
818
self.assertEqual(['text\n', 'text2\n'], results)
820
def test_create_empty_annotated(self):
821
k1 = self.make_test_knit(True)
823
k1.add_lines('text-a', [], ['a\n', 'b\n'])
824
k2 = k1.create_empty('t', MemoryTransport())
825
self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
826
self.assertEqual(k1.delta, k2.delta)
827
# the generic test checks for empty content and file class
829
def test_knit_format(self):
830
# this tests that a new knit index file has the expected content
831
# and that it writes the data we expect as records are added.
832
knit = self.make_test_knit(True)
833
# Now knit files are not created until we first add data to them
834
self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
835
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
836
self.assertFileEqual(
837
"# bzr knit index 8\n"
839
"revid fulltext 0 84 .a_ghost :",
841
knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
842
self.assertFileEqual(
843
"# bzr knit index 8\n"
844
"\nrevid fulltext 0 84 .a_ghost :"
845
"\nrevid2 line-delta 84 82 0 :",
847
# we should be able to load this file again
848
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
849
self.assertEqual(['revid', 'revid2'], knit.versions())
850
# write a short write to the file and ensure that it's ignored
851
indexfile = file('test.kndx', 'at')
852
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
854
# we should be able to load this file again
855
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
856
self.assertEqual(['revid', 'revid2'], knit.versions())
857
# and add a revision with the same id the failed write had
858
knit.add_lines('revid3', ['revid2'], ['a\n'])
859
# and when reading it revid3 should now appear.
860
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
861
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
862
self.assertEqual(['revid2'], knit.get_parents('revid3'))
864
def test_delay_create(self):
865
"""Test that passing delay_create=True creates files late"""
866
knit = self.make_test_knit(annotate=True, delay_create=True)
867
self.failIfExists('test.knit')
868
self.failIfExists('test.kndx')
869
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
870
self.failUnlessExists('test.knit')
871
self.assertFileEqual(
872
"# bzr knit index 8\n"
874
"revid fulltext 0 84 .a_ghost :",
877
def test_create_parent_dir(self):
878
"""create_parent_dir can create knits in nonexistant dirs"""
879
# Has no effect if we don't set 'delay_create'
880
trans = get_transport('.')
881
self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
882
trans, access_mode='w', factory=None,
883
create=True, create_parent_dir=True)
884
# Nothing should have changed yet
885
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
886
factory=None, create=True,
887
create_parent_dir=True,
889
self.failIfExists('dir/test.knit')
890
self.failIfExists('dir/test.kndx')
891
self.failIfExists('dir')
892
knit.add_lines('revid', [], ['a\n'])
893
self.failUnlessExists('dir')
894
self.failUnlessExists('dir/test.knit')
895
self.assertFileEqual(
896
"# bzr knit index 8\n"
898
"revid fulltext 0 84 :",
901
def test_create_mode_700(self):
902
trans = get_transport('.')
903
if not trans._can_roundtrip_unix_modebits():
904
# Can't roundtrip, so no need to run this test
906
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
907
factory=None, create=True,
908
create_parent_dir=True,
912
knit.add_lines('revid', [], ['a\n'])
913
self.assertTransportMode(trans, 'dir', 0700)
914
self.assertTransportMode(trans, 'dir/test.knit', 0600)
915
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
917
def test_create_mode_770(self):
918
trans = get_transport('.')
919
if not trans._can_roundtrip_unix_modebits():
920
# Can't roundtrip, so no need to run this test
922
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
923
factory=None, create=True,
924
create_parent_dir=True,
928
knit.add_lines('revid', [], ['a\n'])
929
self.assertTransportMode(trans, 'dir', 0770)
930
self.assertTransportMode(trans, 'dir/test.knit', 0660)
931
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
933
def test_create_mode_777(self):
934
trans = get_transport('.')
935
if not trans._can_roundtrip_unix_modebits():
936
# Can't roundtrip, so no need to run this test
938
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
939
factory=None, create=True,
940
create_parent_dir=True,
944
knit.add_lines('revid', [], ['a\n'])
945
self.assertTransportMode(trans, 'dir', 0777)
946
self.assertTransportMode(trans, 'dir/test.knit', 0666)
947
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
949
def test_plan_merge(self):
    # Two children of a common base are merged; the resulting plan is
    # compared line by line against AB_MERGE.
    knit = self.make_test_knit(annotate=True)
    knit.add_lines('text1', [], split_lines(TEXT_1))
    knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
    knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
    actual_plan = list(knit.plan_merge('text1a', 'text1b'))
    # NOTE(review): zip() stops at the shorter sequence, so a plan that is
    # shorter or longer than AB_MERGE is not detected here.
    for actual_line, expected_line in zip(actual_plan, AB_MERGE):
        self.assertEqual(actual_line, expected_line)
968
Banana cup cake recipe
978
Banana cup cake recipe
980
- bananas (do not use plantains!!!)
987
Banana cup cake recipe
1003
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
1008
new-b|- bananas (do not use plantains!!!)
1009
unchanged|- broken tea cups
1010
new-a|- self-raising flour
1013
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
1016
def line_delta(from_lines, to_lines):
1017
"""Generate line-based delta from one text to another"""
1018
s = difflib.SequenceMatcher(None, from_lines, to_lines)
1019
for op in s.get_opcodes():
1020
if op[0] == 'equal':
1022
yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
1023
for i in range(op[3], op[4]):
1027
def apply_line_delta(basis_lines, delta_lines):
1028
"""Apply a line-based perfect diff
1030
basis_lines -- text to apply the patch to
1031
delta_lines -- diff instructions and content
1033
out = basis_lines[:]
1036
while i < len(delta_lines):
1038
a, b, c = map(long, l.split(','))
1040
out[offset+a:offset+b] = delta_lines[i:i+c]
1042
offset = offset + (b - a) + c
1046
class TestWeaveToKnit(KnitTests):
1048
def test_weave_to_knit_matches(self):
1049
# check that the WeaveToKnit is_compatible function
1050
# registers True for a Weave to a Knit.
1052
k = self.make_test_knit()
1053
self.failUnless(WeaveToKnit.is_compatible(w, k))
1054
self.failIf(WeaveToKnit.is_compatible(k, w))
1055
self.failIf(WeaveToKnit.is_compatible(w, w))
1056
self.failIf(WeaveToKnit.is_compatible(k, k))
1059
class TestKnitCaching(KnitTests):
1061
def create_knit(self, cache_add=False):
1062
k = self.make_test_knit(True)
1066
k.add_lines('text-1', [], split_lines(TEXT_1))
1067
k.add_lines('text-2', [], split_lines(TEXT_2))
1070
def test_no_caching(self):
    # A knit created without asking for caching must leave the data
    # cache empty (nothing is cached without setting 'enable_cache').
    knit = self.create_knit()
    record_cache = knit._data._cache
    self.assertEqual({}, record_cache)
1075
def test_cache_add_and_clear(self):
1076
k = self.create_knit(True)
1078
self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1081
self.assertEqual({}, k._data._cache)
1083
def test_cache_data_read_raw(self):
1084
k = self.create_knit()
1086
# Now cache and read
1089
def read_one_raw(version):
1090
pos_map = k._get_components_positions([version])
1091
method, pos, size, next = pos_map[version]
1092
lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
1093
self.assertEqual(1, len(lst))
1096
val = read_one_raw('text-1')
1097
self.assertEqual({'text-1':val[1]}, k._data._cache)
1100
# After clear, new reads are not cached
1101
self.assertEqual({}, k._data._cache)
1103
val2 = read_one_raw('text-1')
1104
self.assertEqual(val, val2)
1105
self.assertEqual({}, k._data._cache)
1107
def test_cache_data_read(self):
1108
k = self.create_knit()
1110
def read_one(version):
1111
pos_map = k._get_components_positions([version])
1112
method, pos, size, next = pos_map[version]
1113
lst = list(k._data.read_records_iter([(version, pos, size)]))
1114
self.assertEqual(1, len(lst))
1117
# Now cache and read
1120
val = read_one('text-2')
1121
self.assertEqual(['text-2'], k._data._cache.keys())
1122
self.assertEqual('text-2', val[0])
1123
content, digest = k._data._parse_record('text-2',
1124
k._data._cache['text-2'])
1125
self.assertEqual(content, val[1])
1126
self.assertEqual(digest, val[2])
1129
self.assertEqual({}, k._data._cache)
1131
val2 = read_one('text-2')
1132
self.assertEqual(val, val2)
1133
self.assertEqual({}, k._data._cache)
1135
def test_cache_read(self):
1136
k = self.create_knit()
1139
text = k.get_text('text-1')
1140
self.assertEqual(TEXT_1, text)
1141
self.assertEqual(['text-1'], k._data._cache.keys())
1144
self.assertEqual({}, k._data._cache)
1146
text = k.get_text('text-1')
1147
self.assertEqual(TEXT_1, text)
1148
self.assertEqual({}, k._data._cache)
1151
class TestKnitIndex(KnitTests):
1153
def test_add_versions_dictionary_compresses(self):
1154
"""Adding versions to the index should update the lookup dict"""
1155
knit = self.make_test_knit()
1157
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1158
self.check_file_contents('test.kndx',
1159
'# bzr knit index 8\n'
1161
'a-1 fulltext 0 0 :'
1163
idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
1164
('a-3', ['fulltext'], 0, 0, ['a-2']),
1166
self.check_file_contents('test.kndx',
1167
'# bzr knit index 8\n'
1169
'a-1 fulltext 0 0 :\n'
1170
'a-2 fulltext 0 0 0 :\n'
1171
'a-3 fulltext 0 0 1 :'
1173
self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1174
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1175
'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1176
'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1179
def test_add_versions_fails_clean(self):
1180
"""If add_versions fails in the middle, it restores a pristine state.
1182
Any modifications that are made to the index are reset if all versions
1185
# This cheats a little bit by passing in a generator which will
1186
# raise an exception before the processing finishes
1187
# Other possibilities would be to have a version with the wrong number
1188
# of entries, or to make the backing transport unable to write any
1191
knit = self.make_test_knit()
1193
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1195
class StopEarly(Exception):
1198
def generate_failure():
1199
"""Add some entries and then raise an exception"""
1200
yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
1201
yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
1204
# Assert the pre-condition
1205
self.assertEqual(['a-1'], idx._history)
1206
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1208
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1210
# And it shouldn't be modified
1211
self.assertEqual(['a-1'], idx._history)
1212
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1214
def test_knit_index_ignores_empty_files(self):
    # Older bzr had a race: a ^C at the wrong moment could leave behind an
    # empty .kndx file. bzr would later report that file as corrupted
    # because it had no header, although in reality the index simply was
    # never written. Such a file should therefore be ignored.
    transport = get_transport('.')
    empty_index = ''
    transport.put_bytes('test.kndx', empty_index)
    # Creating the knit on top of the empty index must not raise.
    knit_over_empty_index = self.make_test_knit()
1224
def test_knit_index_checks_header(self):
    # Write an index file whose first line is not a knit header, then
    # check that building a knit on top of it raises KnitHeaderError.
    bogus_index = '# not really a knit header\n\n'
    transport = get_transport('.')
    transport.put_bytes('test.kndx', bogus_index)
    self.assertRaises(KnitHeaderError, self.make_test_knit)