1
# Copyright (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
25
from bzrlib.errors import (
26
RevisionAlreadyPresent,
31
from bzrlib.knit import (
39
from bzrlib.osutils import split_lines
40
from bzrlib.tests import TestCase, TestCaseWithTransport
41
from bzrlib.transport import TransportLogger, get_transport
42
from bzrlib.transport.memory import MemoryTransport
43
from bzrlib.weave import Weave
46
class KnitContentTests(TestCase):
    """Exercise KnitContent: construction, text, annotation, copy and deltas.

    NOTE(review): this class was repaired from a damaged copy in which two
    ``def`` lines and the statement binding ``copy`` had been lost.  The
    method names ``test_text`` / ``test_copy`` and the ``content.copy()``
    call are reconstructions -- confirm against version control.
    """

    def test_constructor(self):
        # Building a content object from an empty line list must not raise.
        content = KnitContent([])

    def test_text(self):
        # text() returns only the text half of each (origin, text) pair.
        content = KnitContent([])
        self.assertEqual(content.text(), [])

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        self.assertEqual(content.text(), ["text1", "text2"])

    def test_annotate(self):
        # annotate() returns the (origin, text) pairs as given.
        content = KnitContent([])
        self.assertEqual(content.annotate(), [])

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        self.assertEqual(content.annotate(),
            [("origin1", "text1"), ("origin2", "text2")])

    def test_annotate_iter(self):
        # annotate_iter() yields the same pairs lazily, then is exhausted.
        content = KnitContent([])
        it = content.annotate_iter()
        self.assertRaises(StopIteration, it.next)

        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        it = content.annotate_iter()
        self.assertEqual(it.next(), ("origin1", "text1"))
        self.assertEqual(it.next(), ("origin2", "text2"))
        self.assertRaises(StopIteration, it.next)

    def test_copy(self):
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
        # Without this binding the assertions below would inspect an
        # undefined name.  presumably KnitContent.copy() -- TODO confirm.
        copy = content.copy()
        self.assertIsInstance(copy, KnitContent)
        self.assertEqual(copy.annotate(),
            [("origin1", "text1"), ("origin2", "text2")])

    def test_line_delta(self):
        # One hunk: replace old lines [1, 2) with two new annotated lines.
        content1 = KnitContent([("", "a"), ("", "b")])
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
        self.assertEqual(content1.line_delta(content2),
            [(1, 2, 2, [("", "a"), ("", "c")])])

    def test_line_delta_iter(self):
        # Iterator form of line_delta(): same single hunk, then exhausted.
        content1 = KnitContent([("", "a"), ("", "b")])
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
        it = content1.line_delta_iter(content2)
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
        self.assertRaises(StopIteration, it.next)
98
class MockTransport(object):
    """Transport stand-in that serves one canned file and records other calls.

    ``get()`` is implemented for real; every other attribute access yields a
    recorder function that appends ``(name, args, kwargs)`` to ``self.calls``
    and returns None, so tests can inspect what was requested.
    """

    def __init__(self, file_lines=None):
        """Remember the canned content.

        file_lines -- list of lines returned by get(), or None to make
            get() raise NoSuchFile.
        """
        self.file_lines = file_lines
        # Must be a real instance attribute: if it were missing, looking up
        # ``self.calls`` would fall through to __getattr__ and hand back a
        # recorder function instead of a list.
        self.calls = []
        # We have no base directory for the MockTransport
        # NOTE(review): the empty-string value is an assumption -- confirm.
        self.base = ''

    def get(self, filename):
        """Return a file-like object over the canned lines joined by newlines.

        Raises NoSuchFile(filename) when no content was supplied.
        """
        if self.file_lines is None:
            raise NoSuchFile(filename)
        return StringIO("\n".join(self.file_lines))

    def __getattr__(self, name):
        """Return a function that logs a call to the missing attribute."""
        def queue_call(*args, **kwargs):
            self.calls.append((name, args, kwargs))
        return queue_call
118
class LowLevelKnitIndexTests(TestCase):
120
def test_no_such_file(self):
    """Opening an index on a transport with no file raises NoSuchFile."""
    missing = MockTransport()

    # Read mode.
    self.assertRaises(NoSuchFile, _KnitIndex, missing, "filename", "r")
    # Write mode with creation explicitly disabled.
    self.assertRaises(
        NoSuchFile, _KnitIndex, missing, "filename", "w", create=False)
127
def test_create_file(self):
128
transport = MockTransport()
130
index = _KnitIndex(transport, "filename", "w",
131
file_mode="wb", create=True)
133
("put_bytes_non_atomic",
134
("filename", index.HEADER), {"mode": "wb"}),
135
transport.calls.pop(0))
137
def test_delay_create_file(self):
138
transport = MockTransport()
140
index = _KnitIndex(transport, "filename", "w",
141
create=True, file_mode="wb", create_parent_dir=True,
142
delay_create=True, dir_mode=0777)
143
self.assertEqual([], transport.calls)
145
index.add_versions([])
146
name, (filename, f), kwargs = transport.calls.pop(0)
147
self.assertEqual("put_file_non_atomic", name)
149
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
151
self.assertEqual("filename", filename)
152
self.assertEqual(index.HEADER, f.read())
154
index.add_versions([])
155
self.assertEqual(("append_bytes", ("filename", ""), {}),
156
transport.calls.pop(0))
158
def test_read_utf8_version_id(self):
159
transport = MockTransport([
161
u"version-\N{CYRILLIC CAPITAL LETTER A}"
162
u" option 0 1 :".encode("utf-8")
164
index = _KnitIndex(transport, "filename", "r")
166
index.has_version(u"version-\N{CYRILLIC CAPITAL LETTER A}"))
168
def test_read_utf8_parents(self):
169
transport = MockTransport([
171
u"version option 0 1"
172
u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")
174
index = _KnitIndex(transport, "filename", "r")
175
self.assertEqual([u"version-\N{CYRILLIC CAPITAL LETTER A}"],
176
index.get_parents_with_ghosts("version"))
178
def test_read_ignore_corrupted_lines(self):
179
transport = MockTransport([
182
"corrupted options 0 1 .b .c ",
183
"version options 0 1 :"
185
index = _KnitIndex(transport, "filename", "r")
186
self.assertEqual(1, index.num_versions())
187
self.assertTrue(index.has_version(u"version"))
189
def test_read_corrupted_header(self):
190
transport = MockTransport(['not a bzr knit index header\n'])
191
self.assertRaises(KnitHeaderError,
192
_KnitIndex, transport, "filename", "r")
194
def test_read_duplicate_entries(self):
195
transport = MockTransport([
197
"parent options 0 1 :",
198
"version options1 0 1 0 :",
199
"version options2 1 2 .other :",
200
"version options3 3 4 0 .other :"
202
index = _KnitIndex(transport, "filename", "r")
203
self.assertEqual(2, index.num_versions())
204
self.assertEqual(1, index.lookup(u"version"))
205
self.assertEqual((3, 4), index.get_position(u"version"))
206
self.assertEqual(["options3"], index.get_options(u"version"))
207
self.assertEqual([u"parent", u"other"],
208
index.get_parents_with_ghosts(u"version"))
210
def test_read_compressed_parents(self):
211
transport = MockTransport([
215
"c option 0 1 1 0 :",
217
index = _KnitIndex(transport, "filename", "r")
218
self.assertEqual([u"a"], index.get_parents(u"b"))
219
self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
221
def test_write_utf8_version_id(self):
222
transport = MockTransport([
225
index = _KnitIndex(transport, "filename", "r")
226
index.add_version(u"version-\N{CYRILLIC CAPITAL LETTER A}",
227
["option"], 0, 1, [])
228
self.assertEqual(("append_bytes", ("filename",
229
u"\nversion-\N{CYRILLIC CAPITAL LETTER A}"
230
u" option 0 1 :".encode("utf-8")),
232
transport.calls.pop(0))
234
def test_write_utf8_parents(self):
235
transport = MockTransport([
238
index = _KnitIndex(transport, "filename", "r")
239
index.add_version(u"version", ["option"], 0, 1,
240
[u"version-\N{CYRILLIC CAPITAL LETTER A}"])
241
self.assertEqual(("append_bytes", ("filename",
242
u"\nversion option 0 1"
243
u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")),
245
transport.calls.pop(0))
247
def test_get_graph(self):
    """get_graph() reports a (version, parents) pair per added version."""
    index = _KnitIndex(MockTransport(), "filename", "w", create=True)
    self.assertEqual([], index.get_graph())

    index.add_version(u"a", ["option"], 0, 1, [u"b"])
    self.assertEqual([(u"a", [u"b"])], index.get_graph())

    index.add_version(u"c", ["option"], 0, 1, [u"d"])
    # The result is sorted before comparing, so its order is not asserted.
    expected = [(u"a", [u"b"]), (u"c", [u"d"])]
    self.assertEqual(expected, sorted(index.get_graph()))
259
def test_get_ancestry(self):
260
transport = MockTransport([
263
"b option 0 1 0 .e :",
264
"c option 0 1 1 0 :",
265
"d option 0 1 2 .f :"
267
index = _KnitIndex(transport, "filename", "r")
269
self.assertEqual([], index.get_ancestry([]))
270
self.assertEqual([u"a"], index.get_ancestry([u"a"]))
271
self.assertEqual([u"a", u"b"], index.get_ancestry([u"b"]))
272
self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"c"]))
273
self.assertEqual([u"a", u"b", u"c", u"d"], index.get_ancestry([u"d"]))
274
self.assertEqual([u"a", u"b"], index.get_ancestry([u"a", u"b"]))
275
self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"a", u"c"]))
277
self.assertRaises(RevisionNotPresent, index.get_ancestry, [u"e"])
279
def test_get_ancestry_with_ghosts(self):
280
transport = MockTransport([
283
"b option 0 1 0 .e :",
284
"c option 0 1 0 .f .g :",
285
"d option 0 1 2 .h .j .k :"
287
index = _KnitIndex(transport, "filename", "r")
289
self.assertEqual([], index.get_ancestry_with_ghosts([]))
290
self.assertEqual([u"a"], index.get_ancestry_with_ghosts([u"a"]))
291
self.assertEqual([u"a", u"e", u"b"],
292
index.get_ancestry_with_ghosts([u"b"]))
293
self.assertEqual([u"a", u"g", u"f", u"c"],
294
index.get_ancestry_with_ghosts([u"c"]))
295
self.assertEqual([u"a", u"g", u"f", u"c", u"k", u"j", u"h", u"d"],
296
index.get_ancestry_with_ghosts([u"d"]))
297
self.assertEqual([u"a", u"e", u"b"],
298
index.get_ancestry_with_ghosts([u"a", u"b"]))
299
self.assertEqual([u"a", u"g", u"f", u"c"],
300
index.get_ancestry_with_ghosts([u"a", u"c"]))
302
[u"a", u"g", u"f", u"c", u"e", u"b", u"k", u"j", u"h", u"d"],
303
index.get_ancestry_with_ghosts([u"b", u"d"]))
305
self.assertRaises(RevisionNotPresent,
306
index.get_ancestry_with_ghosts, [u"e"])
308
def test_num_versions(self):
309
transport = MockTransport([
312
index = _KnitIndex(transport, "filename", "r")
314
self.assertEqual(0, index.num_versions())
315
self.assertEqual(0, len(index))
317
index.add_version(u"a", ["option"], 0, 1, [])
318
self.assertEqual(1, index.num_versions())
319
self.assertEqual(1, len(index))
321
index.add_version(u"a", ["option2"], 1, 2, [])
322
self.assertEqual(1, index.num_versions())
323
self.assertEqual(1, len(index))
325
index.add_version(u"b", ["option"], 0, 1, [])
326
self.assertEqual(2, index.num_versions())
327
self.assertEqual(2, len(index))
329
def test_get_versions(self):
330
transport = MockTransport([
333
index = _KnitIndex(transport, "filename", "r")
335
self.assertEqual([], index.get_versions())
337
index.add_version(u"a", ["option"], 0, 1, [])
338
self.assertEqual([u"a"], index.get_versions())
340
index.add_version(u"a", ["option"], 0, 1, [])
341
self.assertEqual([u"a"], index.get_versions())
343
index.add_version(u"b", ["option"], 0, 1, [])
344
self.assertEqual([u"a", u"b"], index.get_versions())
346
def test_idx_to_name(self):
347
transport = MockTransport([
352
index = _KnitIndex(transport, "filename", "r")
354
self.assertEqual(u"a", index.idx_to_name(0))
355
self.assertEqual(u"b", index.idx_to_name(1))
356
self.assertEqual(u"b", index.idx_to_name(-1))
357
self.assertEqual(u"a", index.idx_to_name(-2))
359
def test_lookup(self):
360
transport = MockTransport([
365
index = _KnitIndex(transport, "filename", "r")
367
self.assertEqual(0, index.lookup(u"a"))
368
self.assertEqual(1, index.lookup(u"b"))
370
def test_add_version(self):
371
transport = MockTransport([
374
index = _KnitIndex(transport, "filename", "r")
376
index.add_version(u"a", ["option"], 0, 1, [u"b"])
377
self.assertEqual(("append_bytes",
378
("filename", "\na option 0 1 .b :"),
379
{}), transport.calls.pop(0))
380
self.assertTrue(index.has_version(u"a"))
381
self.assertEqual(1, index.num_versions())
382
self.assertEqual((0, 1), index.get_position(u"a"))
383
self.assertEqual(["option"], index.get_options(u"a"))
384
self.assertEqual([u"b"], index.get_parents_with_ghosts(u"a"))
386
index.add_version(u"a", ["opt"], 1, 2, [u"c"])
387
self.assertEqual(("append_bytes",
388
("filename", "\na opt 1 2 .c :"),
389
{}), transport.calls.pop(0))
390
self.assertTrue(index.has_version(u"a"))
391
self.assertEqual(1, index.num_versions())
392
self.assertEqual((1, 2), index.get_position(u"a"))
393
self.assertEqual(["opt"], index.get_options(u"a"))
394
self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
396
index.add_version(u"b", ["option"], 2, 3, [u"a"])
397
self.assertEqual(("append_bytes",
398
("filename", "\nb option 2 3 0 :"),
399
{}), transport.calls.pop(0))
400
self.assertTrue(index.has_version(u"b"))
401
self.assertEqual(2, index.num_versions())
402
self.assertEqual((2, 3), index.get_position(u"b"))
403
self.assertEqual(["option"], index.get_options(u"b"))
404
self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
406
def test_add_versions(self):
407
transport = MockTransport([
410
index = _KnitIndex(transport, "filename", "r")
413
(u"a", ["option"], 0, 1, [u"b"]),
414
(u"a", ["opt"], 1, 2, [u"c"]),
415
(u"b", ["option"], 2, 3, [u"a"])
417
self.assertEqual(("append_bytes", ("filename",
418
"\na option 0 1 .b :"
421
), {}), transport.calls.pop(0))
422
self.assertTrue(index.has_version(u"a"))
423
self.assertTrue(index.has_version(u"b"))
424
self.assertEqual(2, index.num_versions())
425
self.assertEqual((1, 2), index.get_position(u"a"))
426
self.assertEqual((2, 3), index.get_position(u"b"))
427
self.assertEqual(["opt"], index.get_options(u"a"))
428
self.assertEqual(["option"], index.get_options(u"b"))
429
self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
430
self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
432
def test_delay_create_and_add_versions(self):
433
transport = MockTransport()
435
index = _KnitIndex(transport, "filename", "w",
436
create=True, file_mode="wb", create_parent_dir=True,
437
delay_create=True, dir_mode=0777)
438
self.assertEqual([], transport.calls)
441
(u"a", ["option"], 0, 1, [u"b"]),
442
(u"a", ["opt"], 1, 2, [u"c"]),
443
(u"b", ["option"], 2, 3, [u"a"])
445
name, (filename, f), kwargs = transport.calls.pop(0)
446
self.assertEqual("put_file_non_atomic", name)
448
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
450
self.assertEqual("filename", filename)
453
"\na option 0 1 .b :"
455
"\nb option 2 3 0 :",
458
def test_has_version(self):
459
transport = MockTransport([
463
index = _KnitIndex(transport, "filename", "r")
465
self.assertTrue(index.has_version(u"a"))
466
self.assertFalse(index.has_version(u"b"))
468
def test_get_position(self):
469
transport = MockTransport([
474
index = _KnitIndex(transport, "filename", "r")
476
self.assertEqual((0, 1), index.get_position(u"a"))
477
self.assertEqual((1, 2), index.get_position(u"b"))
479
def test_get_method(self):
480
transport = MockTransport([
482
"a fulltext,unknown 0 1 :",
483
"b unknown,line-delta 1 2 :",
486
index = _KnitIndex(transport, "filename", "r")
488
self.assertEqual("fulltext", index.get_method(u"a"))
489
self.assertEqual("line-delta", index.get_method(u"b"))
490
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, u"c")
492
def test_get_options(self):
493
transport = MockTransport([
498
index = _KnitIndex(transport, "filename", "r")
500
self.assertEqual(["opt1"], index.get_options(u"a"))
501
self.assertEqual(["opt2", "opt3"], index.get_options(u"b"))
503
def test_get_parents(self):
504
transport = MockTransport([
507
"b option 1 2 0 .c :",
508
"c option 1 2 1 0 .e :"
510
index = _KnitIndex(transport, "filename", "r")
512
self.assertEqual([], index.get_parents(u"a"))
513
self.assertEqual([u"a", u"c"], index.get_parents(u"b"))
514
self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
516
def test_get_parents_with_ghosts(self):
517
transport = MockTransport([
520
"b option 1 2 0 .c :",
521
"c option 1 2 1 0 .e :"
523
index = _KnitIndex(transport, "filename", "r")
525
self.assertEqual([], index.get_parents_with_ghosts(u"a"))
526
self.assertEqual([u"a", u"c"], index.get_parents_with_ghosts(u"b"))
527
self.assertEqual([u"b", u"a", u"e"],
528
index.get_parents_with_ghosts(u"c"))
530
def test_check_versions_present(self):
531
transport = MockTransport([
536
index = _KnitIndex(transport, "filename", "r")
538
check = index.check_versions_present
544
self.assertRaises(RevisionNotPresent, check, [u"c"])
545
self.assertRaises(RevisionNotPresent, check, [u"a", u"b", u"c"])
548
class KnitTests(TestCaseWithTransport):
    """Class containing knit test helper routines."""

    def make_test_knit(self, annotate=False, delay_create=False):
        """Create a writable knit named 'test' on the transport for '.'.

        annotate -- when true use an annotating factory, otherwise a plain
            one.  NOTE(review): the annotated branch was reconstructed; the
            caching/format tests in this file call make_test_knit(True) and
            then expect a KnitAnnotateFactory -- confirm the exact original.
        delay_create -- passed straight through to KnitVersionedFile.
        """
        if annotate:
            factory = KnitAnnotateFactory()
        else:
            factory = KnitPlainFactory()
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
                                 factory=factory, create=True,
                                 delay_create=delay_create)
561
class BasicKnitTests(KnitTests):
563
def add_stock_one_and_one_a(self, k):
564
k.add_lines('text-1', [], split_lines(TEXT_1))
565
k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
567
def test_knit_constructor(self):
568
"""Construct empty k"""
569
self.make_test_knit()
571
def test_knit_add(self):
572
"""Store one text in knit and retrieve"""
573
k = self.make_test_knit()
574
k.add_lines('text-1', [], split_lines(TEXT_1))
575
self.assertTrue(k.has_version('text-1'))
576
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
578
def test_knit_reload(self):
579
# test that the content in a reloaded knit is correct
580
k = self.make_test_knit()
581
k.add_lines('text-1', [], split_lines(TEXT_1))
583
k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
584
self.assertTrue(k2.has_version('text-1'))
585
self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
587
def test_knit_several(self):
    """Store several texts in a knit"""
    knit = self.make_test_knit()
    stored = [('text-1', TEXT_1), ('text-2', TEXT_2)]
    # Add everything first, then read back, as two separate passes.
    for version_id, text in stored:
        knit.add_lines(version_id, [], split_lines(text))
    for version_id, text in stored:
        self.assertEqualDiff(''.join(knit.get_lines(version_id)), text)
595
def test_repeated_add(self):
596
"""Knit traps attempt to replace existing version"""
597
k = self.make_test_knit()
598
k.add_lines('text-1', [], split_lines(TEXT_1))
599
self.assertRaises(RevisionAlreadyPresent,
601
'text-1', [], split_lines(TEXT_1))
603
def test_empty(self):
604
k = self.make_test_knit(True)
605
k.add_lines('text-1', [], [])
606
self.assertEquals(k.get_lines('text-1'), [])
608
def test_incomplete(self):
609
"""Test if texts without a ending line-end can be inserted and
611
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
612
k.add_lines('text-1', [], ['a\n', 'b' ])
613
k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
614
# reopening ensures maximum room for confusion
615
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
616
self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
617
self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
619
def test_delta(self):
620
"""Expression of knit delta as lines"""
621
k = self.make_test_knit()
622
td = list(line_delta(TEXT_1.splitlines(True),
623
TEXT_1A.splitlines(True)))
624
self.assertEqualDiff(''.join(td), delta_1_1a)
625
out = apply_line_delta(TEXT_1.splitlines(True), td)
626
self.assertEqualDiff(''.join(out), TEXT_1A)
628
def test_add_with_parents(self):
629
"""Store in knit with parents"""
630
k = self.make_test_knit()
631
self.add_stock_one_and_one_a(k)
632
self.assertEquals(k.get_parents('text-1'), [])
633
self.assertEquals(k.get_parents('text-1a'), ['text-1'])
635
def test_ancestry(self):
636
"""Store in knit with parents"""
637
k = self.make_test_knit()
638
self.add_stock_one_and_one_a(k)
639
self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
641
def test_add_delta(self):
642
"""Store in knit with parents"""
643
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
644
delta=True, create=True)
645
self.add_stock_one_and_one_a(k)
647
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
649
def test_annotate(self):
651
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
652
delta=True, create=True)
653
self.insert_and_test_small_annotate(k)
655
def insert_and_test_small_annotate(self, k):
656
"""test annotation with k works correctly."""
657
k.add_lines('text-1', [], ['a\n', 'b\n'])
658
k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
660
origins = k.annotate('text-2')
661
self.assertEquals(origins[0], ('text-1', 'a\n'))
662
self.assertEquals(origins[1], ('text-2', 'c\n'))
664
def test_annotate_fulltext(self):
666
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
667
delta=False, create=True)
668
self.insert_and_test_small_annotate(k)
670
def test_annotate_merge_1(self):
671
k = self.make_test_knit(True)
672
k.add_lines('text-a1', [], ['a\n', 'b\n'])
673
k.add_lines('text-a2', [], ['d\n', 'c\n'])
674
k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
675
origins = k.annotate('text-am')
676
self.assertEquals(origins[0], ('text-a2', 'd\n'))
677
self.assertEquals(origins[1], ('text-a1', 'b\n'))
679
def test_annotate_merge_2(self):
680
k = self.make_test_knit(True)
681
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
682
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
683
k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
684
origins = k.annotate('text-am')
685
self.assertEquals(origins[0], ('text-a1', 'a\n'))
686
self.assertEquals(origins[1], ('text-a2', 'y\n'))
687
self.assertEquals(origins[2], ('text-a1', 'c\n'))
689
def test_annotate_merge_9(self):
690
k = self.make_test_knit(True)
691
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
692
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
693
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
694
origins = k.annotate('text-am')
695
self.assertEquals(origins[0], ('text-am', 'k\n'))
696
self.assertEquals(origins[1], ('text-a2', 'y\n'))
697
self.assertEquals(origins[2], ('text-a1', 'c\n'))
699
def test_annotate_merge_3(self):
700
k = self.make_test_knit(True)
701
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
702
k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
703
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
704
origins = k.annotate('text-am')
705
self.assertEquals(origins[0], ('text-am', 'k\n'))
706
self.assertEquals(origins[1], ('text-a2', 'y\n'))
707
self.assertEquals(origins[2], ('text-a2', 'z\n'))
709
def test_annotate_merge_4(self):
710
k = self.make_test_knit(True)
711
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
712
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
713
k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
714
k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
715
origins = k.annotate('text-am')
716
self.assertEquals(origins[0], ('text-a1', 'a\n'))
717
self.assertEquals(origins[1], ('text-a1', 'b\n'))
718
self.assertEquals(origins[2], ('text-a2', 'z\n'))
720
def test_annotate_merge_5(self):
    """A three-parent merge attributes each line to the parent it came from."""
    knit = self.make_test_knit(True)
    roots = [
        ('text-a1', ['a\n', 'b\n', 'c\n']),
        ('text-a2', ['d\n', 'e\n', 'f\n']),
        ('text-a3', ['x\n', 'y\n', 'z\n']),
    ]
    for version_id, lines in roots:
        knit.add_lines(version_id, [], lines)
    parent_ids = ['text-a1', 'text-a2', 'text-a3']
    knit.add_lines('text-am', parent_ids, ['a\n', 'e\n', 'z\n'])

    origins = knit.annotate('text-am')
    expected = [('text-a1', 'a\n'), ('text-a2', 'e\n'), ('text-a3', 'z\n')]
    # Compare by position only, so extra trailing entries are not checked.
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
733
def test_annotate_file_cherry_pick(self):
734
k = self.make_test_knit(True)
735
k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
736
k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
737
k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
738
origins = k.annotate('text-3')
739
self.assertEquals(origins[0], ('text-1', 'a\n'))
740
self.assertEquals(origins[1], ('text-1', 'b\n'))
741
self.assertEquals(origins[2], ('text-1', 'c\n'))
743
def test_knit_join(self):
744
"""Store in knit with parents"""
745
k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
746
k1.add_lines('text-a', [], split_lines(TEXT_1))
747
k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
749
k1.add_lines('text-c', [], split_lines(TEXT_1))
750
k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
752
k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
754
k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
755
count = k2.join(k1, version_ids=['text-m'])
756
self.assertEquals(count, 5)
757
self.assertTrue(k2.has_version('text-a'))
758
self.assertTrue(k2.has_version('text-c'))
760
def test_reannotate(self):
761
k1 = KnitVersionedFile('knit1', get_transport('.'),
762
factory=KnitAnnotateFactory(), create=True)
764
k1.add_lines('text-a', [], ['a\n', 'b\n'])
766
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
768
k2 = KnitVersionedFile('test2', get_transport('.'),
769
factory=KnitAnnotateFactory(), create=True)
770
k2.join(k1, version_ids=['text-b'])
773
k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
775
k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
777
k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
779
# test-c will have index 3
780
k1.join(k2, version_ids=['text-c'])
782
lines = k1.get_lines('text-c')
783
self.assertEquals(lines, ['z\n', 'c\n'])
785
origins = k1.annotate('text-c')
786
self.assertEquals(origins[0], ('text-c', 'z\n'))
787
self.assertEquals(origins[1], ('text-b', 'c\n'))
789
def test_get_line_delta_texts(self):
790
"""Make sure we can call get_texts on text with reused line deltas"""
791
k1 = KnitVersionedFile('test1', get_transport('.'),
792
factory=KnitPlainFactory(), create=True)
797
parents = ['%d' % (t-1)]
798
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
799
k1.get_texts(('%d' % t) for t in range(3))
801
def test_iter_lines_reads_in_order(self):
802
t = MemoryTransport()
803
instrumented_t = TransportLogger(t)
804
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
805
self.assertEqual([('id.kndx',)], instrumented_t._calls)
806
# add texts with no required ordering
807
k1.add_lines('base', [], ['text\n'])
808
k1.add_lines('base2', [], ['text2\n'])
810
instrumented_t._calls = []
811
# request a last-first iteration
812
results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
813
self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
814
self.assertEqual(['text\n', 'text2\n'], results)
816
def test_create_empty_annotated(self):
817
k1 = self.make_test_knit(True)
819
k1.add_lines('text-a', [], ['a\n', 'b\n'])
820
k2 = k1.create_empty('t', MemoryTransport())
821
self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
822
self.assertEqual(k1.delta, k2.delta)
823
# the generic test checks for empty content and file class
825
def test_knit_format(self):
826
# this tests that a new knit index file has the expected content
827
# and that it writes the data we expect as records are added.
828
knit = self.make_test_knit(True)
829
# Now knit files are not created until we first add data to them
830
self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
831
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
832
self.assertFileEqual(
833
"# bzr knit index 8\n"
835
"revid fulltext 0 84 .a_ghost :",
837
knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
838
self.assertFileEqual(
839
"# bzr knit index 8\n"
840
"\nrevid fulltext 0 84 .a_ghost :"
841
"\nrevid2 line-delta 84 82 0 :",
843
# we should be able to load this file again
844
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
845
self.assertEqual(['revid', 'revid2'], knit.versions())
846
# write a short write to the file and ensure that it's ignored
847
indexfile = file('test.kndx', 'at')
848
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
850
# we should be able to load this file again
851
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
852
self.assertEqual(['revid', 'revid2'], knit.versions())
853
# and add a revision with the same id the failed write had
854
knit.add_lines('revid3', ['revid2'], ['a\n'])
855
# and when reading it revid3 should now appear.
856
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
857
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
858
self.assertEqual(['revid2'], knit.get_parents('revid3'))
860
def test_delay_create(self):
861
"""Test that passing delay_create=True creates files late"""
862
knit = self.make_test_knit(annotate=True, delay_create=True)
863
self.failIfExists('test.knit')
864
self.failIfExists('test.kndx')
865
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
866
self.failUnlessExists('test.knit')
867
self.assertFileEqual(
868
"# bzr knit index 8\n"
870
"revid fulltext 0 84 .a_ghost :",
873
def test_create_parent_dir(self):
874
"""create_parent_dir can create knits in nonexistant dirs"""
875
# Has no effect if we don't set 'delay_create'
876
trans = get_transport('.')
877
self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
878
trans, access_mode='w', factory=None,
879
create=True, create_parent_dir=True)
880
# Nothing should have changed yet
881
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
882
factory=None, create=True,
883
create_parent_dir=True,
885
self.failIfExists('dir/test.knit')
886
self.failIfExists('dir/test.kndx')
887
self.failIfExists('dir')
888
knit.add_lines('revid', [], ['a\n'])
889
self.failUnlessExists('dir')
890
self.failUnlessExists('dir/test.knit')
891
self.assertFileEqual(
892
"# bzr knit index 8\n"
894
"revid fulltext 0 84 :",
897
def test_create_mode_700(self):
898
trans = get_transport('.')
899
if not trans._can_roundtrip_unix_modebits():
900
# Can't roundtrip, so no need to run this test
902
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
903
factory=None, create=True,
904
create_parent_dir=True,
908
knit.add_lines('revid', [], ['a\n'])
909
self.assertTransportMode(trans, 'dir', 0700)
910
self.assertTransportMode(trans, 'dir/test.knit', 0600)
911
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
913
def test_create_mode_770(self):
914
trans = get_transport('.')
915
if not trans._can_roundtrip_unix_modebits():
916
# Can't roundtrip, so no need to run this test
918
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
919
factory=None, create=True,
920
create_parent_dir=True,
924
knit.add_lines('revid', [], ['a\n'])
925
self.assertTransportMode(trans, 'dir', 0770)
926
self.assertTransportMode(trans, 'dir/test.knit', 0660)
927
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
929
def test_create_mode_777(self):
930
trans = get_transport('.')
931
if not trans._can_roundtrip_unix_modebits():
932
# Can't roundtrip, so no need to run this test
934
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
935
factory=None, create=True,
936
create_parent_dir=True,
940
knit.add_lines('revid', [], ['a\n'])
941
self.assertTransportMode(trans, 'dir', 0777)
942
self.assertTransportMode(trans, 'dir/test.knit', 0666)
943
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
945
def test_plan_merge(self):
946
my_knit = self.make_test_knit(annotate=True)
947
my_knit.add_lines('text1', [], split_lines(TEXT_1))
948
my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
949
my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
950
plan = list(my_knit.plan_merge('text1a', 'text1b'))
951
for plan_line, expected_line in zip(plan, AB_MERGE):
952
self.assertEqual(plan_line, expected_line)
964
Banana cup cake recipe
974
Banana cup cake recipe
976
- bananas (do not use plantains!!!)
983
Banana cup cake recipe
999
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
1004
new-b|- bananas (do not use plantains!!!)
1005
unchanged|- broken tea cups
1006
new-a|- self-raising flour
1009
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
1012
def line_delta(from_lines, to_lines):
    """Generate line-based delta from one text to another

    from_lines -- list of lines of the basis text
    to_lines -- list of lines of the target text

    Yields, for every non-matching region, a header line ``'a,b,c\\n'``
    (replace basis lines [a, b) with the next c lines) followed by those c
    replacement lines taken from to_lines.  Matching regions yield nothing.
    """
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
    for op in s.get_opcodes():
        # op is (tag, i1, i2, j1, j2); unchanged regions need no hunk.
        if op[0] == 'equal':
            continue
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
        # The hunk body: exactly the op[4]-op[3] lines announced above.
        for i in range(op[3], op[4]):
            yield to_lines[i]
1023
def apply_line_delta(basis_lines, delta_lines):
    """Apply a line-based perfect diff

    basis_lines -- text to apply the patch to
    delta_lines -- diff instructions and content

    Each hunk in delta_lines is a header ``'a,b,c'`` followed by c content
    lines, meaning "replace basis lines [a, b) with these c lines".  Returns
    a new list; basis_lines is not modified.
    """
    out = basis_lines[:]
    i = 0
    # How far positions in ``out`` have shifted relative to basis_lines
    # because of the hunks applied so far.
    offset = 0
    while i < len(delta_lines):
        header = delta_lines[i]
        # int() promotes to long automatically on Python 2, so this accepts
        # the same inputs as long() did.
        a, b, c = map(int, header.split(','))
        i = i + 1
        out[offset+a:offset+b] = delta_lines[i:i+c]
        i = i + c
        # c lines went in and (b - a) came out; the net change is what moves
        # every later hunk.  (Adding both, as before, misplaced all hunks
        # after the first.)
        offset = offset + c - (b - a)
    return out
1042
# Asserts that WeaveToKnit.is_compatible holds for (w, k) and fails for
# (k, w), (w, w) and (k, k).
# NOTE(review): `w` is used below but never assigned in the visible lines; the
# embedded line numbers skip 1047, so its definition is presumably missing
# from this extract -- confirm against the full file.
class TestWeaveToKnit(KnitTests):
1044
def test_weave_to_knit_matches(self):
1045
# check that the WeaveToKnit is_compatible function
1046
# registers True for a Weave to a Knit.
1048
k = self.make_test_knit()
1049
self.failUnless(WeaveToKnit.is_compatible(w, k))
1050
self.failIf(WeaveToKnit.is_compatible(k, w))
1051
self.failIf(WeaveToKnit.is_compatible(w, w))
1052
self.failIf(WeaveToKnit.is_compatible(k, k))
1055
# Tests that inspect k._data._cache after adding and reading knit records.
# NOTE(review): the embedded line numbers have gaps throughout this class
# (e.g. 1059-1061, 1064, 1076, 1083, 1090, 1095, 1111, 1124, 1134, 1139), so
# some statements -- presumably the calls that switch caching on and clear it,
# and the helpers' return statements -- are not visible in this extract.
# Confirm against the full file before relying on the notes below.
class TestKnitCaching(KnitTests):
1057
# Shared fixture: builds a test knit and adds the versions 'text-1' and
# 'text-2'. The use of `cache_add` and the return value are not visible here.
def create_knit(self, cache_add=False):
1058
k = self.make_test_knit(True)
1062
k.add_lines('text-1', [], split_lines(TEXT_1))
1063
k.add_lines('text-2', [], split_lines(TEXT_2))
1066
def test_no_caching(self):
1067
k = self.create_knit()
1068
# Nothing should be cached without setting 'enable_cache'
1069
self.assertEqual({}, k._data._cache)
1071
# With create_knit(True) the cache is expected to hold both added versions,
# and to be empty again by the final assertion.
def test_cache_add_and_clear(self):
1072
k = self.create_knit(True)
1074
self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1077
self.assertEqual({}, k._data._cache)
1079
def test_cache_data_read_raw(self):
1080
k = self.create_knit()
1082
# Now cache and read
1085
# Helper: looks up the position of `version` and reads exactly one raw record
# for it. NOTE(review): the local name `next` shadows the builtin.
def read_one_raw(version):
1086
pos_map = k._get_components_positions([version])
1087
method, pos, size, next = pos_map[version]
1088
lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
1089
self.assertEqual(1, len(lst))
1092
val = read_one_raw('text-1')
1093
# The cache is expected to map the version id to element 1 of the value the
# helper produced.
self.assertEqual({'text-1':val[1]}, k._data._cache)
1096
# After clear, new reads are not cached
1097
self.assertEqual({}, k._data._cache)
1099
val2 = read_one_raw('text-1')
1100
self.assertEqual(val, val2)
1101
self.assertEqual({}, k._data._cache)
1103
def test_cache_data_read(self):
1104
k = self.create_knit()
1106
# Helper: same lookup as in the raw variant, but reads through
# read_records_iter.
def read_one(version):
1107
pos_map = k._get_components_positions([version])
1108
method, pos, size, next = pos_map[version]
1109
lst = list(k._data.read_records_iter([(version, pos, size)]))
1110
self.assertEqual(1, len(lst))
1113
# Now cache and read
1116
val = read_one('text-2')
1117
self.assertEqual(['text-2'], k._data._cache.keys())
1118
self.assertEqual('text-2', val[0])
1119
# Parsing the cached entry must give the same content and digest that the
# read produced.
content, digest = k._data._parse_record('text-2',
1120
k._data._cache['text-2'])
1121
self.assertEqual(content, val[1])
1122
self.assertEqual(digest, val[2])
1125
self.assertEqual({}, k._data._cache)
1127
val2 = read_one('text-2')
1128
self.assertEqual(val, val2)
1129
self.assertEqual({}, k._data._cache)
1131
# Same pattern via the public get_text(): the text must equal TEXT_1 both
# times, with 'text-1' cached after the first read and nothing cached after
# the second.
def test_cache_read(self):
1132
k = self.create_knit()
1135
text = k.get_text('text-1')
1136
self.assertEqual(TEXT_1, text)
1137
self.assertEqual(['text-1'], k._data._cache.keys())
1140
self.assertEqual({}, k._data._cache)
1142
text = k.get_text('text-1')
1143
self.assertEqual(TEXT_1, text)
1144
self.assertEqual({}, k._data._cache)
1147
class TestKnitIndex(KnitTests):
1149
# Adds 'a-1' with add_version and 'a-2'/'a-3' with add_versions, checking the
# contents of 'test.kndx' after each step and finally idx._history and a dict
# keyed by version id.
# In the expected index text the entries for 'a-2' and 'a-3' carry an extra
# numeric field ('0', '1') where the arguments name the parents 'a-1' and
# 'a-2' -- this appears to be the "dictionary compression" the test name
# refers to; confirm against the index format.
# NOTE(review): `idx` is used but never assigned in the visible lines, and the
# embedded line numbers skip 1152, 1156, 1158, 1161, 1164, 1168 and 1173, so
# its definition, parts of the expected text and the closing parentheses are
# missing from this extract -- confirm against the full file.
def test_add_versions_dictionary_compresses(self):
1150
"""Adding versions to the index should update the lookup dict"""
1151
knit = self.make_test_knit()
1153
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1154
self.check_file_contents('test.kndx',
1155
'# bzr knit index 8\n'
1157
'a-1 fulltext 0 0 :'
1159
idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
1160
('a-3', ['fulltext'], 0, 0, ['a-2']),
1162
self.check_file_contents('test.kndx',
1163
'# bzr knit index 8\n'
1165
'a-1 fulltext 0 0 :\n'
1166
'a-2 fulltext 0 0 0 :\n'
1167
'a-3 fulltext 0 0 1 :'
1169
self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1170
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1171
'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1172
'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1175
# Checks that idx._history and idx._cache are the same before and after an
# add_versions call that raises StopEarly.
# NOTE(review): the embedded line numbers skip 1177, 1179-1180, 1185-1186,
# 1188, 1190, 1192-1193 and 1198-1199, so the end of the docstring, the
# assignment of `idx`, the body of StopEarly and the statement that raises it
# are not visible in this extract -- confirm against the full file.
def test_add_versions_fails_clean(self):
1176
"""If add_versions fails in the middle, it restores a pristine state.
1178
Any modifications that are made to the index are reset if all versions
1181
# This cheats a little bit by passing in a generator which will
1182
# raise an exception before the processing finishes
1183
# Other possibilities would be to have a version with the wrong number
1184
# of entries, or to make the backing transport unable to write any
1187
knit = self.make_test_knit()
1189
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1191
class StopEarly(Exception):
1194
def generate_failure():
1195
"""Add some entries and then raise an exception"""
1196
yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
1197
yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
1200
# Assert the pre-condition
1201
self.assertEqual(['a-1'], idx._history)
1202
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1204
# The generator object is created here and consumed inside add_versions.
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1206
# And it shouldn't be modified
1207
self.assertEqual(['a-1'], idx._history)
1208
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1210
def test_knit_index_ignores_empty_files(self):
    # Older bzr had a race condition: a ^C at the right time could leave an
    # empty .kndx file behind, and bzr would later report it as corrupted
    # because the header was not present.  In reality the file simply was
    # never created, so it should be ignored.
    empty_index = ''
    transport = get_transport('.')
    transport.put_bytes('test.kndx', empty_index)

    # There is no explicit assertion: the test passes as long as building
    # the knit over the empty index file does not raise.
    self.make_test_knit()
1220
def test_knit_index_checks_header(self):
    # Write an index file whose first line is not a knit index header.
    bogus_index = '# not really a knit header\n\n'
    transport = get_transport('.')
    transport.put_bytes('test.kndx', bogus_index)

    # Building the test knit over that file must raise KnitHeaderError.
    self.assertRaises(KnitHeaderError, self.make_test_knit)